Building a P2P Chat Application with Rust and Iroh
This tutorial demonstrates how to build a peer-to-peer chat application from scratch using Rust and the Iroh library. While this implementation is simplified, it illustrates core concepts of P2P networking and the Iroh gossip protocol.
The code in the above video differs slightly from the code presented below. We recommend watching the video and following along until you get comfortable. When you are ready for a deeper dive into the code, walk through this tutorial.
Prerequisites
The tutorial assumes basic programming knowledge but no prior Rust experience. To begin, install Rust by following the instructions at rust-lang.org.
Project Setup
First, initialize a new Rust project:
cargo init iroh-gossip-chat
cd iroh-gossip-chat
cargo run
This should print "Hello, world!". You can see the code for that in the src/main.rs file, which is the file we will build on throughout this tutorial.
Next, let's add the iroh dependencies, along with a few others we need to get the chat application working:
tokio handles our async runtime, allowing us to do things like listen for incoming messages at the same time as we write outgoing messages.
anyhow allows us to more easily handle errors, and is especially useful when writing binaries or when prototyping.
rand generates randomness.
Install the required dependencies:
cargo add iroh tokio anyhow rand
Basic Endpoint Configuration
The first step is creating a basic endpoint configuration. Replace the current contents of the src/main.rs file with the following:
use anyhow::Result;
use iroh::{Endpoint, SecretKey};
#[tokio::main]
async fn main() -> Result<()> {
// Generate a secret key. This is the source of
// identity for your node. If you want to have
// the same identity each time you open the app,
// you would need to store and load it each time.
let secret_key = SecretKey::generate(rand::rngs::OsRng);
// Create an endpoint.
let endpoint = Endpoint::builder()
// Pass in your secret key. If you don't pass
// in a secret key a new one will be generated
// for you each time.
.secret_key(secret_key)
// Enable n0 discovery. This allows you to
// dial by `NodeId`, and allows you to be
// dialed by `NodeId`.
.discovery_n0()
// Bind the endpoint to the socket.
.bind()
.await?;
println!("> our node id: {}", endpoint.node_id());
Ok(())
}
Adding Gossip Protocol Support
The iroh-gossip protocol is what we will use not only to send messages, but also to coordinate who we are connected to in our chat application.
Install the gossip protocol:
cargo add iroh-gossip
Then update the code to implement basic gossip functionality:
use anyhow::Result;
use iroh::protocol::Router;
use iroh::Endpoint;
use iroh_gossip::net::Gossip;
#[tokio::main]
async fn main() -> Result<()> {
// We've removed the `SecretKey::generate` method.
// The `Endpoint` will generate a `SecretKey` for
// you under the hood if you don't supply one.
let endpoint = Endpoint::builder()
.discovery_n0()
.bind()
.await?;
println!("> our node id: {}", endpoint.node_id());
// Build an instance of the gossip protocol
// and add a clone of the endpoint we have built.
// The gossip protocol will use the endpoint to
// make connections.
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
// The Router is how we manage protocols on top
// of the iroh endpoint. It handles all incoming
// messages and routes them to the correct
// protocol.
let router = Router::builder(endpoint.clone())
.accept(iroh_gossip::ALPN, gossip.clone())
.spawn()
.await?;
// Cleanly shut down the router.
router.shutdown().await?;
Ok(())
}
Creating and Broadcasting to a Topic
Topics are the fundamental unit of communication in the gossip protocol. Here's how to create a topic and broadcast a message:
use anyhow::Result;
use iroh::protocol::Router;
use iroh::Endpoint;
use iroh_gossip::{net::Gossip, proto::TopicId};
#[tokio::main]
async fn main() -> Result<()> {
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
println!("> our node id: {}", endpoint.node_id());
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
let router = Router::builder(endpoint.clone())
.accept(iroh_gossip::ALPN, gossip.clone())
.spawn()
.await?;
// Create a new topic.
let id = TopicId::from_bytes(rand::random());
let node_ids = vec![];
// Subscribe to the topic.
// Since the `node_ids` list is empty, we will
// subscribe to the topic, but not attempt to
// connect to any other nodes.
let topic = gossip.subscribe(id, node_ids)?;
// `split` splits the topic into the `GossipSender`
// and `GossipReceiver` portions
let (sender, _receiver) = topic.split();
// Broadcast a message to the topic.
// Since no one else is a part of this topic,
// this message is currently going out to no one.
sender.broadcast("sup".into()).await?;
router.shutdown().await?;
Ok(())
}
Messages
The broadcast method will send bytes over the wire. To keep ourselves organized, we should create a Message enum that enumerates the kinds of messages we want folks to send. We can serialize those messages into bytes for the broadcast method to send.
Let's write a Message::AboutMe enum variant that allows someone who joins the chat to be called by a specific name.
And let's write a Message::Message variant that holds a String with the actual chat message.
Also, we want each of those messages to include the NodeId of the sender. In an actual application, we would sign and verify the messages with keypairs to ensure that everyone who sends a message is actually who they say they are. For more on that, check out our more robust chat example in the iroh-gossip repo.
We need to add crates that will allow us to serialize our new message types as bytes and deserialize bytes as our message type.
serde stands for Serialize/Deserialize.
serde-json lets us easily encode and decode to the JSON format, but we can choose other formats. For example, the iroh-gossip example uses postcard.
bytes is a utility library for working with bytes.
data_encoding, also installed by the command below, provides the base32 encoding we use later for tickets.
cargo add serde --features serde/derive serde-json data_encoding
Then add the following to your main file:
// add these use statements to the top
use iroh::NodeId;
use serde::{Deserialize, Serialize};
...
// add the message code to the bottom
#[derive(Debug, Serialize, Deserialize)]
enum Message {
AboutMe { from: NodeId, name: String },
Message { from: NodeId, text: String },
}
impl Message {
fn from_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(Into::into)
}
pub fn to_vec(&self) -> Vec<u8> {
serde_json::to_vec(self).expect("serde_json::to_vec is infallible")
}
}
Now, rather than broadcasting "sup", let's broadcast a message that lets others in the chat know our name:
// replace:
sender.broadcast("sup".into()).await?;
// with:
// Create an "about me" message
let message = Message::AboutMe {
from: endpoint.node_id(),
name: String::from("alice"),
};
// Turn the message into a `Vec`, and then use
// `into` to convert the `Vec` into `Bytes`
sender.broadcast(message.to_vec().into()).await?;
Implementing Message Reception
To handle incoming messages, we need to iterate over the stream of messages that come in on the receiver of the Topic.
Dealing with streams in Rust is complicated without help from crates designed to simplify them. Knowing the best tools to use can also be complicated, since a given crate may have a good trait implementation for some use cases and a weaker implementation for others. It can also feel a bit yucky to add a big beefy crate to your code when you are only using a very small subset of its contents.
In this application, we are using the StreamExt trait to make handling async streams easier. We've found that the best option right now is the futures-lite crate, which contains a subset of the futures crate.
Install the futures-lite crate to handle async streams:
cargo add futures-lite
Then implement message reception. We are going to use a separate subscribe_loop function to keep the code simpler:
// at the top of the file add these imports:
use std::collections::HashMap;
use iroh_gossip::net::{Event, GossipEvent, GossipReceiver};
use futures_lite::StreamExt;
...
// at the bottom of the file add this function:
/// Handle incoming events
async fn subscribe_loop(mut receiver: GossipReceiver) -> Result<()> {
// keep track of the mapping between `NodeId`s and names
let mut names = HashMap::new();
// iterate over all events
while let Some(event) = receiver.try_next().await? {
// if the Event is a `GossipEvent::Received`, let's deserialize the message:
if let Event::Gossip(GossipEvent::Received(msg)) = event {
// deserialize the message and match on the
// message type:
match Message::from_bytes(&msg.content)? {
Message::AboutMe { from, name } => {
// if it's an `AboutMe` message
// add an entry into the map
// and print the name
names.insert(from, name.clone());
println!("> {} is now known as {}", from.fmt_short(), name);
}
Message::Message { from, text } => {
// if it's a `Message` message,
// get the name from the map
// and print the message
let name = names
.get(&from)
.map_or_else(|| from.fmt_short(), String::to_string);
println!("{}: {}", name, text);
}
}
}
}
Ok(())
}
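The name-tracking logic in subscribe_loop can be tried out without any networking. The following is a minimal sketch using only the standard library. FakeNodeId, ChatMessage, short_id, and render are hypothetical names introduced for illustration; short_id stands in for NodeId::fmt_short, and its exact output format (the first five bytes as hex) is an assumption here.

```rust
use std::collections::HashMap;

/// Stand-in for iroh's `NodeId` (assumption: a plain 32-byte array).
type FakeNodeId = [u8; 32];

/// Simplified stand-in for the tutorial's `Message` enum.
enum ChatMessage {
    AboutMe { from: FakeNodeId, name: String },
    Message { from: FakeNodeId, text: String },
}

/// Hypothetical short form of an id: the first five bytes as lowercase hex.
fn short_id(id: &FakeNodeId) -> String {
    id[..5].iter().map(|b| format!("{b:02x}")).collect()
}

/// Update the name table if needed and return the line the chat would print.
fn render(names: &mut HashMap<FakeNodeId, String>, message: ChatMessage) -> String {
    match message {
        ChatMessage::AboutMe { from, name } => {
            let line = format!("> {} is now known as {}", short_id(&from), name);
            // Remember the name so later messages can use it.
            names.insert(from, name);
            line
        }
        ChatMessage::Message { from, text } => {
            // Fall back to the short id when no name has been announced yet.
            let name = names.get(&from).cloned().unwrap_or_else(|| short_id(&from));
            format!("{name}: {text}")
        }
    }
}
```

A message from a node that has not yet sent an AboutMe is labeled with its short id; once the AboutMe arrives, later messages from that node are labeled with the announced name.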
Let's call this subscribe_loop function in our main function. Eventually, when we add the ability to send messages, we want this subscribe loop to be listening for incoming messages at the same time as we send outgoing messages.
To do that, we are going to call the subscribe_loop function inside a tokio::spawn. That will spawn a task so that the subscribe loop is listening concurrently with our writing loop.
Here are the imports and the main function so far:
use std::collections::HashMap;
use anyhow::Result;
use futures_lite::StreamExt;
use iroh::protocol::Router;
use iroh::{Endpoint, NodeId};
use iroh_gossip::{
net::{Event, Gossip, GossipEvent, GossipReceiver},
proto::TopicId,
};
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() -> Result<()> {
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
println!("> our node id: {}", endpoint.node_id());
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
let router = Router::builder(endpoint.clone())
.accept(iroh_gossip::ALPN, gossip.clone())
.spawn()
.await?;
let id = TopicId::from_bytes(rand::random());
let node_ids = vec![];
let (sender, receiver) = gossip.subscribe(id, node_ids)?.split();
let message = Message::AboutMe {
from: endpoint.node_id(),
name: String::from("alice"),
};
sender.broadcast(message.to_vec().into()).await?;
// subscribe and print loop
tokio::spawn(subscribe_loop(receiver));
router.shutdown().await?;
Ok(())
}
The input loop
Now that we can receive messages, let's code up how to send them.
We are going to write an input_loop that reads from stdin. It takes a Sender that can send Strings on a channel. Each time we get input from stdin, we read it into a String buffer and then send that string over the channel:
/// Read input from stdin
fn input_loop(line_tx: tokio::sync::mpsc::Sender<String>) -> Result<()> {
// create a new string buffer
let mut buffer = String::new();
// get a handle on `Stdin`
let stdin = std::io::stdin(); // We get `Stdin` here.
loop {
// loop through reading from the buffer...
stdin.read_line(&mut buffer)?;
// and then sending over the channel
line_tx.blocking_send(buffer.clone())?;
// clear the buffer after we've sent the content
buffer.clear();
}
}
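One detail worth knowing about the loop above: read_line keeps the trailing newline in the buffer, and at end-of-file it returns Ok(0) without blocking, so a loop that never checks the return value keeps sending empty strings once stdin is closed. The following is a hedged variant that handles both cases. It is a sketch, not the tutorial's code: forward_lines is a hypothetical name, it is generic over any BufRead so it can be exercised without a terminal, and it uses std::sync::mpsc instead of the tokio channel to stay dependency-free.

```rust
use std::io::BufRead;
use std::sync::mpsc::Sender;

/// Read lines from `input` and send each one, without its line ending,
/// over `line_tx`. Returns once the input reaches end-of-file or the
/// receiving side of the channel has been dropped.
fn forward_lines<R: BufRead>(mut input: R, line_tx: Sender<String>) -> std::io::Result<()> {
    let mut buffer = String::new();
    loop {
        buffer.clear();
        // `read_line` returns the number of bytes read; 0 means end-of-file.
        if input.read_line(&mut buffer)? == 0 {
            return Ok(());
        }
        // Strip the trailing "\n" or "\r\n" that `read_line` leaves in place.
        let line = buffer
            .trim_end_matches(|c| c == '\r' || c == '\n')
            .to_string();
        if line_tx.send(line).is_err() {
            // The receiver is gone, so there is nothing left to do.
            return Ok(());
        }
    }
}
```

With real input this would be called as forward_lines(std::io::stdin().lock(), tx). To use the same idea with the tokio channel from this tutorial, the send call would become blocking_send.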
Much like we did for the subscribe_loop, we are going to spawn the input_loop on a thread. Tokio recommends that we spawn the loop listening to Stdin on a std thread rather than in a tokio task:
// after the call to `subscribe_loop`:
// spawn an input thread that reads stdin
// create a multi-producer, single-consumer channel
let (line_tx, mut line_rx) = tokio::sync::mpsc::channel(1);
// and pass the `sender` portion to the `input_loop`
std::thread::spawn(move || input_loop(line_tx));
Okay, so we can read off of stdin. Now we need to broadcast what we are typing to the Topic (and also print our messages to ourselves so that we can read through the chat logically):
// right after the above code:
// broadcast each line we type
println!("> type a message and hit enter to broadcast...");
// listen for lines that we have typed to be sent from `stdin`
while let Some(text) = line_rx.recv().await {
// create a message from the text
let message = Message::Message {
from: endpoint.node_id(),
text: text.clone(),
};
// broadcast the encoded message
sender.broadcast(message.to_vec().into()).await?;
// print to ourselves the text that we sent
println!("> sent: {text}");
}
If you run the code now, you can finally see some signs of life! Type a message and you can see how we take the content from stdin and send it.
It's getting a bit lonely now, though. Let's implement a way for others to join our Topic.
Implementing Signaling with Tickets
Let's implement ticket-based signaling! This means we will turn the topic and our node id information into a Ticket for others to use to join our Topic. We send the ticket by serializing the Ticket data and printing the serialized data to the terminal. We can then copy/paste it for others to use.
// Add the `use` statement to the top of the main file
use iroh::NodeAddr;
use std::fmt;
use std::str::FromStr;
...
// add the `Ticket` code to the bottom of the main file
#[derive(Debug, Serialize, Deserialize)]
struct Ticket {
topic: TopicId,
nodes: Vec<NodeAddr>,
}
impl Ticket {
/// Deserialize from a slice of bytes to a Ticket.
fn from_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(Into::into)
}
/// Serialize from a `Ticket` to a `Vec` of bytes.
pub fn to_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).expect("serde_json::to_vec is infallible")
}
}
// The `Display` trait allows us to use the `to_string`
// method on `Ticket`.
impl fmt::Display for Ticket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut text = data_encoding::BASE32_NOPAD.encode(&self.to_bytes()[..]);
text.make_ascii_lowercase();
write!(f, "{}", text)
}
}
// The `FromStr` trait allows us to turn a `str` into
// a `Ticket`
impl FromStr for Ticket {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = data_encoding::BASE32_NOPAD.decode(s.to_ascii_uppercase().as_bytes())?;
Self::from_bytes(&bytes)
}
}
Using the Ticket and Messages
Let's update our main file to include printing a Ticket and broadcasting our name on the topic.
// in our main file, after we create a topic `id`:
// print a ticket that includes our own node id and endpoint addresses
let ticket = {
// Get our address information, includes our
// `NodeId`, our `RelayUrl`, and any direct
// addresses.
let me = endpoint.node_addr().await?;
let nodes = vec![me];
Ticket { topic: id, nodes }
};
println!("> ticket to join us: {ticket}");
Now, when you run your code, you should see something like this:
> our node id: 03ce2e2f55af140d0b18395fff054d3f3ab6a30aa680e4a2a3ab4526838151a5
> ticket to join us: pmrhi33qnfrseos3ge4tslbthawdcojvfq3tolbrgq4symjufqytinbmg4wdemzzfqzdinjmgiztmlbshewdonbmgiztglbrgi4cynzzfqzdslbxgqwdsnrmgizdmlbsgazsymjzgewdemzzfqytqmjmgiytalbsguysyobzfqytclbvgiwdenbwfq2cymjygbosyiton5sgk4zchjnxwiton5sgkx3jmqrduirqgnrwkmtfgjtdknlbmyytimdegbrdcobthe2wmztgga2tizbtmyzwcyrwmeztaylbgy4dazjumezgcm3bmi2dkmrwhaztqmjvgfqtkirmejzgk3dbpfpxk4tmei5ce2duoryhgorpf52xgzjrfuys44tfnrqxsltjojxwqltomv2ho33snmxc6irmejsgs4tfmn2f6ylemrzgk43tmvzseos3ei3tilrxgmxdkmroge2dqorvguzdomzcfqrdcojsfyytmobogexdemr2gu2tenzteiwcewzsgyydgorxgaydaoryg4zwcotdmjsdmorrmnsgiorwge2dqorxmnrgcoteha4wgxj2gu2tenzueiwcewzsgyydgorxgaydaoryg4zwcotdmjsdmotggu4teorshfrtaotbgzqtgotbha4tsxj2gu2tenzuejox2xl5
> type a message and hit enter to broadcast...
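The long ticket string is nothing more than the Ticket's JSON bytes written in lowercase, unpadded base32. To make that concrete, here is a std-only sketch of the encoding. The tutorial itself relies on data_encoding for this; the function names below are hypothetical, and the decoder is deliberately lenient (it does not reject non-zero trailing bits), so treat it as an illustration rather than a replacement.

```rust
/// The RFC 4648 base32 alphabet, lowercased.
const ALPHABET: &[u8; 32] = b"abcdefghijklmnopqrstuvwxyz234567";

/// Encode bytes as lowercase base32 without `=` padding.
fn encode_base32_lower(data: &[u8]) -> String {
    let mut out = String::new();
    let mut buffer: u32 = 0; // holds bits not yet emitted
    let mut bits: u32 = 0; // number of valid bits in `buffer`
    for &byte in data {
        buffer = (buffer << 8) | u32::from(byte);
        bits += 8;
        // Emit one character for every complete group of 5 bits.
        while bits >= 5 {
            bits -= 5;
            out.push(ALPHABET[((buffer >> bits) & 0x1f) as usize] as char);
        }
        // Keep only the leftover bits.
        buffer &= (1 << bits) - 1;
    }
    if bits > 0 {
        // Pad the final partial group with zero bits on the right.
        out.push(ALPHABET[((buffer << (5 - bits)) & 0x1f) as usize] as char);
    }
    out
}

/// Decode base32 text (either case). Returns `None` on an invalid character.
fn decode_base32_lower(text: &str) -> Option<Vec<u8>> {
    let mut out = Vec::new();
    let mut buffer: u32 = 0;
    let mut bits: u32 = 0;
    for ch in text.bytes() {
        let lower = ch.to_ascii_lowercase();
        let value = ALPHABET.iter().position(|&c| c == lower)? as u32;
        buffer = (buffer << 5) | value;
        bits += 5;
        // Emit a byte as soon as 8 bits are available.
        if bits >= 8 {
            bits -= 8;
            out.push((buffer >> bits) as u8);
            buffer &= (1 << bits) - 1;
        }
    }
    Some(out)
}
```

Decoding the first eight characters of the sample ticket above, pmrhi33q, yields the bytes {"top, which is the start of the JSON object that serde_json produced for the Ticket.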
Creating a Command-Line Interface
Here is where things get fun. We know how to create, join, and send and receive on a Topic. We also know how to get others to join that Topic. This creates two different "roles" a node can have: a topic "creator" and a topic "joiner". One side "creates" the topic and the ticket, and the other side takes the ticket and uses it to join the topic and connect to the ticket creator.
To "join", we will need to pass in a Ticket as a command line argument. There is a great Rust crate called clap that takes care of much of the CLI boilerplate for you.
Install the clap crate for CLI argument parsing:
cargo add clap --features derive
At this point, rather than show you a chunk of code, I'm going to lay out the entire file.
Here are the bits to pay attention to:
We now create a struct that handles the arguments for the CLI, and an enum that handles the commands for the CLI. We use clap to create a nice CLI. There are two commands: open and join. join expects a Ticket.
We also now have a --name flag that we can optionally use as an identifier in the topic.
If you use the open command, you create a topic. If you use the join command, you get a topic and a list of nodes from the ticket.
In either case, we still print a ticket to the terminal.
The smallest change, but a very important one, is that we go from using the subscribe method to the subscribe_and_join method. The subscribe method would return a Topic immediately. The subscribe_and_join method takes the given topic, joins it, and waits for someone else to join the topic before returning.
use std::{collections::HashMap, fmt, str::FromStr};
use anyhow::Result;
use clap::Parser;
use futures_lite::StreamExt;
use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
use iroh_gossip::{
net::{Event, Gossip, GossipEvent, GossipReceiver},
proto::TopicId,
};
use serde::{Deserialize, Serialize};
/// Chat over iroh-gossip
///
/// This broadcasts unsigned messages over iroh-gossip.
///
/// By default a new node id is created when starting the example.
///
/// By default, we use the default n0 discovery services to dial by `NodeId`.
#[derive(Parser, Debug)]
struct Args {
/// Set your nickname.
#[clap(short, long)]
name: Option<String>,
/// Set the bind port for our socket. By default, a random port will be used.
#[clap(short, long, default_value = "0")]
bind_port: u16,
#[clap(subcommand)]
command: Command,
}
#[derive(Parser, Debug)]
enum Command {
/// Open a chat room for a topic and print a ticket for others to join.
Open,
/// Join a chat room from a ticket.
Join {
/// The ticket, as base32 string.
ticket: String,
},
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
// parse the cli command
let (topic, nodes) = match &args.command {
Command::Open => {
let topic = TopicId::from_bytes(rand::random());
println!("> opening chat room for topic {topic}");
(topic, vec![])
}
Command::Join { ticket } => {
let Ticket { topic, nodes } = Ticket::from_str(ticket)?;
println!("> joining chat room for topic {topic}");
(topic, nodes)
}
};
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
println!("> our node id: {}", endpoint.node_id());
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
let router = Router::builder(endpoint.clone())
.accept(iroh_gossip::ALPN, gossip.clone())
.spawn()
.await?;
// print a ticket that includes our own node id and endpoint addresses
let ticket = {
// Get our address information, includes our
// `NodeId`, our `RelayUrl`, and any direct
// addresses.
let me = endpoint.node_addr().await?;
let nodes = vec![me];
Ticket { topic, nodes }
};
println!("> ticket to join us: {ticket}");
// join the gossip topic by connecting to known nodes, if any
let node_ids = nodes.iter().map(|p| p.node_id).collect();
if nodes.is_empty() {
println!("> waiting for nodes to join us...");
} else {
println!("> trying to connect to {} nodes...", nodes.len());
// add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed
for node in nodes.into_iter() {
endpoint.add_node_addr(node)?;
}
};
let (sender, receiver) = gossip.subscribe_and_join(topic, node_ids).await?.split();
println!("> connected!");
// broadcast our name, if set
if let Some(name) = args.name {
let message = Message::AboutMe {
from: endpoint.node_id(),
name,
};
sender.broadcast(message.to_vec().into()).await?;
}
// subscribe and print loop
tokio::spawn(subscribe_loop(receiver));
// spawn an input thread that reads stdin
// create a multi-producer, single-consumer channel
let (line_tx, mut line_rx) = tokio::sync::mpsc::channel(1);
// and pass the `sender` portion to the `input_loop`
std::thread::spawn(move || input_loop(line_tx));
// broadcast each line we type
println!("> type a message and hit enter to broadcast...");
// listen for lines that we have typed to be sent from `stdin`
while let Some(text) = line_rx.recv().await {
// create a message from the text
let message = Message::Message {
from: endpoint.node_id(),
text: text.clone(),
};
// broadcast the encoded message
sender.broadcast(message.to_vec().into()).await?;
// print to ourselves the text that we sent
println!("> sent: {text}");
}
router.shutdown().await?;
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
enum Message {
AboutMe { from: NodeId, name: String },
Message { from: NodeId, text: String },
}
impl Message {
fn from_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(Into::into)
}
pub fn to_vec(&self) -> Vec<u8> {
serde_json::to_vec(self).expect("serde_json::to_vec is infallible")
}
}
// Handle incoming events
async fn subscribe_loop(mut receiver: GossipReceiver) -> Result<()> {
// keep track of the mapping between `NodeId`s and names
let mut names = HashMap::new();
// iterate over all events
while let Some(event) = receiver.try_next().await? {
// if the Event is a `GossipEvent::Received`, let's deserialize the message:
if let Event::Gossip(GossipEvent::Received(msg)) = event {
// deserialize the message and match on the
// message type:
match Message::from_bytes(&msg.content)? {
Message::AboutMe { from, name } => {
// if it's an `AboutMe` message
// add an entry into the map
// and print the name
names.insert(from, name.clone());
println!("> {} is now known as {}", from.fmt_short(), name);
}
Message::Message { from, text } => {
// if it's a `Message` message,
// get the name from the map
// and print the message
let name = names
.get(&from)
.map_or_else(|| from.fmt_short(), String::to_string);
println!("{}: {}", name, text);
}
}
}
}
Ok(())
}
fn input_loop(line_tx: tokio::sync::mpsc::Sender<String>) -> Result<()> {
let mut buffer = String::new();
let stdin = std::io::stdin(); // We get `Stdin` here.
loop {
stdin.read_line(&mut buffer)?;
line_tx.blocking_send(buffer.clone())?;
buffer.clear();
}
}
// A `Ticket` bundles the topic with the addresses of nodes to connect to.
#[derive(Debug, Serialize, Deserialize)]
struct Ticket {
topic: TopicId,
nodes: Vec<NodeAddr>,
}
impl Ticket {
/// Deserialize from a slice of bytes to a Ticket.
fn from_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(Into::into)
}
/// Serialize from a `Ticket` to a `Vec` of bytes.
pub fn to_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).expect("serde_json::to_vec is infallible")
}
}
// The `Display` trait allows us to use the `to_string`
// method on `Ticket`.
impl fmt::Display for Ticket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut text = data_encoding::BASE32_NOPAD.encode(&self.to_bytes()[..]);
text.make_ascii_lowercase();
write!(f, "{}", text)
}
}
// The `FromStr` trait allows us to turn a `str` into
// a `Ticket`
impl FromStr for Ticket {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = data_encoding::BASE32_NOPAD.decode(s.to_ascii_uppercase().as_bytes())?;
Self::from_bytes(&bytes)
}
}
Running the Application
To open a new chat room:
cargo run -- --name user1 open
To join an existing chat room:
cargo run -- --name user2 join <ticket>
The application will now support basic chat functionality between connected peers, with messages broadcast to all participants in the room.
Notes on Security
While this implementation demonstrates the basic concepts, a production system would need additional security measures. For example, the example in the Iroh gossip protocol repository includes message signing to prevent impersonation attacks.
For more sophisticated implementations and security features, refer to the examples in the Iroh gossip protocol repository.