Async/await in Rust relies on cooperative scheduling, as implemented by runtimes such as Tokio. A task runs until it reaches an `.await` that cannot complete yet and yields control, and only then can the runtime switch to another task. This has an important consequence for anyone writing asynchronous Rust code: async code should never spend a long time without reaching an `.await`.
Warning
A naive way to write an application that works on many things at once is to spawn a new thread for every task, but with a large number of tasks the overhead of all those threads becomes significant.
Tip
The alternative is to use fewer threads and quickly swap the task running on each of them. In Rust, this swap can happen whenever a task reaches an `.await`.
The Future trait
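A `Future` is driven by a runtime, which repeatedly calls its `poll` method. `poll` receives a pinned mutable reference to `self` and a `Context`, whose `Waker` lets the future ask to be polled again once it is ready to make progress. It returns `Poll::Ready(value)` when it has finished and `Poll::Pending` otherwise. The following future moves a game unit one step towards a target position each time it is polled: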
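```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

#[derive(Default)]
struct Unit {
    /// The 1-D position of the unit. In a real game, it would be 2D or 3D.
    pub pos: i32,
}

type UnitRef = Rc<RefCell<Unit>>;

/// A future that will move the unit towards `target_pos` at each step,
/// and complete when the unit has reached that position.
struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Immutable borrow: read the current position.
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            Poll::Ready(())
        } else {
            // Mutable borrow: move one step towards the target.
            self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
            // Ask to be polled again. A game loop that polls every frame does not
            // need this, but an executor like Tokio only re-polls woken tasks.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Returns a future that moves `unit` to `target_pos`, one step per poll.
fn goto(unit: UnitRef, target_pos: i32) -> UnitGotoFuture {
    UnitGotoFuture { unit, target_pos }
}
```

Under the hood, the compiler turns every `async fn` into a state machine: it generates an anonymous type that implements the `Future` trait, whose `poll` method runs the body until it reaches an `.await` that is not ready yet, and records where to resume. Calling an `async fn` only constructs this future, just as calling the `goto` function only constructs a `UnitGotoFuture`; no work happens until a runtime polls it.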
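To see how a runtime drives such a future, here is a self-contained sketch of a hypothetical game-loop executor (`run_frames` and `NoopWaker` are illustrative names, not part of any library). It polls the future once per frame and ignores the waker, since it re-polls every frame anyway. The future is repeated in compact form so the block compiles with only the standard library.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Unit {
    pos: i32,
}

type UnitRef = Rc<RefCell<Unit>>;

struct UnitGotoFuture {
    unit: UnitRef,
    target_pos: i32,
}

impl Future for UnitGotoFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let unit_pos = self.unit.borrow().pos;
        if unit_pos == self.target_pos {
            return Poll::Ready(());
        }
        // Move one step, then ask to be polled again.
        self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical waker that does nothing: the game loop polls every frame anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical game-loop executor: polls `fut` once per frame and
/// returns how many frames it took to complete.
fn run_frames<F: Future>(fut: F) -> usize {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut frames = 0;
    loop {
        frames += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return frames;
        }
    }
}
```

For a unit starting at position 0, `run_frames(UnitGotoFuture { unit, target_pos: 3 })` returns 4: three polls return `Pending` while the unit moves one step each, and the fourth poll returns `Ready`.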
Waking up
It is up to the future to signal, through the `Context`, when it is ready to be polled again. This is done with the `Waker` returned by `cx.waker()`, which exposes two methods:

- `wake`, which consumes the `Waker`;
- `wake_by_ref`, which takes the `Waker` by reference, so it can be used again.

`wake` fits a timer: once the timer has elapsed the task is runnable and can complete, so it only needs to be woken once and the `Waker` can be consumed. `wake_by_ref` fits something like a socket, where data may arrive many times and the same `Waker` is used for every wake-up.
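The sketch below shows `wake` in a timer, using only the standard library. It follows the classic hand-written timer future rather than Tokio's actual timer implementation: `TimerFuture`, `ThreadWaker`, and `block_on` are hypothetical names, and a whole OS thread stands in for the runtime's timer driver. The executor parks its thread after each `Pending` and is unparked by the waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the future and the timer thread.
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

/// A future that completes once `duration` has elapsed.
struct TimerFuture {
    shared: Arc<Mutex<SharedState>>,
}

impl TimerFuture {
    fn new(duration: Duration) -> Self {
        let shared = Arc::new(Mutex::new(SharedState { completed: false, waker: None }));
        let timer_shared = Arc::clone(&shared);
        // A dedicated thread stands in for the runtime's timer driver.
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = timer_shared.lock().unwrap();
            state.completed = true;
            // The timer fires exactly once, so it can consume the Waker with `wake`.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        TimerFuture { shared }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // Store a clone of the current waker so the timer thread can wake this task.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll, sleep until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
        // `park` can also return spuriously; the loop then simply polls again.
        thread::park();
    }
}
```

`std::task::Wake` only requires `wake`; its default `wake_by_ref` clones the `Arc` and calls `wake` on the clone.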
What does blocking the thread mean?
When writing async Rust, the phrase “blocking the thread” means “preventing the runtime from swapping the current task”. This can be a major issue, because other tasks on the same runtime thread cannot run until the thread is no longer blocked. To prevent this, write code that can be swapped quickly, which means never spending a long time away from an `.await`.
For example, consider the following code:

```rust
use std::time::Duration;

async fn sleep_then_print(timer: i32) {
    println!("Start timer {}.", timer);
    // No .await here! This blocks the whole thread for one second.
    std::thread::sleep(Duration::from_secs(1));
    println!("Timer {} done.", timer);
}

#[tokio::main]
async fn main() {
    // The join! macro lets you run multiple things concurrently.
    tokio::join!(
        sleep_then_print(1),
        sleep_then_print(2),
        sleep_then_print(3),
    );
}
```

It produces this output, taking about three seconds in total:

```text
Start timer 1.
Timer 1 done.
Start timer 2.
Timer 2 done.
Start timer 3.
Timer 3 done.
```
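Because `sleep_then_print` contains no `.await`, the runtime (Tokio, in this case) cannot swap the task out while it sleeps, so the three calls run one after another even though `join!` runs them concurrently. The fix is to use an awaitable sleep instead:

```rust
use std::time::Duration;

async fn sleep_then_print(timer: i32) {
    println!("Start timer {}.", timer);
    tokio::time::sleep(Duration::from_secs(1)).await;
    // ^ execution can be paused here, letting the other timers start
    println!("Timer {} done.", timer);
}
```

With this version all three timers start before any of them finishes, and the program completes in about one second instead of three.

tokio::join! and tokio::spawn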
The `tokio::join!` macro runs all of its futures concurrently within the current task, which guarantees that they share the same thread and never run in parallel. Blocking tasks can run in parallel if each one is started with `tokio::spawn` on a multi-threaded runtime: the runtime keeps a pool of worker threads, and each spawned task can be picked up by a different worker. Each blocking task still occupies a whole worker thread while it blocks, though.
Warning
The default Tokio runtime starts one worker thread per CPU core, so on a machine with many cores you might not notice the problem locally.
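The effect of a missing `.await` can be reproduced without Tokio. This std-only sketch uses a hypothetical round-robin executor (`run_on_one_thread`) that polls three tasks in turn on a single thread, loosely like `join!` polls its futures inside one task. `YieldNow` is a stand-in for a real `.await` point such as `tokio::time::sleep`, and `std::thread::sleep` plays the role of blocking work.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::Duration;

/// Returns `Pending` once, handing control back to the executor.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

type Log = Rc<RefCell<Vec<String>>>;
type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Hypothetical task: logs its start, waits (cooperatively or not), logs its end.
async fn timer_task(id: u32, log: Log, cooperative: bool) {
    log.borrow_mut().push(format!("start {id}"));
    if cooperative {
        // An `.await` point: the executor may switch to another task here.
        YieldNow { yielded: false }.await;
    } else {
        // No `.await`: the thread, and every task on it, is blocked.
        std::thread::sleep(Duration::from_millis(10));
    }
    log.borrow_mut().push(format!("done {id}"));
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Single-threaded, round-robin executor: polls every unfinished task in turn.
fn run_on_one_thread(cooperative: bool) -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let mut tasks: Vec<Task> = (1..=3)
        .map(|id| Box::pin(timer_task(id, Rc::clone(&log), cooperative)) as Task)
        .collect();
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending.
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
    log.take()
}
```

With `cooperative = true` the log reads `start 1, start 2, start 3, done 1, done 2, done 3`. With `cooperative = false` each task runs to completion before the next one starts, the same serialized order as the blocking `sleep_then_print` example.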
How to handle blocking correctly
If we are dealing with expensive CPU-bound computation or synchronous I/O, we may not be able to reach an `.await` every 10 to 100 microseconds, so we need to move the blocking operation off Tokio's worker threads. There are three options:

- Use `tokio::task::spawn_blocking`.
- Use the `rayon` crate.
- Spawn a dedicated thread with `std::thread::spawn`.
`spawn_blocking` runs its closure on a separate thread pool reserved for blocking functions, which by default grows up to 512 threads. Because this pool is large, it is best suited for blocking I/O, such as interacting with the filesystem or with a crate like `diesel` that provides blocking database interactions.
Tip
The large thread pool works well because I/O-bound tasks spend most of their time waiting for high-latency external operations to complete, so many of these threads can be blocked at once without competing for the CPU.
For CPU-bound computations, the `rayon` crate is a better fit: by default its thread pool has one thread per CPU core. CPU-bound tasks keep the CPU busy for most of their lifetime, so running many more threads than cores only makes them compete for CPU time and adds significant context-switching overhead.
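As a std-only illustration of this sizing rule (not Rayon's implementation, which keeps a persistent pool and uses work stealing), the hypothetical `cpu_bound_sum` below splits a slice into one chunk per available core and sums the chunks on scoped threads. `std::thread::available_parallelism` reports roughly the thread count Rayon uses by default for its global pool, unless `RAYON_NUM_THREADS` overrides it.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Sums `nums` with one scoped thread per available CPU core.
/// Unlike a pool, this spawns fresh threads on every call.
fn cpu_bound_sum(nums: &[i64]) -> i64 {
    // Fall back to a single thread if the core count cannot be determined.
    let workers = thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1);
    // Ceiling division, clamped to 1 because `chunks(0)` panics.
    let chunk_len = ((nums.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = nums
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<i64>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<i64>()
    })
}
```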
A typical strategy is to combine Rayon with Tokio: the expensive computation runs on the Rayon pool via `rayon::spawn`, and the async code awaits the result through a Tokio channel without blocking the runtime.
```rust
async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (send, recv) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform the expensive computation on this rayon thread.
        let mut sum = 0;
        for num in nums {
            sum += num;
        }
        // Send the result back to Tokio.
        let _ = send.send(sum);
    });

    // Wait for the rayon task without blocking the Tokio worker thread.
    recv.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}
```

This approach uses a single thread of the Rayon pool per call, which works well when the application makes many concurrent calls to `parallel_sum`. When there are only a few calls, Rayon's parallel iterators let each call spread its work over several threads of the pool:
```rust
use rayon::prelude::*;

async fn parallel_sum(nums: Vec<i32>) -> i32 {
    let (send, recv) = tokio::sync::oneshot::channel();
    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Compute the sum on multiple threads of the rayon pool.
        let sum: i32 = nums.par_iter().sum();
        // Send the result back to Tokio.
        let _ = send.send(sum);
    });
    recv.await.expect("Panic in rayon::spawn")
}
```

The last option is to spawn a dedicated thread with `std::thread::spawn`. This suits operations that keep running forever: running such an operation on Rayon would effectively remove one thread from its pool for good.
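A minimal std-only sketch of the dedicated-thread option follows; `start_worker` and its `u64` jobs are hypothetical. The worker owns its OS thread and blocks on `recv` between jobs, which harms nobody because no other task shares that thread.

```rust
use std::sync::mpsc;
use std::thread;

/// Starts a hypothetical long-lived worker on its own OS thread.
/// Returns the job sender and a handle yielding the sum of all jobs.
fn start_worker() -> (mpsc::Sender<u64>, thread::JoinHandle<u64>) {
    let (jobs_tx, jobs_rx) = mpsc::channel::<u64>();
    let handle = thread::Builder::new()
        .name("background-worker".into())
        .spawn(move || {
            let mut total = 0;
            // The loop ends only when every Sender has been dropped; in a real
            // service it might run for the whole lifetime of the program.
            for job in jobs_rx {
                total += job;
            }
            total
        })
        .expect("failed to spawn worker thread");
    (jobs_tx, handle)
}
```

Sending the jobs `1..=10`, dropping the sender, and joining the handle yields 55.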
Comparing Rayon, Crossbeam and Tokio
| Use Case | Crossbeam | Tokio | Rayon |
|---|---|---|---|
| Borrowed data across threads | ✅ Scoped threads ensure safe borrowing. | ❌ `tokio::spawn` requires `'static` data. | ✅ `par_iter`, `join` and `scope` can borrow; only `rayon::spawn` requires `'static` data. |
| Fine-grained thread control | ✅ Full control over thread lifecycle and task prioritization. | ❌ Abstracted behind async runtime. | ❌ No direct thread control; uses thread pools. |
| Blocking, CPU-bound workloads | ✅ Directly manages threads; low-latency. | ❌ Offload to spawn_blocking. | ✅ Optimized for CPU-bound, data-parallel tasks. |
| Producer-consumer pipelines | ✅ Simpler with blocking channels. | ✅ Use tokio::sync::mpsc for async tasks. | ❌ Not designed for producer-consumer workflows. |
| Async I/O (e.g., GitHub API calls) | ❌ Not designed for async tasks. | ✅ Built for async workflows. | ❌ Cannot handle async tasks. |
| Lightweight task concurrency | ❌ Requires explicit thread management. | ✅ Highly efficient with async tasks. | ❌ Limited to parallel data transformations. |
| Data-parallel transformations | ❌ Threads must be manually managed. | ❌ Not suited for parallel data processing. | ✅ Ideal for parallel processing with .par_iter(). |
| Recursive parallelism | ❌ No built-in support. | ❌ Not designed for recursive workloads. | ✅ Divide-and-conquer parallelism is natural. |
| Task prioritization | ✅ Supports custom priority queues with channels. | ❌ Requires complex setup. | ❌ No support for task prioritization. |
| Integration simplicity | ✅ Minimal setup, works out-of-the-box with threads. | ✅ Easy to integrate for async tasks. | ✅ Straightforward for parallel iteration. |
An example that uses all three
The project fetches data about GitHub repositories (e.g., issues, pull requests) using Tokio for async I/O, processes the fetched data in parallel using Rayon, and coordinates tasks with a producer-consumer pipeline using Crossbeam.
Here’s how you can structure this:
```rust
use crossbeam::channel;
use rayon::prelude::*;
use std::thread;
use tokio::time::{sleep, Duration};

// Simulated async function to fetch GitHub issues for a repository
async fn fetch_issues(repo: &str) -> Vec<String> {
    println!("Fetching issues for repo: {}", repo);
    sleep(Duration::from_secs(1)).await; // Simulate network delay
    vec![format!("Issue 1 in {}", repo), format!("Issue 2 in {}", repo)]
}

// Simulated async function to fetch pull requests for a repository
async fn fetch_pull_requests(repo: &str) -> Vec<String> {
    println!("Fetching pull requests for repo: {}", repo);
    sleep(Duration::from_secs(1)).await; // Simulate network delay
    vec![format!("PR 1 in {}", repo), format!("PR 2 in {}", repo)]
}

// CPU-bound function to process data (e.g., filtering or analyzing)
fn process_data(data: Vec<String>) -> Vec<String> {
    data.par_iter() // Parallel processing using Rayon
        .map(|item| {
            println!("Processing {}", item);
            format!("Processed: {}", item)
        })
        .collect()
}

// Main workflow: the producer queues repositories, consumers fetch and process data.
// This function blocks the calling thread, so it must not run on an async worker thread.
fn run_pipeline(repos: Vec<String>) {
    // Crossbeam channels for the task queue and the results
    let (task_sender, task_receiver) = channel::unbounded::<String>();
    let (result_sender, result_receiver) = channel::unbounded();

    // Producer thread: queue one task per repository. `task_sender` is moved
    // into the thread and dropped when it finishes, which closes the queue
    // and ends the consumers' `recv` loops.
    let producer = thread::spawn(move || {
        for repo in repos {
            println!("Queueing tasks for repo: {}", repo);
            task_sender.send(repo).unwrap();
        }
    });

    // Consumer threads: fetch data with Tokio, then process it with Rayon
    let consumers: Vec<_> = (0..2) // Two consumer threads
        .map(|_| {
            let task_receiver = task_receiver.clone();
            let result_sender = result_sender.clone();
            thread::spawn(move || {
                // One single-threaded Tokio runtime per consumer, created once and reused.
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                while let Ok(repo) = task_receiver.recv() {
                    // Fetch issues and pull requests concurrently, blocking this
                    // plain OS thread (not a runtime worker) until both are done.
                    let (issues, pull_requests) = runtime.block_on(async {
                        tokio::join!(fetch_issues(&repo), fetch_pull_requests(&repo))
                    });
                    // Combine and process data on the Rayon pool
                    let processed_data = process_data([issues, pull_requests].concat());
                    // Send processed data to results channel
                    result_sender.send(processed_data).unwrap();
                }
            })
        })
        .collect();

    producer.join().unwrap(); // Wait for producer to finish
    for consumer in consumers {
        consumer.join().unwrap(); // Wait for all consumers to finish
    }

    // Drop the last sender so iterating over the receiver terminates.
    drop(result_sender);
    for result in result_receiver {
        println!("Final result: {:?}", result);
    }
}

// `run_pipeline` is synchronous and blocks, so `main` does not need to be async.
fn main() {
    // Example repositories
    let repos = vec![
        "repo1".to_string(),
        "repo2".to_string(),
        "repo3".to_string(),
    ];
    run_pipeline(repos);
}
```

In Tokio, `task::block_in_place` lets you run blocking code on a worker thread of a multi-threaded runtime without stalling the other tasks scheduled on it: Tokio hands those tasks off to another thread, while the blocking closure runs on the current thread. It panics when called from a current-thread runtime. The pipeline does not need it, because its consumers are plain OS threads that drive their own runtime with `block_on`.