读写数据

非阻塞I/O

概述我们简单提到的是tokio的I/O类型实现了无阻塞异步版std::io::Readstd::io::Write,名为 AsyncReadAsyncWrite。这些是Tokio的I/O中不可或缺的一部分,在使用I/O代码时,理解这些东东非常重要。

注意:在本节中,我们将主要讨论AsyncReadAsyncWrite几乎完全相同,只是将数据写入I/O资源(如TCP套接字)而不是从中读取。

好,让我们来看看,AsyncRead能干什么:

use std::io::Read;
pub trait AsyncRead: Read {
    // ...
    // various provided methods
    // ...
}

嗯,这都说了些啥?嗯,AsyncRead只是继承了std::io中的Read,以及一份额外的契约。AsyncRead文档中提到:

此trait继承自std::io::Read并表明I/O对象是非阻塞的。当无可用数据时,所有非阻塞I/O对象都必须返回一个error而不是阻塞当前线程。

最后一部分至关重要。如果你为一个类型实现AsyncRead,你得保证调用read而不会阻塞。相反,如果它不是非阻塞的,则应该返回io::ErrorKind::WouldBlock错误以表明操作将被阻塞(例如因为没有可用的数据)。poll_read方法依赖于此:

fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
    match self.read(buf) {
        Ok(t) => Ok(Async::Ready(t)),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
            Ok(Async::NotReady)
        }
        Err(e) => Err(e),
    }
}

这段代码应该很熟悉。如果仔细看一眼,poll_read 看起来很像Future::poll。那是因为它基本上就是!一个实现AsyncRead的类型本质上就是一个可以尝试从中读取数据的Future,它将通知你是否Ready(某些数据已被读取)或NotReady(你需要再次poll_read)。

使用I/O Future

由于AsyncRead(和AsyncWrite)几乎都是Futures,你可以很容易地将它们嵌入到你自己的Futures中,poll_read它们,就像poll任何其他嵌入Future一样。您甚至可以根据需要使用try_ready!, 这个micro可以传递error和NotReady状态。我们将在下一节中更多地讨论如何直接使用这些traits。但是,在许多情况下,为了简化,Tokio在tokio::io中提供了许多有用的组合器combinator,用于在AsyncReadAsyncWrite之上执行常见的I/O操作。通常,它们封装了AsyncReadAsyncWrite类型,实现了Future,并在给定的读或写操作完成时完成。

第一个好用的I/O组合器是read_exact。它需要一个可变的buffer(&mut [u8])和AsyncRead的实现作为参数,并返回一个Future读取足够的字节来填充buffer。在内部,返回的future只是跟踪它已经读取了多少字节,并在AsyncRead时继续发出poll_ready (如果需要,返回NotReady),直到它正好填满了buffer。那时,它返回带着填满的buffer的Ready(buf)。让我们来看看:

use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

let addr = "127.0.0.1:12345".parse().unwrap();
let read_8_fut = TcpStream::connect(&addr)
    .and_then(|stream| {
        // We need to create a buffer for read_exact to write into.
        // A Vec<u8> is a good starting point.
        // read_exact will read buffer.len() bytes, so we need
        // to make sure the Vec isn't empty!
        let mut buf = vec![0; 8];

        // read_exact returns a Future that resolves when
        // buffer.len() bytes have been read from stream.
        tokio::io::read_exact(stream, buf)
    })
    .inspect(|(_stream, buf)| {
        // Notice that we get both the buffer and the stream back
        // here, so that we can now continue using the stream to
        // send a reply for example.
        println!("got eight bytes: {:x?}", buf);
    });

// We can now either chain more futures onto read_8_fut,
// or if all we wanted to do was read and print those 8
// bytes, we can just use tokio::run to run it (taking
// care to map Future::Item and Future::Error to ()).

第二个常用的I/O组合器是write_all。它需要一个buffer(&[u8])和一个AsyncWrite的实现作为参数,并返回一个将缓冲区的所有字节用poll_write写入AsyncWrite的future。当Future被resolve,整个缓冲区已经写完并被刷新。我们可以结合 read_exact使用,来echo服务器的任何内容:

