Tokio: Macros
join
tokio::join! lets you run multiple futures concurrently, and returns the output of all of them. For JavaScript developers: Promise.all is a good equivalent.
Here’s an example where I wait for two requests to finish.
let (resp_1, resp_2) = tokio::join!(
    reqwest::get("https://oida.dev"),
    reqwest::get("https://oida.dev/slides")
);
There’s one caveat though that you need to be aware of: those futures run concurrently within the same task, and therefore on one thread at a time, but not in parallel. If you want to run them in parallel, use tokio::spawn to spawn a new task for each future. tokio::spawn returns a JoinHandle, which also implements Future, meaning that you can join on it.
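async fn fetch(url: &str) -> reqwest::Result<String> {
    reqwest::get(url).await?.text().await
}
// Each JoinHandle resolves to Result<reqwest::Result<String>, JoinError>,
// so body_1 and body_2 are nested Results.
let (body_1, body_2) = tokio::join!(
    tokio::spawn(fetch("https://oida.dev")),
    tokio::spawn(fetch("https://oida.dev/slides"))
);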
There is also try_join!, for futures that return a Result. Instead of a tuple of Results, it returns a single Result that wraps the tuple. As soon as one of the futures fails, the whole try_join! returns that error.
select
You’re going to love/hate the select! macro. It’s really cool and handy for a plethora of situations, but its syntax can be confusing, and your Rust formatter won’t be happy with it. That aside, it’s really helpful and sometimes also necessary.
My feeling is that it has been inspired by how Go’s select statement works when working with go-routines and reading from channels, but I might be wrong.
Anyway, the select! macro allows you to run multiple futures concurrently, wait on their results, and then act on the first one that completes. The remaining futures are dropped. If you know JavaScript, it’s very similar to Promise.race.
The syntax can be a bit weird. There are no .await calls and no let assignments. The syntax for one select! branch is pattern = future/async expression => handler. Think of transforming an if let expression like this:
if let Ok(resp) = reqwest::get("https://oida.dev").await {
    // tbd
}
into this:
Ok(resp) = reqwest::get("https://oida.dev") => {
    // tbd
}
Let’s look at an example where I fetch data from my website and try to finish the request in less than 250 milliseconds.
use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() {
    // (1)
    tokio::select! {
        // (2)
        Ok(resp) = reqwest::get("https://oida.dev") => {
            println!("{:?}", resp.text().await.unwrap());
        }
        // (3)
        _ = time::sleep(Duration::from_millis(250)) => {
            println!("Timeout");
        }
        // (4)
        else => {
            println!("Something else happened");
        }
    }
}
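Here’s what’s going on.
- (1) This is the macro. It is followed by a block with the futures you race against each other.
- (2) The first future is returned from the reqwest::get call. We make a call to my website. If this future finishes first with an Ok value, we print the response.
- (3) The second future is a time::sleep call from the tokio crate. If this future finishes first, we print “Timeout”.
- (4) Since we only match the Ok variant of the first future, we don’t handle the case where the reqwest::get call errors. When a pattern doesn’t match, select! disables that branch and keeps waiting on the others; the else block runs only once every branch is disabled. Here, a failed request therefore ends in the “Timeout” branch, because the _ pattern always matches. The else block becomes relevant when all branches use patterns that can fail.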
This is tokio::select! in a nutshell and the basic syntax. There are more nuances to it that are well explained in the official docs, but from my experience, you will be good with what you see above.
tokio::select! is a good alternative if you need to race multiple futures against each other and spawning tasks and waiting for their results is too much. You will see in the exercise below that it can greatly reduce code complexity.
pin
Explaining the pin! macro without explaining what pinning means in the context of async Rust won’t work. I’ll try to explain it very briefly: Before a future starts executing, we need to be able to move it around in memory. This is especially true when using Tokio, since a future might be executed on a different worker thread than the one it came from.
Once a future starts executing, it must not be moved around in memory anymore, because it may hold references into its own state. This is where pinning comes into play. A pinned future is guaranteed to stay at the same memory location, so it is safe to poll. If you want to know more about it, please check out Adam Chalmers’s excellent article on that topic.
The pin! macro allows you to pin a future on the stack. You will see Box::pin, which pins a future on the heap, more often in the wild; pin! is an alternative to that.
Exercise 3: Rewrite the Chat Server with Macros
Let’s refactor our chat server to work properly with Tokio’s select! macro:
- Extract the loop into its own function
- Use proper error handling instead of unwrap()
- Let both sides of the chat run concurrently in a select! block
Solution
Please find the solution below. To make the code even terser, I renamed the buf variable to content, and the addr is passed as id. I will only focus on the run function, which you can spawn as a task:
tokio::spawn(run(socket, addr, tx, rx));
And here’s the solution:
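use std::{error::Error, net::SocketAddr};
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpStream,
    sync::broadcast,
};

// `Message` is the message type of the chat server (fields `content` and `id`).
// (1)
async fn run(
    socket: TcpStream,
    id: SocketAddr,
    tx: broadcast::Sender<Message>,
    mut rx: broadcast::Receiver<Message>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    let (reader, mut writer) = socket.into_split();
    let mut reader = BufReader::new(reader);
    loop {
        let mut content = String::new();
        tokio::select! {
            // (2)
            Ok(msg) = rx.recv() => {
                if msg.id != id {
                    writer.write_all(msg.content.as_bytes()).await?;
                }
            }
            // (3)
            Ok(bytes_read) = reader.read_line(&mut content) => {
                // Zero bytes read means the client closed the connection.
                if bytes_read == 0 || content.trim() == "quit" {
                    break Ok(());
                }
                tx.send(Message { content, id })?;
            }
        }
    }
}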
- (1) We extract the main loop into a separate function. The function takes the socket, the socket’s address as ID, the broadcast sender, and the broadcast receiver. The function returns a Result with an error type that implements Error, Send, and Sync. We want to bubble up errors to the caller, and since we run in an async context, the errors themselves need to be thread-safe.
- (2) In the select! block we run two futures concurrently. The first future listens for incoming messages from the broadcast receiver. If a message is received, we check that it is not from the current client and write it to the writer.
- (3) The second future listens for incoming messages from the client. If a message is received, we check if the message is “quit”. If it is, we break the loop and return Ok(()). Otherwise, we send the message to the broadcast sender.
And that’s it!