Futures
在指南早期暗示的future是用于管理异步逻辑的构建块。 它们是Tokio使用的底层异步抽象。
future的实施由future crate提供。 但是,为方便起见,Tokio重新导出了许多类型。
Futures是什么
future是表示异步计算完成的值。通常,由于系统中某处发生的事件使future完成。虽然我们从基本I / O的角度看待事物,但您可以使用future来表示各种事件,例如:
在线程池中执行的数据库查询。当数据库查询完成时,
future完成,其值是查询的结果。对服务器的RPC 调用。当服务器回复时,
future完成,其值是服务器的响应。超时事件。当时间到了,
future就完成了,它的值是()。在线程池上运行的长时间运行的CPU密集型任务。任务完成后,
future完成,其值为任务的返回值。从套接字读取字节。当字节准备就绪时,
future就完成了 - 根据缓冲策略,字节可能直接返回,或作为副作用写入某个现有缓冲区。
future抽象的整个要点是允许异步函数,即不能立即返回值的函数,将会返回一些东西。
例如,异步HTTP客户端可以提供如下所示的get函数:
pub fn get(&self, uri: &str) -> ResponseFuture { ... }
然后,库的用户将使用该函数:
let response_future = client.get("https://www.example.com");
现在,response_future不是实际响应。这是个一旦收到响应,就会完成的future。 但是,调用者要具有一个具体的东西(future)使他们可以开始使用它。 例如,它们可以链式计算在接收到响应时执行,或者它们可能将future传递给函数。
let response_is_ok = response_future
.map(|response| {
response.status().is_ok()
});
track_response_success(response_is_ok);
所有与future一起采取的行动都不会立即执行任何工作。他们不能,因为他们没有实际的HTTP响应。相反,他们定义了响应future完成时要完成的工作。
future crate和Tokio都有一系列组合功能,可以用来处理future。
实现future
使用Tokio时,实现Future是很常见的,因此适应它是很重要的。
如前一节所述,Rustfuture是基于轮询的。这是Rustfuture库的一个独特方面。其他编程语言的大多数future库使用基于推送的模型,其中回调被提供给future,并且计算立即调用计算结果回调。
使用基于轮询的模型提供了许多优点,包括作为零成本抽象,即,与手动编写异步代码相比,使用Rustfuture没有额外的开销。
future的特点如下:
trait Future {
/// The type of the value returned when the future completes.
type Item;
/// The type representing errors that occured while processing the
/// computation.
type Error;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}
您可能会注意到这与用于实现异步任务的trait完全相同。 这是因为一旦计算完成,异步任务就是“正好”的future,其值为()。
通常,当您实现Future时,您将定义一个由子(或内部)future组成的计算。 在这种情况下,future的实现会尝试调用内部future,如果内部future未准备好,则返回NotReady。
以下示例是由另一个返回usize并将使该值加倍的future组成的future:
pub struct Doubler<T> {
inner: T,
}
pub fn double<T>(inner: T) -> Doubler<T> {
Doubler { inner }
}
impl<T> Future for Doubler<T>
where T: Future<Item = usize>
{
type Item = usize;
type Error = T::Error;
fn poll(&mut self) -> Result<Async<usize>, T::Error> {
match self.inner.poll()? {
Async::Ready(v) => Ok(Async::Ready(v * 2)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
当Doublerfuture被轮询时,它会调查其内在的future。 如果内部future尚未准备好,Doubler future将返回NotReady。 如果内心的future已经准备就绪,那么Doubler的future会使返回值加倍并返回Ready。
因为上面的匹配模式很常见,所以future crate提供了一个宏:try_ready!。 它类似于try! 或?,但它也返回NotReady。 上面的poll函数可以使用try_ready重写! 如下:
fn poll(&mut self) -> Result<Async<usize>, T::Error> {
let v = try_ready!(self.inner.poll());
Ok(Async::Ready(v * 2))
}
返回NotReady
当一个任务返回NotReady时,一旦它转换到就绪状态,执行者就会被通知。这使执行者能够有效地调度任务。
当函数返回Async :: NotReady时,在状态转换为“就绪”时通知执行程序至关重要。否则,任务将无限挂起,永远不会再次运行。
对于大多数future的实现,这是可传递的。当future实施是子future的组合时,当至少一个内部future返回NotReady时,外部future仅返回NotReady。因此,一旦内部future转变为就绪状态,外部future将转变为就绪状态。在这种情况下,NotReady合约已经满足,因为内部future将在准备就绪时通知执行者。
最内层的future,有时也被称为“资源”,是负责通知执行人的人。这是通过对task :: current()返回的任务调用notify来完成的。
在执行者调用任务轮询之前,它将任务上下文设置为线程局部变量。然后,最内部的future从线程本地访问上下文,以便一旦其准备状态改变就能够通知任务。
我们将在后面的部分中更深入地探索实施资源和任务系统。除非你从内部的future获得NotReady,否则这里的关键是不要返回NotReady
一个更复杂的future
让我们看一下稍微复杂的future实现。 在这种情况下,我们将实现一个取得主机名,进行DNS解析,然后建立与远程主机的连接的future。 我们假设存在一个如下所示的resolve函数:
pub fn resolve(host: &str) -> ResolveFuture;
其中ResolveFuture是一个返回SocketAddr的future。
实现future的步骤是:
- 调用
resolve以获取ResolveFuture实例。 - 调用
ResolveFuture :: poll直到它返回一个SocketAddr。 - 将
SocketAddr传递给TcpStream :: connect。 - 调用
ConnectFuture :: poll直到它返回TcpStream。 - 使用
TcpStream完成外部future。
我们将使用枚举来跟踪future的状态.
enum State {
// Currently resolving the host name
Resolving(ResolveFuture),
// Establishing a TCP connection to the remote host
Connecting(ConnectFuture),
}
ResolveAndConnect的future定义为:
pub struct ResolveAndConnect {
state: State,
}
pub fn resolve_and_connect(host: &str) -> ResolveAndConnect {
let state = State::Resolving(resolve(host));
ResolveAndConnect { state }
}
impl Future for ResolveAndConnect {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<TcpStream>, io::Error> {
use self::State::*;
loop {
let addr = match self.state {
Resolving(ref mut fut) => {
try_ready!(fut.poll())
}
Connecting(ref mut fut) => {
return fut.poll();
}
};
let connecting = TcpStream::connect(&addr);
self.state = Connecting(connecting);
}
}
}
这解释了Future如何实现状态机。 这个future可以是两种状态中的任何一种:
ResolvingConnecting
每次调用poll时,我们都会尝试将状态机推进到下一个状态。
现在,我们刚刚实现的future基本上是AndThen,所以我们可能只是使用该组合器而不是重新实现它。
resolve(my_host)
.and_then(|addr| TcpStream::connect(&addr))