运行时模型

现在我们将介绍Tokio /future运行时模型。 Tokio构建在future箱顶部并使用其运行时模型。 这允许它也使用future箱与其他图书馆互操作。

注意:此运行时模型与其他语言中的异步库非常不同。 虽然在较高的层面上,API看起来很相似,但代码执行方式却有所不同。

同步模型

首先,让我们简要谈谈同步(或阻塞)模型。 这是Rust标准库使用的模型。

// let socket = ...;
let mut buf = [0; 1024];
let n = socket.read(&mut buf).unwrap();

// Do something with &buf[..n];

调用socket.read时,无论套接字在其缓冲区中是否有待处理数据, 如果有待处理的数据,则读取的调用将立即返回,并且buf将填充该数据。 如果没有未决数据,则read函数将阻止当前线程,直到收到数据。 此时,buf将填充此新接收的数据,并且将返回read函数

为了同时在许多不同的套接字上并发执行读取,每个套接字需要一个线程。 每个套接字使用一个线程不能很好地扩展到大量的套接字。 这被称为c10k问题。

非阻塞套接字

在执行像read这样的操作时避免阻塞线程的方法是不阻塞线程! 当套接字在其接收缓冲区中没有未决数据时,read函数立即返回,表明套接字“未准备好”以执行读取操作。

使用Tokio TcpStream时,如果没有要读取的待处理数据,则对read的调用将返回类型ErrorKind :: WouldBlock的错误。 此时,调用者负责稍后再次调用read。 诀窍是知道“晚些时候”的时间。

考虑非阻塞读取的另一种方法是“轮询”套接字以读取数据。

轮询模型

轮询套接字数据的策略可以推广到任何操作。 例如,在轮询模型中获取“小部件”的函数看起来像这样:

fn poll_widget() -> Async<Widget> { ... }

此函数返回Async <Widget>,其中Async是Ready(Widget)或NotReady的枚举。 Async枚举由future箱提供,是轮询模型的构建块之一。

现在,让我们定义一个没有使用此poll_widget函数的组合器的异步任务。 该任务将执行以下操作:

  1. 获取小部件。
  2. 将小部件打印到STDOUT。
  3. 终止任务。

为了定义任务,我们实现了Future trait。

///轮询单个小部件并将其写入STDOUT的任务。
pub struct MyTask;

impl Future for MyTask {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<()>, ()> {
        match poll_widget() {
            Async::Ready(widget) => {
                println!("widget={:?}", widget);
                Ok(Async::Ready(()))
            }
            Async::NotReady => {
                return Ok(Async::NotReady);
            }
        }
    }
}

重要提示: 返回Async :: NotReady具有特殊含义。 有关详细信息,请参阅下一节。

需要注意的关键是,当调用MyTask :: poll时,它会立即尝试获取小部件。 如果对poll_widget的调用返回NotReady,则该任务无法继续进行。 然后任务返回NotReady,表明它尚未准备好完成处理。

任务实现不会阻止。 相反,“将来的某个时间”,执行者将再次调用MyTask :: poll。 将再次调用poll_widget。 如果poll_widget已准备好返回窗口小部件,则该任务又可以打印窗口小部件。 然后,可以通过返回Ready来完成任务。

执行者(Executors)

为了使任务取得进展,必须调用MyTask :: poll。 这就是执行者的工作。

执行程序负责反复调用任务轮询,直到返回Ready。 有很多不同的方法可以做到这一点。 例如,CurrentThread执行者将阻止当前线程并遍历所有生成的任务,并对它们调用poll。 ThreadPool在线程池中调度任务。 这也是运行时使用的默认执行者。

必须在执行者上生成所有任务,否则不会执行任何工作。

在最简单的情况下,执行者可能看起来像这样:

pub struct SpinExecutor {
    tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
}

impl SpinExecutor {
    pub fn spawn<T>(&mut self, task: T)
    where T: Future<Item = (), Error = ()> + 'static
    {
        self.tasks.push_back(Box::new(task));
    }

    pub fn run(&mut self) {
        while let Some(mut task) = self.tasks.pop_front() {
            match task.poll().unwrap() {
                Async::Ready(_) => {}
                Async::NotReady => {
                    self.tasks.push_back(task);
                }
            }
        }
    }
}

当然,这不会非常有效。 执行程序在一个繁忙的循环中运转并尝试轮询所有任务,即使任务将再次返回NotReady。

理想情况下,执行者可以通过某种方式知道任务的“准备就绪”状态何时被改变,即当轮询调用返回Ready时。 执行者看起来像这样:

    pub fn run(&mut self) {
        loop {
            while let Some(mut task) = self.ready_tasks.pop_front() {
                match task.poll().unwrap() {
                    Async::Ready(_) => {}
                    Async::NotReady => {
                        self.not_ready_tasks.push_back(task);
                    }
                }
            }

            if self.not_ready_tasks.is_empty() {
                return;
            }

            // Put the thread to sleep until there is work to do
            self.sleep_until_tasks_are_ready();
        }
    }

当任务从“未准备好”变为“准备好”时能够得到通知是future任务模型的核心。 我们将很快进一步深入研究。

上次更新: 11/4/2018, 4:39:08 AM