Building Async I/O in Rust: How Futures, Wakers, and Thread Pools Work Together
Asynchronous I/O in Rust lets a program perform non-blocking operations, which enables efficient multitasking and better performance, especially in I/O-bound applications. By using the async and await keywords, Rust can start work such as reading from a file or sending data over a network and yield while the operation is pending, instead of blocking a thread until it completes. This prevents a single task from blocking the entire program and allows other tasks to proceed concurrently.
OS Scheduling
OS Scheduling determines which process or thread runs at any given moment, ensuring efficient CPU utilization. In a blocking I/O model, a process waiting for I/O would cause the CPU to be idle. Modern operating systems solve this by switching to other tasks while I/O is pending, allowing better resource utilization. This approach relies on techniques like polling or I/O multiplexing, where the OS checks for I/O readiness without blocking a process.
Context switching happens when the OS saves the state of a running process and switches to another. When the I/O completes, the OS can switch back, ensuring tasks can continue without wasting CPU time.
Interrupts are signals sent to the CPU by hardware or software that demand immediate attention. For example, when a network packet arrives, a hardware interrupt tells the OS to process it, allowing asynchronous tasks to continue without blocking.
The lifecycle of an I/O bound process typically looks like this:
The process begins by executing on the CPU. When its time slice expires, a timer interrupt returns control to the OS, which preempts the process and moves it to the Ready Queue. This keeps the CPU available for other tasks, enabling multitasking.
When the process is scheduled again, it resumes execution. During this execution, the process issues an I/O request, such as reading data from a disk or communicating with a network. The process then transitions to the Waiting Queue while it waits for the I/O operation to complete.
While the process is in the Waiting state, the I/O device handles the request. Once the I/O operation is completed, the device sends an interrupt to the CPU, signaling that the process can continue.
The scheduler moves the process back to the Ready Queue after the I/O operation is completed. Once the process is scheduled by the OS again, it resumes execution on the CPU and continues with its next tasks.
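The lifecycle above can be read as a small state machine. The following is a minimal sketch of that reading, not a description of how any real kernel is implemented; the names ProcessState, Event, next_state, and replay are hypothetical and chosen only for illustration.

```rust
/// The three states a process moves through in the lifecycle described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessState {
    Running,
    Ready,
    Waiting,
}

/// Events that cause a transition (hypothetical names).
#[derive(Debug, Clone, Copy)]
enum Event {
    TimeSliceExpired,
    Scheduled,
    IoRequested,
    IoCompleted,
}

/// Returns the next state, or None if the event makes no sense in this state.
fn next_state(state: ProcessState, event: Event) -> Option<ProcessState> {
    use Event::*;
    use ProcessState::*;
    match (state, event) {
        (Running, TimeSliceExpired) => Some(Ready), // preempted
        (Ready, Scheduled) => Some(Running),        // picked by the scheduler
        (Running, IoRequested) => Some(Waiting),    // blocked on I/O
        (Waiting, IoCompleted) => Some(Ready),      // interrupt: I/O is done
        _ => None,
    }
}

/// Applies a sequence of events, stopping at the first invalid transition.
fn replay(start: ProcessState, events: &[Event]) -> Option<ProcessState> {
    events
        .iter()
        .try_fold(start, |state, &event| next_state(state, event))
}
```

Replaying the four steps of the lifecycle, followed by one more scheduling decision, brings the process back to Running. An event that does not fit the current state, such as scheduling a process that is still waiting on I/O, yields None.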
Before delving into asynchronous programming in Rust, it's helpful to grasp OS scheduling because it shares several key concepts with async systems. In an operating system, scheduling decides which processes get CPU time and when. When a process is blocked (e.g., waiting for I/O), the OS can switch to another task, preventing CPU wastage. This involves context switching, where the state of a process is saved, and the system switches to another.
Async Rust operates similarly: when an async task (or "future") is waiting on I/O, the async runtime pauses it, allowing other tasks to progress. Like an OS, Rust’s async runtime "schedules" tasks, switches contexts, and uses wakers (like interrupts) to resume tasks when they’re ready. Understanding how OS scheduling works makes it easier to comprehend how async runtimes manage tasks efficiently, ensuring no CPU time is wasted while waiting for I/O operations to complete.
Event Loop in JavaScript
JavaScript, being single-threaded, uses an event loop to handle asynchronous tasks. Synchronous operations are handled by the call stack, while asynchronous tasks (e.g., I/O events or timers) are placed in the task queue when complete. The event loop monitors both the stack and queue, pushing tasks from the queue onto the stack only when the stack is empty. This ensures JavaScript can handle asynchronous operations efficiently, without blocking execution.
- CallStack: The async function starts executing when called, and the synchronous part of the function runs on the call stack.
- TaskQueue: When the async function reaches an await statement or an asynchronous task (e.g., an I/O operation or timer), the function yields and the task is placed in the Task Queue. The call stack is then free to handle other tasks.
- CallStack (again): The Event Loop checks if the call stack is empty. If it is, the callback from the task queue is pushed onto the stack for execution.
- Completed: The callback runs to completion, resolving the async function or handling any errors. After this, the task is fully complete.
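The steps above can be modelled in a few lines. Since the rest of this article uses Rust, the sketch below is written in Rust rather than JavaScript, and it is a deliberately simplified model: it has a single task queue, no timers, and no real I/O. EventLoop, enqueue, run, and demo are hypothetical names.

```rust
use std::collections::VecDeque;

/// A queued callback. It receives the loop so it can enqueue follow-up work.
type Callback = Box<dyn FnOnce(&mut EventLoop)>;

struct EventLoop {
    task_queue: VecDeque<Callback>,
    log: Vec<String>,
}

impl EventLoop {
    fn new() -> Self {
        EventLoop { task_queue: VecDeque::new(), log: Vec::new() }
    }

    /// Places a callback in the task queue; it does not run yet.
    fn enqueue(&mut self, callback: impl FnOnce(&mut EventLoop) + 'static) {
        self.task_queue.push_back(Box::new(callback));
    }

    /// Runs queued callbacks one at a time, each to completion.
    fn run(&mut self) {
        while let Some(callback) = self.task_queue.pop_front() {
            callback(&mut *self);
        }
    }
}

/// Synchronous code runs first; queued callbacks run only afterwards.
fn demo() -> Vec<String> {
    let mut event_loop = EventLoop::new();
    event_loop.log.push("sync: start".to_string());
    event_loop.enqueue(|el: &mut EventLoop| {
        el.log.push("callback: first".to_string());
        el.enqueue(|el: &mut EventLoop| el.log.push("callback: follow-up".to_string()));
    });
    event_loop.log.push("sync: end".to_string());
    event_loop.run();
    event_loop.log
}
```

In this model the synchronous lines are logged before any callback, and a callback enqueued from inside another callback runs after the current one has finished, which mirrors the rule that the queue is only drained when the stack is empty.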
JavaScript's event loop and an OS scheduler share similar roles in managing tasks efficiently, particularly for I/O-bound operations. Both systems ensure that while one task is waiting on an I/O operation, other tasks can continue executing, preventing idle time. Just like an OS moves processes between the running, ready, and waiting states, JavaScript's event loop moves async tasks between the call stack, task queue, and executing state. By mimicking the OS’s approach to multitasking and resource management, JavaScript can efficiently handle asynchronous I/O operations, ensuring responsive and non-blocking execution. This design highlights how fundamental OS principles apply at higher levels for effective concurrency.
Asynchronous I/O in Rust
In Rust, asynchronous I/O builds on ideas analogous to an OS's scheduling, context switching, and interrupts. The async runtime plays the role of an OS scheduler, managing and polling tasks (futures). When an async task (a future) is not ready, the runtime pauses it. Once the awaited resource is ready, the task is resumed, similar to context switching in an OS.
The Future trait is key to Rust’s async system. Here’s its simplified structure:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Components of the Future Trait
- Associated Type Output: This defines the eventual result of the future. It could be a Result<T, E> for operations that can succeed or fail, like network requests.
- poll Method: The core method that drives a future to completion. It takes:
  - self: Pin<&mut Self>: Ensures the future is pinned (not moved in memory), crucial for memory safety in async.
  - cx: &mut Context<'_>: A context that provides access to a Waker, which signals when the future is ready to continue.
The poll method returns a Poll enum:
- Poll::Pending: The future isn’t ready yet; it’ll be polled again later.
- Poll::Ready(T): The future has completed and returns its final value.
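To see poll, Poll::Pending, and Poll::Ready in action without any runtime, here is a minimal sketch of a hand-written future that is polled manually. Countdown, NoopWaker, and poll_to_completion are hypothetical names. The busy loop stands in for an executor only for demonstration; a real executor would wait for the waker instead of polling in a loop.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that reports Pending a fixed number of times before completing.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Tell the executor we want to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future in a loop and returns how many polls it took.
fn poll_to_completion(mut future: Countdown) -> (u32, &'static str) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0u32;
    loop {
        polls += 1;
        if let Poll::Ready(output) = Pin::new(&mut future).poll(&mut cx) {
            return (polls, output);
        }
    }
}
```

A Countdown with remaining set to 2 returns Poll::Pending twice and Poll::Ready on the third poll.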
How Futures Work in Practice
Executor
An executor is responsible for driving futures to completion. Rust’s futures crate provides a ThreadPool executor (available when the crate's thread-pool feature is enabled), which schedules tasks on multiple threads.
use futures::executor::ThreadPool;
let pool = ThreadPool::new().unwrap();
let future = async { /* ... */ };
pool.spawn_ok(future);
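To make "driving a future to completion" concrete, here is a minimal single-threaded executor built only on the standard library. It is a sketch under simplifying assumptions and not how ThreadPool is implemented: it runs exactly one future on the calling thread. The names ThreadWaker and block_on are illustrative (the futures crate ships its own block_on).

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A waker that unparks the thread running the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls the future until it is ready, sleeping between polls.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the heap so it can be polled.
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Nothing to do until the waker calls unpark. A spurious
            // return from park only causes a harmless extra poll.
            Poll::Pending => thread::park(),
        }
    }
}
```

The design choice worth noting is that the executor never polls in a tight loop: after Poll::Pending it sleeps, and only a call to the waker makes it poll again. A thread pool applies the same idea to many tasks at once.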
The ThreadPool executor works by scheduling tasks (futures) across multiple threads. A task can be spawned using the spawn_ok function:
pub fn spawn_ok<Fut>(&self, future: Fut)
where
    Fut: Future<Output = ()> + Send + 'static,
{
    self.spawn_obj_ok(FutureObj::new(Box::new(future)))
}
This spawns a new task and sends it to the executor’s queue. The Task struct responsible for polling the future looks like this:
struct Task {
    future: FutureObj<'static, ()>,
    exec: ThreadPool,
    wake_handle: Arc<WakeHandle>,
}
struct WakeHandle {
    mutex: UnparkMutex<Task>,
    exec: ThreadPool,
}
The run() method of the Task struct drives the task to completion:
fn run(self) {
    let Self { mut future, wake_handle, mut exec } = self;
    let waker = waker_ref(&wake_handle);
    let mut cx = Context::from_waker(&waker);
    unsafe {
        wake_handle.mutex.start_poll();
        loop {
            let res = future.poll_unpin(&mut cx);
            match res {
                Poll::Pending => {}
                Poll::Ready(()) => return wake_handle.mutex.complete(),
            }
            let task = Self { future, wake_handle: wake_handle.clone(), exec };
            match wake_handle.mutex.wait(task) {
                Ok(()) => return,
                Err(task) => {
                    future = task.future;
                    exec = task.exec;
                }
            }
        }
    }
}
Breakdown of the run() Method:
- Polling the Future: The task is polled using poll_unpin, which tries to make progress on the future. If Poll::Ready(()) is returned, the task has completed and its resources are cleaned up. If Poll::Pending is returned, the task has to wait for a wake-up.
- Creating a New Task Instance: A new instance of the task is created with the same future, wake_handle, and exec. This new task is ready to be parked until the future can make progress.
- Waiting for Notification: The task is handed to wake_handle.mutex.wait(task), where it stays until an external event (e.g., I/O completion) wakes it up. If the wake-up already arrived while the task was being polled, wait returns the task in Err and the loop polls it again immediately. Otherwise wait returns Ok(()), run() returns, and the worker thread is free for other tasks until this one is woken and scheduled again.
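The same poll, park, and re-schedule cycle can be sketched with standard-library types only. This is a simplified, single-threaded stand-in for what ThreadPool does, not its actual implementation: a channel plays the role of the executor's queue, and waking a task simply sends it back to that queue. MiniTask, run_all, and YieldOnce are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A task: the future plus a handle back to the run queue.
struct MiniTask {
    future: Mutex<Option<BoxFuture>>,
    queue: Mutex<Sender<Arc<MiniTask>>>,
}

impl Wake for MiniTask {
    // Waking a task means putting it back on the run queue.
    fn wake(self: Arc<Self>) {
        let task = self.clone();
        let _ = self.queue.lock().unwrap().send(task);
    }
}

/// Runs all futures to completion and returns the total number of polls.
fn run_all(futures: Vec<BoxFuture>) -> usize {
    let (sender, receiver) = channel::<Arc<MiniTask>>();
    for future in futures {
        let task = Arc::new(MiniTask {
            future: Mutex::new(Some(future)),
            queue: Mutex::new(sender.clone()),
        });
        sender.send(task).expect("queue is open");
    }
    // Only tasks (and their wakers) keep the queue alive from here on.
    drop(sender);

    let mut polls = 0;
    // recv fails once every task, and thus every sender, has been dropped.
    while let Ok(task) = receiver.recv() {
        let waker = Waker::from(task.clone());
        let mut cx = Context::from_waker(&waker);
        let mut slot = task.future.lock().unwrap();
        if let Some(mut future) = slot.take() {
            polls += 1;
            if future.as_mut().poll(&mut cx).is_pending() {
                // Not done: park the future until the task is woken again.
                *slot = Some(future);
            }
        }
    }
    polls
}

/// Test future: Pending on the first poll (after requesting a wake-up), then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
```

Two YieldOnce futures need four polls in total: each is polled once, parks itself, is re-queued by its own wake-up, and completes on the second poll. A task that is never woken is never polled again, which is why the waker matters so much.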
Example: Simulating Async/Await in Rust with a Thread Pool and an I/O Operation
In this example, we will simulate how Rust's async/await system works with asynchronous I/O operations by building a simple example where a future completes after 5 seconds. We’ll use a thread pool to execute the future, and an additional thread will act like an I/O event loop that completes the operation.
The example will demonstrate the core mechanism of how async I/O works: the event loop sleeps for a while, wakes the future, and the thread pool continues processing the future once it’s woken.
Step 1: Define the Shared State and I/O Operation
First, we define a SharedState struct, which will hold the waker used to notify the future when it can be polled again. The IOOperation struct simulates an asynchronous I/O task.
use std::sync::{Arc, RwLock};
use std::task::Waker;
// Shared state between the IO operation and the event loop that will wake it
struct SharedState {
    waker: Option<Waker>,
}
// Simulates an asynchronous I/O operation that waits for some external event to complete
struct IOOperation {
    shared_state: Arc<RwLock<SharedState>>,
}
Here, we use RwLock to safely access and modify the shared state across multiple threads. The Waker is stored in SharedState so that it can be used later by the simulated event loop to notify the future when the I/O operation is complete.
Step 2: Implement the Future Trait for IOOperation
Next, we implement the Future trait for IOOperation. The poll method is responsible for registering the waker in the shared state. The event loop will use this waker to wake the future, allowing it to be polled again and complete its operation.
use std::task::{Context, Poll};
use std::future::Future;
impl Future for IOOperation {
    type Output = String;
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.write().unwrap();
        if shared_state.waker.is_none() {
            // Store the waker so the other thread can wake it later
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // Once woken by the separate thread, return ready
            Poll::Ready("I/O Operation completed!".to_string())
        }
    }
}
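When the poll function is first called, it checks if the waker has been set. If not, it stores the waker and returns Poll::Pending, indicating that the future is not yet ready. On any later poll it returns Poll::Ready, signalling that the operation has completed. This treats "a waker is already stored" as a stand-in for "the I/O has finished", which works here only because the executor polls the future again solely after the event loop has woken it. A production future would track completion explicitly instead.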
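One common way to make such a future robust against early or repeated polls is an explicit completion flag, with the waker refreshed on every pending poll. The sketch below is a standalone alternative to IOOperation, not a drop-in patch for the example; FlaggedState, FlaggedIo, complete, poll_once, and NoopWaker are hypothetical names, and a Mutex and a no-op waker are used only to keep it self-contained.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the future and whoever completes the operation.
struct FlaggedState {
    completed: bool,
    waker: Option<Waker>,
}

struct FlaggedIo {
    state: Arc<Mutex<FlaggedState>>,
}

impl Future for FlaggedIo {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.state.lock().unwrap();
        if state.completed {
            Poll::Ready("I/O Operation completed!".to_string())
        } else {
            // Always store the latest waker; the task may have moved
            // to a different executor thread since the previous poll.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Called by the "event loop" side: set the flag first, then wake.
fn complete(state: &Arc<Mutex<FlaggedState>>) {
    let waker = {
        let mut state = state.lock().unwrap();
        state.completed = true;
        state.waker.take()
    };
    // Wake outside the lock so the woken task can lock the state at once.
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// A waker that does nothing, for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future exactly once.
fn poll_once(future: &mut FlaggedIo) -> Poll<String> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(future).poll(&mut cx)
}
```

With the flag in place, polling twice before completion returns Poll::Pending both times, and the future becomes ready only after complete has been called.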
Step 3: Simulate an I/O Event Loop
We now simulate an I/O event loop that waits for 5 seconds before waking the future. In real-world scenarios, this would represent an actual I/O task such as a network request or file read operation being completed.
use std::thread;
use std::time::Duration;
// This function simulates an I/O event loop that "completes" the I/O operation after a delay
fn simulate_io_event_loop(shared_state: Arc<RwLock<SharedState>>) {
    let io_event_loop_thread = thread::spawn(move || {
        // Simulate the time it takes for an I/O operation to complete (e.g., reading from disk or a network)
        thread::sleep(Duration::from_secs(5));
        // Once the "I/O operation" is complete, wake the future to allow it to finish
        let shared_state = shared_state.read().unwrap();
        if let Some(waker) = &shared_state.waker {
            waker.wake_by_ref(); // Wake the I/O operation to allow it to complete
        }
    });
    io_event_loop_thread
        .join()
        .expect("The thread did not execute correctly");
}
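The function simulate_io_event_loop spawns a new thread to simulate an asynchronous I/O event that completes after 5 seconds. Once the "I/O operation" is complete, it wakes the future by calling waker.wake_by_ref(). Because the function joins the thread before returning, the caller is blocked until the wake-up has been sent.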
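What a wake-up from another thread looks like from the executor's side can be observed with a waker that merely counts its calls. This is a small sketch using only the standard library; CountingWaker and wake_from_background_thread are hypothetical names, and a real executor would re-queue the task in wake rather than increment a counter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::thread;
use std::time::Duration;

/// Records how many times it has been woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hands a waker to a background thread, which calls it after a delay.
/// Returns the number of wake-ups observed once the thread has finished.
fn wake_from_background_thread(delay: Duration) -> usize {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    // Waker is Send, so it can be moved into another thread.
    let waker = Waker::from(counter.clone());
    let handle = thread::spawn(move || {
        thread::sleep(delay); // the simulated I/O latency
        waker.wake_by_ref(); // wakes without consuming the waker
    });
    handle.join().expect("background thread panicked");
    counter.wakes.load(Ordering::SeqCst)
}
```

Calling wake_from_background_thread with a short delay reports a single wake-up. The difference between wake and wake_by_ref is only ownership: wake consumes the waker, while wake_by_ref leaves it usable for later notifications.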
Step 4: Using the Thread Pool to Run the Future
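Next, we create the main function, which sets up the thread pool, creates the IOOperation future, and schedules the future on the thread pool. We also run the simulated I/O event loop to wake the future after a delay. Because returning from main ends the process even if a pool thread is still working, main then waits on a channel until the task reports that it has finished.
use futures::executor::ThreadPool;
use std::sync::mpsc;
fn main() {
    // Create a thread pool for executing futures
    let pool = ThreadPool::new().expect("Failed to create thread pool");
    let shared_state = Arc::new(RwLock::new(SharedState { waker: None }));
    // Create our custom future
    let io_operation = IOOperation {
        shared_state: shared_state.clone(),
    };
    // Channel the task uses to tell main that it has finished
    let (done_tx, done_rx) = mpsc::channel();
    // Schedule the future on the thread pool
    pool.spawn_ok(async move {
        let result = io_operation.await;
        println!("Result: {}", result);
        let _ = done_tx.send(());
    });
    // Simulate the I/O event loop that will "complete" the I/O operation after a delay
    simulate_io_event_loop(shared_state);
    // Wait for the task; otherwise main could exit before the result is printed
    done_rx.recv().expect("The task was dropped before completing");
}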
- ThreadPool Initialization: The thread pool is initialized using ThreadPool::new(), and it will be responsible for managing and executing our asynchronous tasks.
- Shared State Creation: The shared_state is created as an Arc<RwLock<SharedState>>, which will be shared between the I/O operation and the event loop.
- IOOperation Scheduling: We schedule the IOOperation future to run on the thread pool using pool.spawn_ok(). The future will wait until the I/O event loop wakes it up, at which point it will complete and print the result.
- Simulating the I/O Event Loop: The simulate_io_event_loop() function is called to simulate a 5-second delay and then wake the future.
Final Output
When the program is executed, the following happens:
The future is initially polled and registers its waker.
The event loop sleeps for 5 seconds.
After 5 seconds, the event loop wakes the future.
The thread pool continues to process the future, and the future completes with the message "I/O Operation completed!".
Compiling async_rust v0.1.0 (/Users/faisal/rust_articles/async_rust)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.30s
Running `target/debug/async_rust`
Result: I/O Operation completed!
Summary
In this article, we explored the fundamentals of asynchronous I/O in Rust and how it parallels OS scheduling principles. We demonstrated how Rust’s async/await system works by simulating an asynchronous I/O operation using a custom future. The future is managed by a thread pool, and a separate thread acts as an I/O event loop, waking the future after a simulated delay.
By breaking down how futures, wakers, and executors work together, we gained insight into the core mechanics of Rust's async runtime, similar to how operating systems schedule and manage tasks efficiently without blocking. Our example walked through the process of scheduling a future in a thread pool, simulating an external event to wake it, and then processing the result.
In the next article, we will take this a step further by using an event loop like Mio to build an asynchronous HTTP client, offering more practical insight into working with real-world async systems in Rust.