Futures
在指南早期暗示的future
是用于管理异步逻辑的构建块。 它们是Tokio使用的底层异步抽象。
future
的实施由future
crate
提供。 但是,为方便起见,Tokio重新导出了许多类型。
Futures是什么
future是表示异步计算完成的值。通常,由于系统中某处发生的事件使future
完成。虽然我们从基本I / O的角度看待事物,但您可以使用future
来表示各种事件,例如:
在线程池中执行的数据库查询。当数据库查询完成时,
future
完成,其值是查询的结果。对服务器的RPC 调用。当服务器回复时,
future
完成,其值是服务器的响应。超时事件。当时间到了,
future
就完成了,它的值是()。在线程池上运行的长时间运行的CPU密集型任务。任务完成后,
future
完成,其值为任务的返回值。从套接字读取字节。当字节准备就绪时,
future
就完成了 - 根据缓冲策略,字节可能直接返回,或作为副作用写入某个现有缓冲区。
future
抽象的整个要点是允许异步函数,即不能立即返回值的函数,将会返回一些东西。
例如,异步HTTP客户端可以提供如下所示的get函数:
pub fn get(&self, uri: &str) -> ResponseFuture { ... }
然后,库的用户将使用该函数:
let response_future = client.get("https://www.example.com");
现在,response_future不是实际响应。这是个一旦收到响应,就会完成的future
。 但是,调用者要具有一个具体的东西(future
)使他们可以开始使用它。 例如,它们可以链式计算在接收到响应时执行,或者它们可能将future
传递给函数。
let response_is_ok = response_future
.map(|response| {
response.status().is_ok()
});
track_response_success(response_is_ok);
所有与future
一起采取的行动都不会立即执行任何工作。他们不能,因为他们没有实际的HTTP响应。相反,他们定义了响应future
完成时要完成的工作。
future
crate
和Tokio都有一系列组合功能,可以用来处理future
。
future
实现使用Tokio时,实现Future是很常见的,因此适应它是很重要的。
如前一节所述,Rustfuture
是基于轮询的。这是Rustfuture
库的一个独特方面。其他编程语言的大多数future
库使用基于推送的模型,其中回调被提供给future
,并且计算立即调用计算结果回调。
使用基于轮询的模型提供了许多优点,包括作为零成本抽象,即,与手动编写异步代码相比,使用Rustfuture
没有额外的开销。
future
的特点如下:
trait Future {
/// The type of the value returned when the future completes.
type Item;
/// The type representing errors that occured while processing the
/// computation.
type Error;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}
您可能会注意到这与用于实现异步任务的trait
完全相同。 这是因为一旦计算完成,异步任务就是“正好”的future
,其值为()。
通常,当您实现Future时,您将定义一个由子(或内部)future
组成的计算。 在这种情况下,future
的实现会尝试调用内部future
,如果内部future
未准备好,则返回NotReady。
以下示例是由另一个返回usize
并将使该值加倍的future
组成的future
:
pub struct Doubler<T> {
inner: T,
}
pub fn double<T>(inner: T) -> Doubler<T> {
Doubler { inner }
}
impl<T> Future for Doubler<T>
where T: Future<Item = usize>
{
type Item = usize;
type Error = T::Error;
fn poll(&mut self) -> Result<Async<usize>, T::Error> {
match self.inner.poll()? {
Async::Ready(v) => Ok(Async::Ready(v * 2)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
当Doublerfuture
被轮询时,它会调查其内在的future
。 如果内部future
尚未准备好,Doubler future将返回NotReady。 如果内心的future
已经准备就绪,那么Doubler的future
会使返回值加倍并返回Ready。
因为上面的匹配模式很常见,所以future
crate
提供了一个宏:try_ready!。 它类似于try!
或?
,但它也返回NotReady。 上面的poll函数可以使用try_ready重写! 如下:
fn poll(&mut self) -> Result<Async<usize>, T::Error> {
let v = try_ready!(self.inner.poll());
Ok(Async::Ready(v * 2))
}
返回NotReady
当一个任务返回NotReady时,一旦它转换到就绪状态,执行者就会被通知。这使执行者能够有效地调度任务。
当函数返回Async :: NotReady时,在状态转换为“就绪”时通知执行程序至关重要。否则,任务将无限挂起,永远不会再次运行。
对于大多数future
的实现,这是可传递的。当future
实施是子future
的组合时,当至少一个内部future
返回NotReady时,外部future
仅返回NotReady。因此,一旦内部future
转变为就绪状态,外部future
将转变为就绪状态。在这种情况下,NotReady合约已经满足,因为内部future
将在准备就绪时通知执行者。
最内层的future
,有时也被称为“资源”,是负责通知执行人的人。这是通过对task :: current()
返回的任务调用notify
来完成的。
在执行者调用任务轮询之前,它将任务上下文设置为线程局部变量。然后,最内部的future
从线程本地访问上下文,以便一旦其准备状态改变就能够通知任务。
我们将在后面的部分中更深入地探索实施资源和任务系统。除非你从内部的future
获得NotReady,否则这里的关键是不要返回NotReady
future
一个更复杂的让我们看一下稍微复杂的future
实现。 在这种情况下,我们将实现一个取得主机名,进行DNS解析,然后建立与远程主机的连接的future
。 我们假设存在一个如下所示的resolve函数:
pub fn resolve(host: &str) -> ResolveFuture;
其中ResolveFuture
是一个返回SocketAddr
的future
。
实现future
的步骤是:
- 调用
resolve
以获取ResolveFuture
实例。 - 调用
ResolveFuture :: poll
直到它返回一个SocketAddr
。 - 将
SocketAddr
传递给TcpStream :: connect
。 - 调用
ConnectFuture :: poll
直到它返回TcpStream
。 - 使用
TcpStream
完成外部future
。
我们将使用枚举来跟踪future
的状态.
enum State {
// Currently resolving the host name
Resolving(ResolveFuture),
// Establishing a TCP connection to the remote host
Connecting(ConnectFuture),
}
ResolveAndConnect
的future
定义为:
pub struct ResolveAndConnect {
state: State,
}
pub fn resolve_and_connect(host: &str) -> ResolveAndConnect {
let state = State::Resolving(resolve(host));
ResolveAndConnect { state }
}
impl Future for ResolveAndConnect {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<TcpStream>, io::Error> {
use self::State::*;
loop {
let addr = match self.state {
Resolving(ref mut fut) => {
try_ready!(fut.poll())
}
Connecting(ref mut fut) => {
return fut.poll();
}
};
let connecting = TcpStream::connect(&addr);
self.state = Connecting(connecting);
}
}
}
这解释了Future如何实现状态机。 这个future
可以是两种状态中的任何一种:
Resolving
Connecting
每次调用poll时,我们都会尝试将状态机推进到下一个状态。
现在,我们刚刚实现的future
基本上是AndThen,所以我们可能只是使用该组合器而不是重新实现它。
resolve(my_host)
.and_then(|addr| TcpStream::connect(&addr))