use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

let echo_fut = TcpStream::connect(&addr)
    .and_then(|stream| {
        // We're going to read the first 32 bytes the server sends us
        // and then just echo them back:
        let mut buf = vec![0; 32];
        // First, we need to read the server's message
        tokio::io::read_exact(stream, buf)
    })
    .and_then(|(stream, buf)| {
        // Then, we use write_all to write the entire buffer back:
        tokio::io::write_all(stream, buf)
    })
    .inspect(|(_stream, buf)| {
        println!("echoed back {} bytes: {:x?}", buf.len(), buf);
    });

// As before, we can chain more futures onto echo_fut,
// or declare ourselves finished and run it with tokio::run.

Tokio还带有一个I/O组合器来实现上面例子中这种复制。它(或许不足为奇)被称为copycopy接受一个AsyncRead和一个AsyncWrite,连续地将从中AsyncRead读出的所有字节,写入到AsyncWrite,直到 poll_read指示输入已经关闭并且所有字节都已写出并刷新到输出。这是我们在echo服务器中使用的组合器!它大大简化了我们上面的示例,并使其适用于任何量级的服务器数据!

use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

let echo_fut = TcpStream::connect(&addr)
    .and_then(|stream| {
        // First, we need to get a separate read and write handle for
        // the connection so that we can forward one to the other.
        // See "Split I/O resources" below for more details.
        let (reader, writer) = stream.split();
        // Then, we can use copy to send all the read bytes to the
        // writer, and return how many bytes it read/wrote.
        tokio::io::copy(reader, writer)
    })
    .inspect(|(bytes_copied, r, w)| {
        println!("echoed back {} bytes", bytes_copied);
    });

简约!

到目前为止我们谈到的组合器都是用于相当底层的操作:读取字节,写入字节,复制字节。但是,通常情况下,您希望在更高级别的表述上操作,例如“lines”。这些Tokio也帮你搞定了!lines接受一个 AsyncRead,并返回一个Stream,从输入中产生yield每一行,直到没有更多行要读:

use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

let lines_fut = TcpStream::connect(&addr).and_then(|stream| {
    // We want to parse out each line we receive on stream.
    // To do that, we may need to buffer input for a little while
    // (if the server sends two lines in one packet for example).
    // Because of that, lines requires that the AsyncRead it is
    // given *also* implements BufRead. This may be familiar if
    // you've ever used the lines() method from std::io::BufRead.
    // Luckily, BufReader from the standard library gives us that!
    let stream = std::io::BufReader::new(stream);
    tokio::io::lines(stream).for_each(|line| {
        println!("server sent us the line: {}", line);
        // This closure is called for each line we receive,
        // and returns a Future that represents the work we
        // want to do before accepting the next line.
        // In this case, we just wanted to print, so we
        // don't need to do anything more.
        Ok(())
    })
});

tokio::io中,还有更多的I/O组合器,在你决定自己写一个之前,不妨先看一下是否已经有了实现!

拆分I/O资源

上面的copy和echo服务器例子中都包含下面这个神秘的代码片段:

let (reader, writer) = socket.split();
let bytes_copied = tokio::io::copy(reader, writer);

正如上面的注释所解释的那样,我们将TcpStreamsocket)拆分为读半部分和写半部分,并使用我们上面讨论的copy组合器产生一个Future异步复制从读半部分到写半部分的所有数据。但究竟为什么需要这种“split”呢?毕竟,AsyncRead::poll_readAsyncWrite::poll_write只接受参数&mut self

要回答这个问题,我们需要回顾一下Rust的Ownership机制。回想一下,Rust只允许你在任一时刻对一给定变量拥有单个可变引用。但是我们必须传递两个 参数给copy,一个用于从哪里读,另一个指定写到哪里。但是,一旦我们将一个可变引用作为其中一个参数传递给TcpStream,我们就不能构造第二个指向它的可变引用作为第二个参数传递给它!我们知道,copy 将不能同时读取和写入到其参数,但是这并没有在copy的类型定义中表示出来。

进入split方法,一个AsyncRead中提供的方法也实现了AsyncWrite。如果我们看一下方法签名,我们就会看到:

fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
  where Self: AsyncWrite { ... }

返回的ReadHalf实现了AsyncReadWriteHalf 实现了AsyncWrite。至关重要的是,我们现在有两个独立的 指针在我们的类型中,我们可以单独传递它们。这很方便copy,但这也意味着我们可以将每一传递到不同的Future,并完全独立地处理读写操作!在幕后,split确保如果我们同时尝试读写,一次只发生其中一个。

传输

在I/O应用中,将一个AsyncRead转换为Stream(像lines这样)或 将一个AsyncWrite转化为Sink相当普遍。他们经常想要把从网络上读、写字节的方式进行抽象,并让大多数应用程序代码处理更方便的“request”和“response”类型。这通常被称为“framing”:您可以将它们视为接收和发送的应用程序数据的“帧”,而不仅仅视您的连接视为字节进、字节出。帧化的字节流通常被称为“传输”。

传输通常使用编解码器codec实现。例如, lines表述了一个非常简单的编解码器,用换行符\n分割字节字符串,并在将其传递给应用程序之前,将每个帧作为字符串解析。Tokio在tokio::codec中提供了helpers来帮助实现新的编解码器; 你为你的传输实现了EncoderDecoder traits,并用Framed::new从你的字节流中创建一个Sink + Stream(比如一个TcpStream)。这几乎就像魔术一样!有些编解码器只用读或写端(比如lines)。让我们来看一下编写基于行的编解码器的简单实现(即使LinesCodec 已经存在):

extern crate bytes;
use bytes::{BufMut, BytesMut};
use tokio::codec::{Decoder, Encoder};
use tokio::prelude::*;

// This is where we'd keep track of any extra book-keeping information
// our transport needs to operate.
struct LinesCodec;

// Turns string errors into std::io::Error
fn bad_utf8<E>(_: E) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::InvalidData, "Unable to decode input as UTF8")
}

// First, we implement encoding, because it's so straightforward.
// Just write out the bytes of the string followed by a newline!
// Easy-peasy.
impl Encoder for LinesCodec {
    type Item = String;
    type Error = std::io::Error;

    fn encode(&mut self, line: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
        // Note that we're given a BytesMut here to write into.
        // BytesMut comes from the bytes crate, and aims to give
        // efficient read/write access to a buffer. To use it,
        // we have to reserve memory before we try to write to it.
        buf.reserve(line.len() + 1);
        // And now, we write out our stuff!
        buf.put(line);
        buf.put_u8(b'\n');
        Ok(())
    }
}

// The decoding is a little trickier, because we need to look for
// newline characters. We also need to handle *two* cases: the "normal"
// case where we're just asked to find the next string in a bunch of
// bytes, and the "end" case where the input has ended, and we need
// to find any remaining strings (the last of which may not end with a
// newline!
impl Decoder for LinesCodec {
    type Item = String;
    type Error = std::io::Error;

    // Find the next line in buf!
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Ok(if let Some(offset) = buf.iter().position(|b| *b == b'\n') {
            // We found a newline character in this buffer!
            // Cut out the line from the buffer so we don't return it again.
            let mut line = buf.split_to(offset + 1);
            // And then parse it as UTF-8
            Some(
                std::str::from_utf8(&line[..line.len() - 1])
                    .map_err(bad_utf8)?
                    .to_string(),
            )
        } else {
            // There are no newlines in this buffer, so no lines to speak of.
            // Tokio will make sure to call this again when we have more bytes.
            None
        })
    }

    // Find the next line in buf when there will be no more data coming.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Ok(match self.decode(buf)? {
            Some(frame) => {
                // There's a regular line here, so we may as well just return that.
                Some(frame)
            },
            None => {
                // There are no more lines in buf!
                // We know there are no more bytes coming though,
                // so we just return the remainder, if any.
                if buf.is_empty() {
                    None
                } else {
                    Some(
                        std::str::from_utf8(&buf.take()[..])
                            .map_err(bad_utf8)?
                            .to_string(),
                    )
                }
            }
        })
    }
}
上次更新: 3/5/2019, 1:18:59 AM