Introduction
As everyone knows, Rust recently stabilized the async/await
feature. My first attempt to
convert a little program I had to use it was a dismal failure (reasons are at the bottom
of this post), so I thought I would step back and write some simple - and I do mean very simple -
examples of how to use await. The final step in this post shows how to download multiple URLs,
in parallel, which was the business problem I was trying to solve in the first place.
I am definitely a learner when it comes to this material, so there may well be better ways to accomplish some of this - if so, please write a comment and I’ll update the post.
The first thing to note is that there are (at least) two competing async eco-systems at the
moment, Tokio
and async-std. It appears that you can’t mix-and-match
things from one eco-system to the other. I chose to get started with async-std
for no other reason than that this video
helped clear up some of the misconceptions I had.
I’ll be including full source code for each example, including Cargo.toml and use
declarations; everything is also available in this git repo.
Step 1
The git repo I linked above has a set of tags for each step in this post. You can check out
each tag and run it. In the first step, I add all the dependencies I need for this entire
blog post to Cargo.toml:
[dependencies]
futures = "0.3.1"
rand = "0.7"
async-std = { version = "1.2", features = ["attributes"] }
surf = "1.0"
Some people refer to the futures library as futures-rs - that is the name of its source
repository - but the crate you want is futures. We’ll use
rand to generate some random
sleep times, and surf to download some web content.
In main.rs
let’s add all the use
statements we will need (for now and later)
and let’s adjust the main
function to print out how long the program ran for. This
will be a useful sanity-check later on.
use async_std::task;
use futures::join;
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::{Duration, Instant};
use std::thread;
use rand::distributions::{Distribution, Uniform};
fn main() {
    let start_time = Instant::now();
    println!("Program finished in {} ms", start_time.elapsed().as_millis());
}
That’s it, now we’re all set to start doing something useful.
Step 2
Let’s start by creating two futures and waiting for them to complete. This is just about the simplest thing we can do. It’s important that we explicitly wait - unlike languages such as C#, which have a background runtime to progress tasks, Rust futures are lazy and do nothing unless polled by a runtime.
To avoid unnecessary complications, we are going to give our futures the simplest possible ‘work’ to do - just sleeping for a bit. Because the work is so simple and doesn’t perform any sort of IO, we also don’t have to worry about errors yet.
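To see that laziness for yourself, here is a tiny, self-contained sketch (not part of the repo): calling an async fn merely builds a future, and nothing runs until a runtime polls it.

use async_std::task;

async fn say_hello() {
    println!("hello");
}

fn main() {
    // Calling an async fn only constructs a future; nothing is printed yet.
    let future = say_hello();

    // Only when a runtime polls the future - here via block_on - does it run.
    task::block_on(future);
}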
First the code:
fn main() {
    let start_time = Instant::now();

    demo_waiting_for_two_async_fns();

    println!("Program finished in {} ms", start_time.elapsed().as_millis());
}

fn demo_waiting_for_two_async_fns() {
    // block_on takes a future and waits for it to complete.
    // Notice that this fn is not `async`, and we are not using
    // an async block either (because we are not calling `await`).
    task::block_on(call_both_sleepers());
}

async fn call_both_sleepers() {
    join!(first_sleeper(), second_sleeper());
}

async fn first_sleeper() {
    sleep_and_print(1, 1000).await;
}

async fn second_sleeper() {
    sleep_and_print(2, 1500).await;
}

/// This utility function simply goes to sleep for a specified time
/// and then prints a message when it is done.
async fn sleep_and_print(future_number: u32, sleep_millis: u64) {
    let sleep_duration = Duration::from_millis(sleep_millis);

    // Note we are using async-std's `task::sleep` here, not
    // thread::sleep. We must not block the thread!
    task::sleep(sleep_duration).await;

    println!("Future {} slept for {} ms on {:?}", future_number, sleep_millis, thread::current().id());
}
If you run this code you should see
Future 1 slept for 1000 ms on ThreadId(1)
Future 2 slept for 1500 ms on ThreadId(1)
Program finished in 1500 ms
Let’s dissect it. The function sleep_and_print is the lowest-level one here,
and it’s the one that does the actual sleeping for us. When it’s done it simply
prints a message saying so. The future_number argument is just there so we can tell
the invocations apart. Note the use of task::sleep from async-std:
it’s this that stops the thread from being blocked while the function is sleeping,
and hence the entire program completes in 1500 ms, which is what we want.
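Concretely, the blocking variant of sleep_and_print would look like this (shown only to demonstrate the problem - don’t do this in real async code):

async fn sleep_and_print_blocking(future_number: u32, sleep_millis: u64) {
    let sleep_duration = Duration::from_millis(sleep_millis);

    // thread::sleep blocks the whole thread, so no other future scheduled
    // on this thread can make progress until it returns.
    thread::sleep(sleep_duration);

    println!("Future {} slept for {} ms on {:?}", future_number, sleep_millis, thread::current().id());
}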
If you change to the blocking
thread::sleep
you will get something like this:
Future 1 slept for 1000 ms on ThreadId(1)
Future 2 slept for 1500 ms on ThreadId(1)
Program finished in 2500 ms
The program now takes 2500 ms because the second future was blocked by the first
one calling thread::sleep
. In the original program, the second future was able to start
executing ‘in parallel’ because it was not blocked. This brings out an important point -
don’t call blocking APIs from inside async functions! If you do, you will prevent the
other tasks on that thread from making progress.
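If you genuinely have to call a blocking API (a synchronous file read or database driver, say), the usual answer is to push that work onto a thread pool dedicated to blocking calls rather than running it on the executor’s threads. Here is a minimal sketch using async-std’s task::spawn_blocking; depending on your async-std version this function may sit behind the unstable feature flag, and the config.toml path is just a made-up example:

// A sketch only: run a blocking call on async-std's blocking thread pool
// so the async executor's threads stay free to make progress.
async fn read_config() -> std::io::Result<String> {
    task::spawn_blocking(|| {
        // std::fs::read_to_string blocks the thread it runs on; "config.toml"
        // is a hypothetical file name used purely for illustration.
        std::fs::read_to_string("config.toml")
    })
    .await
}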
The functions first_sleeper and second_sleeper create futures that wait for different
times. Then in call_both_sleepers we use the join! macro from the futures crate
to join them together. Finally we pass call_both_sleepers - which is itself a future - into
task::block_on, which runs it to completion.
Frankly, I was surprised how much boilerplate was required to get this to work; there’s probably an easier way. But I did get done what I wanted to do - I ran two futures in parallel, and the time to complete was the time of the longest-running future, not the sum of their individual times.
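For what it’s worth, one slightly more compact spelling (a sketch, not taken from the repo) is to pass an async block containing the join! straight to block_on, which removes the two intermediate functions:

fn demo_waiting_for_two_async_fns_compact() {
    // The async block is itself a future; `join!` inside it polls both
    // sleepers concurrently, and block_on drives the whole thing to completion.
    task::block_on(async {
        join!(sleep_and_print(1, 1000), sleep_and_print(2, 1500));
    });
}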
Step 3
In step 3 we generalize from step 2: instead of trying to run 2 futures in parallel, we
want to run N
. The easiest way to do this seems to be to use the
FuturesUnordered
collection type from the futures
crate.
Update 14 Dec 2019: Actually, an easier way is to spawn individual tasks, a technique that also means we benefit from running on all available cores. See Step 7 of this post.
Let’s add a new function:
fn demo_waiting_for_multiple_random_sleeps() {
    // Initialise the random number generator we will use to
    // generate the random sleep times.
    let between = Uniform::from(500..10_000);
    let mut rng = rand::thread_rng();

    // This special collection from the `futures` crate is what we use to
    // hold all the futures; it is designed to efficiently poll the futures
    // until they all complete (in any order), which we do with a simple
    // loop (see below).
    let mut futures = FuturesUnordered::new();

    // Create 10 futures, each of which should sleep for a random
    // number of milliseconds. None of the futures are doing anything
    // yet, because we are only storing them; we haven't started polling
    // them yet.
    for future_number in 0..10 {
        let sleep_millis = between.sample(&mut rng);
        futures.push(sleep_and_print(future_number, sleep_millis));
    }

    // This loop is how to wait for all the elements in a `FuturesUnordered<T>`
    // to complete. `value_returned_from_the_future` is just the
    // unit type, `()`, because we did not return anything from `sleep_and_print`.
    task::block_on(async {
        while let Some(_value_returned_from_the_future) = futures.next().await {
        }
    });
}
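For reference, the call site is just one more line in main (a sketch; the rest of main is unchanged from step 1):

fn main() {
    let start_time = Instant::now();

    // Step 3: run ten randomly-sleeping futures concurrently.
    demo_waiting_for_multiple_random_sleeps();

    println!("Program finished in {} ms", start_time.elapsed().as_millis());
}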
Calling this from main, we get something like this (results will vary due to the use of the random number generator):
Future 6 slept for 726 ms on ThreadId(1)
Future 9 slept for 1233 ms on ThreadId(1)
Future 4 slept for 2013 ms on ThreadId(1)
Future 0 slept for 2056 ms on ThreadId(1)
Future 3 slept for 3072 ms on ThreadId(1)
Future 7 slept for 5316 ms on ThreadId(1)
Future 2 slept for 6328 ms on ThreadId(1)
Future 8 slept for 6374 ms on ThreadId(1)
Future 5 slept for 6725 ms on ThreadId(1)
Future 1 slept for 7936 ms on ThreadId(1)
Program finished in 7936 ms
This is exactly the behaviour I was looking for - it looks like we have multiple
futures executing, and the overall time is equal to the longest sleep time. This also
shows that the overhead of using async/await
is very small (less than 1 ms).
Step 4
So far our futures have not returned any values. In this step, we build on the code
introduced in step 3 by making one simple change - we introduce a new version of
sleep_and_print
which returns a value to the caller.
async fn sleep_and_print_and_return_value(future_number: u32, sleep_millis: u64) -> u32 {
    let sleep_duration = Duration::from_millis(sleep_millis);
    task::sleep(sleep_duration).await;
    println!("Future {} slept for {} ms on thread {:?}", future_number, sleep_millis, thread::current().id());
    future_number * 10
}

fn demo_waiting_for_multiple_random_sleeps_with_return_values() {
    let between = Uniform::from(500..10_000);
    let mut rng = rand::thread_rng();
    let mut cf = FuturesUnordered::new();

    for future_number in 0..10 {
        let random_millis = between.sample(&mut rng);
        cf.push(sleep_and_print_and_return_value(future_number, random_millis));
    }

    // The async block borrows a mutable reference to `sum`, allowing us to
    // add up all the values returned from the future.
    let mut sum = 0;
    task::block_on(async {
        while let Some(value_returned_from_the_future) = cf.next().await {
            sum += value_returned_from_the_future;
        }
    });

    println!("Sum of all values returned = {}", sum);
}
When we run this we get this output:
Future 9 slept for 1095 ms on thread ThreadId(1)
Future 1 slept for 1378 ms on thread ThreadId(1)
Future 6 slept for 3577 ms on thread ThreadId(1)
Future 3 slept for 3874 ms on thread ThreadId(1)
Future 8 slept for 4946 ms on thread ThreadId(1)
Future 5 slept for 7034 ms on thread ThreadId(1)
Future 0 slept for 7172 ms on thread ThreadId(1)
Future 7 slept for 7511 ms on thread ThreadId(1)
Future 4 slept for 8079 ms on thread ThreadId(1)
Future 2 slept for 9114 ms on thread ThreadId(1)
Sum of all values returned = 450
Program finished in 9114 ms
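As an aside, the while-let loop and the mutable sum can also be expressed with a stream combinator. Here is a sketch using fold from the futures crate’s StreamExt (same use statements as before); the behaviour is the same, so it is purely a matter of taste:

fn demo_summing_with_fold() {
    let between = Uniform::from(500..10_000);
    let mut rng = rand::thread_rng();

    // Build the same ten futures as before, this time via collect.
    let cf = (0..10u32)
        .map(|future_number| sleep_and_print_and_return_value(future_number, between.sample(&mut rng)))
        .collect::<FuturesUnordered<_>>();

    // `fold` drives the stream to completion, threading the accumulator
    // through each value the futures return.
    let sum = task::block_on(cf.fold(0u32, |acc, value| async move { acc + value }));

    println!("Sum of all values returned = {}", sum);
}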
Step 5
Step 5 is a simple extension of step 4 - instead of returning a u32
we return
a Result<T,E>
- many futures will do this, so we need to know how to handle them.
async fn sleep_and_print_and_return_error(future_number: u32, sleep_millis: u64) -> Result<u32, String> {
    let sleep_duration = Duration::from_millis(sleep_millis);
    task::sleep(sleep_duration).await;
    println!("Future {} slept for {} ms on thread {:?}", future_number, sleep_millis, thread::current().id());

    if future_number % 2 == 0 {
        Ok(future_number * 10)
    } else {
        Err(format!("It didn't work for future {}", future_number))
    }
}

fn demo_waiting_for_multiple_random_sleeps_with_errors() {
    let between = Uniform::from(500..10_000);
    let mut rng = rand::thread_rng();
    let mut futures = FuturesUnordered::new();

    for future_number in 0..10 {
        let random_millis = between.sample(&mut rng);
        futures.push(sleep_and_print_and_return_error(future_number, random_millis));
    }

    // Now, `value_returned_from_the_future` is a `Result<u32, String>` so
    // we must take care to pattern match on it.
    let mut sum = 0;
    task::block_on(async {
        while let Some(value_returned_from_the_future) = futures.next().await {
            match value_returned_from_the_future {
                Ok(value) => sum += value,
                Err(e) => println!(" Got error back: {}", e),
            }
        }
    });

    println!("Sum of all values returned = {}", sum);
}
When we run this we get this output:
Future 7 slept for 1445 ms on thread ThreadId(1)
Got error back: It didn't work for future 7
Future 4 slept for 1561 ms on thread ThreadId(1)
Future 3 slept for 1583 ms on thread ThreadId(1)
Got error back: It didn't work for future 3
Future 1 slept for 2048 ms on thread ThreadId(1)
Got error back: It didn't work for future 1
Future 6 slept for 4138 ms on thread ThreadId(1)
Future 0 slept for 4886 ms on thread ThreadId(1)
Future 9 slept for 5718 ms on thread ThreadId(1)
Got error back: It didn't work for future 9
Future 2 slept for 6562 ms on thread ThreadId(1)
Future 8 slept for 7209 ms on thread ThreadId(1)
Future 5 slept for 9591 ms on thread ThreadId(1)
Got error back: It didn't work for future 5
Sum of all values returned = 200
Program finished in 9591 ms
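If you would rather give up at the first error instead of logging it and carrying on, the futures crate’s TryStreamExt trait can help. Here is a sketch built around a hypothetical helper, sum_or_first_error; note the two extra use lines, which are not in the use block from step 1:

use futures::Stream;
use futures::stream::TryStreamExt;

// A hypothetical helper: add up the Ok values, but return as soon as any
// future yields an Err.
async fn sum_or_first_error<S>(mut stream: S) -> Result<u32, String>
where
    S: Stream<Item = Result<u32, String>> + Unpin,
{
    let mut sum = 0;
    // `try_next` yields Ok(Some(value)) for each success, Ok(None) at the end
    // of the stream, or Err(e) as soon as one of the futures fails.
    while let Some(value) = stream.try_next().await? {
        sum += value;
    }
    Ok(sum)
}

You would call it as task::block_on(sum_or_first_error(futures)) in place of the loop above, and then decide what to do with the resulting Result<u32, String>.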
Step 6
Step 6 is the final step. In the previous steps I demonstrated how to run multiple futures and await all their results, handling errors when required. Now we can use those techniques to do some useful work. Let’s try and download a bunch of URLs in parallel.
The code is surprisingly simple. We bring in the surf library to handle the HTTP GETs.
There is one other innovation: we use collect to build our set of futures, but we could
just as easily have iterated over the urls collection and pushed the futures one by one
into the FuturesUnordered collection.
async fn download_url(url: &str) -> Result<String, surf::Exception> {
    println!("Downloading {} on thread {:?}", url, thread::current().id());

    // Code taken directly from the example for `surf`.
    let mut result = surf::get(url).await?;
    let body = result.body_string().await?;

    println!(" Downloaded {}, returning body of length {} ", url, body.len());
    Ok(body)
}

fn demo_downloading_urls() {
    let urls = vec![
        "https://www.sharecast.com/equity/Anglo_American",
        "https://www.sharecast.com/equity/Associated_British_Foods",
        "https://www.sharecast.com/equity/Admiral_Group",
        "https://www.sharecast.com/equity/Aberdeen_Asset_Management",
        "https://www.sharecast.com/equity/Aggreko",
        "https://www.sharecast.com/equity/Ashtead_Group",
        "https://www.sharecast.com/equity/Antofagasta",
        "https://www.sharecast.com/equity/Aviva",
        "https://www.sharecast.com/equity/AstraZeneca",
        "https://www.sharecast.com/equity/BAE_Systems",
        "https://www.sharecast.com/equity/Babcock_International_Group",
        "https://www.sharecast.com/equity/British_American_Tobacco",
        "https://www.sharecast.com/equity/Balfour_Beatty",
        "https://www.sharecast.com/equity/Barratt_Developments",
        "https://www.sharecast.com/equity/BG_Group",
        "https://www.sharecast.com/equity/British_Land_Company",
        "https://www.sharecast.com/equity/BHP_Group",
        "https://www.sharecast.com/equity/Bunzl",
        "https://www.sharecast.com/equity/BP",
        "https://www.sharecast.com/equity/Burberry_Group",
        "https://www.sharecast.com/equity/BT_Group",
    ];

    // This time let's make our FuturesUnordered value by collecting
    // a set of futures.
    let mut cf = urls.iter()
        .map(|url| download_url(url))
        .collect::<FuturesUnordered<_>>();

    task::block_on(async {
        while let Some(return_val) = cf.next().await {
            match return_val {
                Ok(body) => {
                    // Possibly do something useful with the body of the request here.
                },
                Err(e) => println!(" Got error {:?}", e),
            }
        }
    });
}
When we run this program, we get something like this:
Downloading https://www.sharecast.com/equity/Anglo_American on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Associated_British_Foods on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Admiral_Group on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Aberdeen_Asset_Management on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Aggreko on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Ashtead_Group on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Antofagasta on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Aviva on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/AstraZeneca on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/BAE_Systems on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Babcock_International_Group on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/British_American_Tobacco on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Balfour_Beatty on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Barratt_Developments on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/BG_Group on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/British_Land_Company on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/BHP_Group on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Bunzl on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/BP on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/Burberry_Group on thread ThreadId(1)
Downloading https://www.sharecast.com/equity/BT_Group on thread ThreadId(1)
Downloaded https://www.sharecast.com/equity/BT_Group, returning body of length 83555
Downloaded https://www.sharecast.com/equity/BP, returning body of length 82642
Downloaded https://www.sharecast.com/equity/Aviva, returning body of length 82924
Downloaded https://www.sharecast.com/equity/Anglo_American, returning body of length 83110
Downloaded https://www.sharecast.com/equity/Antofagasta, returning body of length 82406
Downloaded https://www.sharecast.com/equity/Bunzl, returning body of length 81577
Downloaded https://www.sharecast.com/equity/BHP_Group, returning body of length 82203
Downloaded https://www.sharecast.com/equity/Admiral_Group, returning body of length 82780
Downloaded https://www.sharecast.com/equity/BAE_Systems, returning body of length 83328
Downloaded https://www.sharecast.com/equity/Barratt_Developments, returning body of length 83751
Downloaded https://www.sharecast.com/equity/Balfour_Beatty, returning body of length 83645
Downloaded https://www.sharecast.com/equity/Ashtead_Group, returning body of length 83060
Downloaded https://www.sharecast.com/equity/Burberry_Group, returning body of length 83147
Downloaded https://www.sharecast.com/equity/Aggreko, returning body of length 81857
Downloaded https://www.sharecast.com/equity/British_American_Tobacco, returning body of length 84589
Downloaded https://www.sharecast.com/equity/British_Land_Company, returning body of length 83755
Downloaded https://www.sharecast.com/equity/AstraZeneca, returning body of length 83023
Downloaded https://www.sharecast.com/equity/Aberdeen_Asset_Management, returning body of length 28780
Downloaded https://www.sharecast.com/equity/Associated_British_Foods, returning body of length 85075
Downloaded https://www.sharecast.com/equity/Babcock_International_Group, returning body of length 87022
Downloaded https://www.sharecast.com/equity/BG_Group, returning body of length 73655
Program finished in 6622 ms
The program finishes a lot faster than it would if we made the requests in series.
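One refinement worth knowing about: with a much longer list of URLs you probably don’t want every request in flight at once. The futures crate’s buffer_unordered caps the number of futures being polled concurrently. Here is a sketch reusing download_url from above; the limit of five is an arbitrary choice:

fn demo_downloading_urls_with_limit(urls: &[&str]) {
    task::block_on(async {
        // Turn the URL list into a stream of download futures and poll at
        // most five of them at any one time.
        let mut results = futures::stream::iter(urls)
            .map(|url| download_url(url))
            .buffer_unordered(5);

        while let Some(return_val) = results.next().await {
            if let Err(e) = return_val {
                println!(" Got error {:?}", e);
            }
        }
    });
}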
And that’s it! I hope this proves useful to others; comments and improvements are welcome.
Step 7
A couple of weeks after writing the first version of this post I figured out a way
to run multiple futures on multiple cores. We don’t need FuturesUnordered; we can
just spawn as many tasks as we need and then wait for them all to complete. async-std's
executor will distribute the tasks across all available cores. Here is a simple
example, based on downloading URLs again:
fn demo_downloading_urls_on_multiple_threads() {
    let mut tasks = Vec::with_capacity(URLS.len());

    for url in URLS.iter() {
        let url = url.to_string();
        tasks.push(task::spawn(async move {
            match download_url(&url).await {
                Ok(body) => {
                    // Possibly do something useful with the body of the request here.
                },
                Err(e) => println!(" Got error {:?}", e),
            }
        }))
    }

    task::block_on(async {
        for t in tasks {
            t.await;
        }
    });
}
When we run this program we get something like this (note the different ThreadIds):
Downloading https://www.sharecast.com/equity/Anglo_American on thread ThreadId(9)
Downloading https://www.sharecast.com/equity/Associated_British_Foods on thread ThreadId(4)
Downloading https://www.sharecast.com/equity/Admiral_Group on thread ThreadId(7)
Downloading https://www.sharecast.com/equity/Ashtead_Group on thread ThreadId(5)
Downloading https://www.sharecast.com/equity/Aviva on thread ThreadId(6)
Downloading https://www.sharecast.com/equity/BAE_Systems on thread ThreadId(2)
Downloading https://www.sharecast.com/equity/Barratt_Developments on thread ThreadId(3)
Downloading https://www.sharecast.com/equity/Aggreko on thread ThreadId(8)
Downloading https://www.sharecast.com/equity/Babcock_International_Group on thread ThreadId(2)
Downloading https://www.sharecast.com/equity/BP on thread ThreadId(4)
...
The total execution time is similar to the single-threaded version, but that’s only because we don’t have a lot of work to do. In a heavily loaded server this technique is obviously preferable.
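A related point: task::spawn returns a JoinHandle, and awaiting the handle gives back whatever the task returned, so the same spawning trick covers the ‘futures that return values’ case from step 4. Here is a sketch reusing sleep_and_print_and_return_value:

fn demo_spawning_tasks_with_return_values() {
    let mut handles = Vec::new();

    for future_number in 0..10u32 {
        // Each spawned task runs on the executor's thread pool and yields a u32.
        handles.push(task::spawn(async move {
            sleep_and_print_and_return_value(future_number, 500).await
        }));
    }

    // Awaiting a JoinHandle returns the value the task itself returned.
    let sum: u32 = task::block_on(async {
        let mut sum = 0;
        for handle in handles {
            sum += handle.await;
        }
        sum
    });

    println!("Sum of all values returned = {}", sum);
}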
Gotchas
I could not fix the program I was originally trying to asyncify (it’s not listed here).
The problem was that it was using
reqwest
rather than
surf. When I tried to use reqwest::get I got
an error Error(Connect, Custom { kind: Other, error: "no current reactor" }).
Presumably this is because reqwest’s async support expects a Tokio reactor to be running,
and there isn’t one when the program is driven by async-std’s executor.
The author notes that reqwest is undergoing development which will mean
breaking changes, but I don’t know if that means this error will go away.
It would be unfortunate (to say the least) if Rust libraries only worked on certain async runtimes - not to mention hugely confusing for people trying to learn the ropes. But things are still very new; I think it is going to take several months for things to begin to settle down, for documentation to be updated, and so on.