I’m excited about Tokio. It’s written by some well-known Rust developers and most likely it will become the de facto asynchronous library of Rust.
In Tokio, creating services is easy. First you specify the protocol that the service speaks: provide concrete implementations of Encoder and Decoder, and specify a ServerProto (the actual protocol). Then you tell the service how each request should be handled by implementing Service. More details here.
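To make the Encoder/Decoder step more concrete, here is a minimal, std-only sketch of the job a codec has to do: turn a message into bytes, and turn a byte buffer back into messages while tolerating partial frames. It does not implement Tokio's actual traits. The Msg variants, the wire format (4-byte big-endian length, 1-byte tag, body) and the function names encode and decode are all assumptions made for illustration.

```rust
use std::convert::TryInto;
use std::io;

/// Hypothetical message type, not the `Msg` used in the post.
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    Ping(u64),
    Text(String),
}

/// Append one frame to `buf`: a 4-byte big-endian payload length,
/// then a 1-byte tag, then the body. (Assumed wire format.)
pub fn encode(msg: &Msg, buf: &mut Vec<u8>) {
    let (tag, body): (u8, Vec<u8>) = match msg {
        Msg::Ping(id) => (0, id.to_be_bytes().to_vec()),
        Msg::Text(s) => (1, s.as_bytes().to_vec()),
    };
    let len = (1 + body.len()) as u32;
    buf.extend_from_slice(&len.to_be_bytes());
    buf.push(tag);
    buf.extend_from_slice(&body);
}

/// Try to remove one complete frame from the front of `buf`.
/// Returns `Ok(None)` when more bytes are needed, which is how a
/// streaming decoder typically signals "not ready yet".
pub fn decode(buf: &mut Vec<u8>) -> io::Result<Option<Msg>> {
    if buf.len() < 4 {
        return Ok(None);
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if len == 0 {
        return Err(invalid("empty frame"));
    }
    if buf.len() < 4 + len {
        return Ok(None);
    }
    // Remove the whole frame (header + payload) from the buffer.
    let frame: Vec<u8> = buf.drain(..4 + len).collect();
    let (tag, body) = (frame[4], &frame[5..]);
    match tag {
        0 => {
            let bytes: [u8; 8] = body
                .try_into()
                .map_err(|_| invalid("bad ping length"))?;
            Ok(Some(Msg::Ping(u64::from_be_bytes(bytes))))
        }
        1 => String::from_utf8(body.to_vec())
            .map(|s| Some(Msg::Text(s)))
            .map_err(|_| invalid("text is not UTF-8")),
        _ => Err(invalid("unknown tag")),
    }
}

fn invalid(reason: &'static str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, reason)
}
```

Calling decode repeatedly on a buffer that holds several frames yields one message per call, and then Ok(None) once only a partial frame (or nothing) is left.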
This model works well for client-server applications. P2P applications need something different, because the server (Service) cannot easily initiate connections. The solution (at least my solution) is to forget about tokio-service (the crate with the Service trait), go one level lower, and use tokio-core, which sits one layer below in the Tokio stack. The official documentation explains the difference between those crates pretty well.
I’m going to assume familiarity with Rust and Tokio in describing my solution. The trick is to run one “server” and multiple “clients” on each node. I’m using quotation marks because these are only a server and a client from the perspective of a TCP connection, i.e. the node that receives the first SYN is the server and the node that sends it is the client. Once the connection is established, the “server” and the “clients” all handle incoming messages the same way.
My “server” implementation is here and looks like this.
    pub fn serve(node: Rc<RefCell<Node>>, handle: Handle)
                 -> Box<Future<Item=(), Error=io::Error>> {
        let socket = TcpListener::bind(&node.borrow().addr, &handle).unwrap();
        let srv = socket.incoming().for_each(move |(tcpstream, addr)| {
            let (sink, stream) = tcpstream.framed(MsgCodec).split();
            let (tx, rx) = mpsc::unbounded();

            // process the incoming stream
            let node2 = node.clone();
            let read = stream.for_each(move |msg| {
                node2.borrow_mut().process(msg, tx.clone())
            });
            handle.spawn(read.then(|_| Ok(())));

            // send everything in rx to sink
            let write = sink.send_all(rx.map_err(|()| {
                io::Error::new(io::ErrorKind::Other, "rx shouldn't have an error")
            }));
            handle.spawn(write.then(|_| Ok(())));

            Ok(())
        });
        Box::new(srv)
    }
The corresponding “client” implementation is here.
    pub fn start_client(node: Rc<RefCell<Node>>, handle: Handle, addr: &SocketAddr)
                        -> Box<Future<Item=(), Error=io::Error>> {
        println!("starting client {}", addr);
        let client = TcpStream::connect(addr, &handle).and_then(move |socket| {
            let (sink, stream) = socket.framed(MsgCodec).split();
            let (tx, rx) = mpsc::unbounded();

            // process the incoming stream
            let node2 = node.clone();
            let tx2 = tx.clone();
            let read = stream.for_each(move |msg| {
                node.borrow_mut().process(msg, tx.clone())
            });
            handle.spawn(read.then(|_| Ok(())));

            // client sends ping on start
            mpsc::UnboundedSender::send(&tx2, Msg::Ping((node2.borrow().id, node2.borrow().addr.clone())))
                .expect("tx failed");

            // send everything in rx to sink
            let write = sink.send_all(rx.map_err(|()| {
                io::Error::new(io::ErrorKind::Other, "rx shouldn't have an error")
            }));
            handle.spawn(write.then(|_| Ok(())));

            Ok(())
        });
        Box::new(client)
    }
Note the similarity between the two: the part where the message is actually handled is the same in both, namely node.borrow_mut().process(msg, tx.clone()).
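For readers wondering what a handler with that shape might contain, here is a minimal sketch using only the standard library. Everything in it is an assumption rather than my actual code: it uses std::sync::mpsc in place of the futures channel, a simplified Ping that carries only an id, a hypothetical Pong reply, a peers map on Node, and an io::Result<()> return type.

```rust
use std::collections::HashMap;
use std::io;
use std::sync::mpsc::Sender;

/// Simplified, hypothetical messages.
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    Ping(u64), // id of the node that sent the ping
    Pong(u64), // assumed reply, carrying the responder's id
}

/// Hypothetical stand-in for `Node`, keeping one sender per known peer.
pub struct Node {
    pub id: u64,
    pub peers: HashMap<u64, Sender<Msg>>,
}

impl Node {
    /// Handle one message that arrived on a connection whose outgoing
    /// channel is `tx`. The same function serves both roles, since the
    /// "server" and the "client" each pass in their own `tx`.
    pub fn process(&mut self, msg: Msg, tx: Sender<Msg>) -> io::Result<()> {
        match msg {
            Msg::Ping(peer_id) => {
                // Reply on the same connection, then remember the peer.
                tx.send(Msg::Pong(self.id)).map_err(|_| closed())?;
                self.peers.insert(peer_id, tx);
            }
            Msg::Pong(peer_id) => {
                // The peer answered our ping; remember how to reach it.
                self.peers.insert(peer_id, tx);
            }
        }
        Ok(())
    }
}

fn closed() -> io::Error {
    io::Error::new(io::ErrorKind::BrokenPipe, "peer channel closed")
}
```

Because the handler receives the connection's tx, it can both answer on that connection and store the sender for later use, regardless of which side opened the TCP connection.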
I used channels, namely mpsc::unbounded(), to send messages to peers. Each connected peer has its own channel, including the node itself. The producer halves (tx) are stored in the main struct Node, and the consumer halves are drained into the TCP sinks, e.g. here. To broadcast a message to all the peers, I can simply iterate over all the tx handles and send the message into each one.
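The channel-per-peer idea can be sketched without Tokio at all. The following is a minimal illustration, not the code from my repository: it swaps in std::sync::mpsc for the futures channel, and the Msg variants, the numeric peer ids and the method names add_peer, send_to and broadcast are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical message type for the sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    Ping(u64),
    Text(String),
}

/// Hypothetical stand-in for `Node`: it only tracks one sender per
/// connected peer, keyed by an assumed numeric peer id.
pub struct Node {
    pub id: u64,
    peers: HashMap<u64, Sender<Msg>>,
}

impl Node {
    pub fn new(id: u64) -> Node {
        Node { id, peers: HashMap::new() }
    }

    /// Register a peer and hand back the receiving half, which the
    /// connection code would forward into that peer's TCP sink.
    pub fn add_peer(&mut self, peer_id: u64) -> Receiver<Msg> {
        let (tx, rx) = channel();
        self.peers.insert(peer_id, tx);
        rx
    }

    /// Send to a single peer; returns false if the peer is unknown
    /// or its receiving half has been dropped.
    pub fn send_to(&self, peer_id: u64, msg: Msg) -> bool {
        match self.peers.get(&peer_id) {
            Some(tx) => tx.send(msg).is_ok(),
            None => false,
        }
    }

    /// Broadcast by iterating over every stored sender. Peers whose
    /// receiver is gone are removed along the way. Returns the number
    /// of peers that accepted the message.
    pub fn broadcast(&mut self, msg: &Msg) -> usize {
        self.peers.retain(|_, tx| tx.send(msg.clone()).is_ok());
        self.peers.len()
    }
}
```

Pruning dead senders during a broadcast is a design choice of this sketch; it keeps the peer map from accumulating entries for connections that have already closed.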
The key thing I’ve achieved so far is the ability to send messages to, and receive messages from, any or all connected peers. To the best of my limited knowledge, this is not possible with Service. With those primitives, it’s possible to build any toy P2P system just by modifying the message handler Node::process. In the second part I will cover peer discovery using gossiping and bootstrapping, which are commonly found in P2P networks.