示例:Echo服务器
我们将使用到目前为止所覆盖的内容来构建echo服务器。这是一个Tokio应用程序,它包含了我们迄今为止学到的所有内容。服务器将简单地从连接的客户端接收消息,并将收到的相同消息发送回客户端。
我们将能够使用我们在hello world
部分中创建的基本Tcp客户端来测试此echo服务器 。
完整的代码可以在这里找到。
创建
首先,生成一个新的箱子。
$ cargo new --bin echo-server
cd echo-server
接下来,添加必要的依赖项:
[dependencies]
tokio = "0.1"
main.rs
extern crate tokio;
extern crate futures;
use tokio::io;
use tokio::net::TcpListener;
use tokio::prelude::*;
现在,我们为服务器设置必要的结构:
- 绑定
TcpListener
到本地端口。 - 定义接受入站连接并处理它们的任务。
- 生成服务器任务。
- 启动Tokio运行时
同样,在执行者上生成服务器任务之前,实际上不会执行任何工作。
fn main() {
let addr = "127.0.0.1:6142".parse().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
// Here we convert the `TcpListener` to a stream of incoming connections
// with the `incoming` method. We then define how to process each element in
// the stream with the `for_each` combinator method
let server = listener.incoming().for_each(|socket| {
// TODO: Process socket
Ok(())
})
.map_err(|err| {
// Handle error by printing to STDOUT.
println!("accept error = {:?}", err);
});
println!("server running on localhost:6142");
# // `select` completes when the first of the two futures completes. Since
# // future::ok() completes immediately, the server won't hang waiting for
# // more connections. This is just so the doc test doesn't hang.
# let server = server.select(futures::future::ok(())).then(|_| Ok(()));
// Start the server
//
// This does a few things:
//
// * Start the Tokio runtime
// * Spawns the `server` task onto the runtime.
// * Blocks the current thread until the runtime becomes idle, i.e. all
// spawned tasks have completed.
tokio::run(server);
}
在这里,我们创建了一个可以侦听传入TCP连接的TcpListener
。在监听器上, 我们调用incoming,将监听器转换为入站客户端连接流。然后我们调用for_each,它将产生每个入站客户端连接。 目前我们没有对此入站连接做任何事情 - 这是我们的下一步。
一旦我们拥有了我们的服务器,我们就可以将它交给tokio::run
。到目前为止,我们的服务器功能一无所获。由Tokio运行时驱动我们的Future
完成。
注意:我们必须在服务器上调用map_err
,因为tokio :: run
需要一个Item
为type()和Error
为type()的Future
。 这是为了确保在将Future
交付给运行时之前处理所有值和错误。
处理连接
既然我们有传入的客户端连接,我们应该处理它们。
我们只想将从套接字读取的所有数据复制回套接字本身(例如“echo”)。 我们可以使用标准的io :: copy
函数来做到这一点。
该copy函数有两个参数,从哪里读取以及在哪里写入。 但是,我们只有一个参数,使用socket
。 幸运的是,有一个方法split,它将可读和可写的流分成两半。 此操作允许我们独立地处理每个流,例如将它们作为copy
函数的两个参数传递。
然后,copy
函数返回一个Future
,当复制操作完成时,将接收此Future
,解析为复制的数据量。
让我们来看看我们再次传递给for_each
的闭包。
let server = listener.incoming().for_each(|socket| {
// split the socket stream into readable and writable parts
let (reader, writer) = socket.split();
// copy bytes from the reader into the writer
let amount = io::copy(reader, writer);
let msg = amount.then(|result| {
match result {
Ok((amount, _, _)) => println!("wrote {} bytes", amount),
Err(e) => println!("error: {}", e),
}
Ok(())
});
// spawn the task that handles the client connection socket on to the
// tokio runtime. This means each client connection will be handled
// concurrently
tokio::spawn(msg);
Ok(())
})
如您所见,我们已将socket
流拆分为可读写部分。 然后我们使用io :: copy
从reader
读取并写入writer
。 我们使用then
组合器来查看amount
未来的Item
和Error
作为Result
打印一些诊断。
对tokio::spawn的调用是关键所在。 至关重要的是我们希望所有clients
同时取得进展,而不是在完成另一个client
时阻止其中一个。 为此,我们使用tokio :: spawn
函数在后台执行工作。
如果我们没有这样做,那么for_each
中块的每次调用都会在一次解决,这意味着我们永远不会同时处理两个客户端连接!