Building Async I/O in Rust: How Futures, Wakers, and Thread Pools Work Together

11 min read

Asynchronous I/O in Rust is a powerful technique that allows programs to perform non-blocking operations, enabling efficient multitasking and improved performance, especially in I/O-bound applications. By using the async and await keywords, Rust can handle tasks like reading from a file or sending data over a network without waiting for these operations to complete. This approach helps to prevent a single task from blocking the entire program, allowing other tasks to proceed concurrently.


OS Scheduling

OS Scheduling determines which process or thread runs at any given moment, ensuring efficient CPU utilization. In a blocking I/O model, a process waiting for I/O would cause the CPU to be idle. Modern operating systems solve this by switching to other tasks while I/O is pending, allowing better resource utilization. This approach relies on techniques like polling or I/O multiplexing, where the OS checks for I/O readiness without blocking a process.

Context switching happens when the OS saves the state of a running process and switches to another. When the I/O completes, the OS can switch back, ensuring tasks can continue without wasting CPU time.

Interrupts are signals sent to the CPU by hardware or software that demand immediate attention. For example, when a network packet arrives, a hardware interrupt tells the OS to process it, allowing asynchronous tasks to continue without blocking.

The lifecycle of an I/O bound process typically looks like this:

  1. The process begins by executing on the CPU. After a certain amount of time (the process's time slice), the CPU preempts the process and invokes the OS scheduler, moving the process to the Ready Queue. This ensures the CPU remains available for other tasks, enabling multitasking.

  2. When the process is scheduled again, it resumes execution. During this execution, the process issues an I/O request, such as reading data from a disk or communicating with a network. The process then transitions to the Waiting Queue while it waits for the I/O operation to complete.

  3. While the process is in the Waiting state, the I/O device handles the request. Once the I/O operation is completed, the device sends an interrupt to the CPU, signaling that the process can continue.

  4. The scheduler moves the process back to the Ready Queue after the I/O operation is completed. Once the process is scheduled by the OS again, it resumes execution on the CPU and continues with its next tasks.

Before delving into asynchronous programming in Rust, it's helpful to grasp OS scheduling because it shares several key concepts with async systems. In an operating system, scheduling decides which processes get CPU time and when. When a process is blocked (e.g., waiting for I/O), the OS can switch to another task, preventing CPU wastage. This involves context switching, where the state of a process is saved, and the system switches to another.

Async Rust operates similarly: when an async task (or "future") is waiting on I/O, the async runtime pauses it, allowing other tasks to progress. Like an OS, Rust’s async runtime "schedules" tasks, switches contexts, and uses wakers (like interrupts) to resume tasks when they’re ready. Understanding how OS scheduling works makes it easier to comprehend how async runtimes manage tasks efficiently, ensuring no CPU time is wasted while waiting for I/O operations to complete.

Event Loop in Javascript

JavaScript, being single-threaded, uses an event loop to handle asynchronous tasks. Synchronous operations are handled by the call stack, while asynchronous tasks (e.g., I/O events or timers) are placed in the task queue when complete. The event loop monitors both the stack and queue, pushing tasks from the queue onto the stack only when the stack is empty. This ensures JavaScript can handle asynchronous operations efficiently, without blocking execution.

  1. CallStack: The async function starts executing when called, and the synchronous part of the function runs on the call stack.

  2. TaskQueue: When the async function reaches an await statement or an asynchronous task (e.g., an I/O operation or timer), the function yields and the task is placed in the Task Queue. The call stack is then free to handle other tasks.

  3. CallStack (again): The Event Loop checks if the call stack is empty. If it is, the callback from the task queue is pushed onto the stack for execution.

  4. Completed: The callback runs to completion, resolving the async function or handling any errors. After this, the task is fully complete.

JavaScript's event loop and an OS scheduler share similar roles in managing tasks efficiently, particularly for I/O-bound operations. Both systems ensure that while one task is waiting on an I/O operation, other tasks can continue executing, preventing idle time. Just like an OS moves processes between the running, ready, and waiting states, JavaScript's event loop moves async tasks between the call stack, task queue, and executing state. By mimicking the OS’s approach to multitasking and resource management, JavaScript can efficiently handle asynchronous I/O operations, ensuring responsive and non-blocking execution. This design highlights how fundamental OS principles apply at higher levels for effective concurrency.


Asynchronous I/O in Rust

In Rust, asynchronous I/O is powered by scheduling, context switching, and interrupts, just like an OS. The async runtime plays the role of an OS scheduler, managing and polling tasks (futures). When an async task (a future) is not ready, the runtime pauses it. Once the awaited resource is ready, the task is resumed, similar to context switching in an OS.

The Future trait is key to Rust’s async system. Here’s its simplified structure:

use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Components of the Future Trait

  1. Associated Type Output:
    This defines the eventual result of the future. It could be a Result<T, E> for operations that can succeed or fail, like network requests.

  2. poll Method:
    The core method that drives a future to completion. It takes:

    • self: Pin<&mut Self>: Ensures the future is pinned (not moved in memory), crucial for memory safety in async.

    • cx: &mut Context<'_>: A context that provides access to a Waker, which signals when the future is ready to continue.

The poll method returns a Poll enum:

  • Poll::Pending: The future isn’t ready yet; it’ll be polled again later.

  • Poll::Ready(T): The future has completed and returns its final value.
    How Futures Work in Practice

Executor

An executor is responsible for driving futures to completion. Rust’s futures crate provides a ThreadPool executor, which schedules tasks on multiple threads.

use futures::executor::ThreadPool;

let pool = ThreadPool::new().unwrap();
let future = async { /* ... */ };
pool.spawn_ok(future);

The ThreadPool executor works by scheduling tasks (futures) across multiple threads. A task can be spawned using the spawn_ok function:

pub fn spawn_ok<Fut>(&self, future: Fut)
where
   Fut: Future<Output = ()> + Send + 'static,
{
   self.spawn_obj_ok(FutureObj::new(Box::new(future)))
}

This spawns a new task and sends it to the executor’s queue. The Task struct responsible for polling the future looks like this:

struct Task {
    future: FutureObj<'static, ()>,
    exec: ThreadPool,
    wake_handle: Arc<WakeHandle>,
}

struct WakeHandle {
    mutex: UnparkMutex<Task>,
    exec: ThreadPool,
}

The run() method of the Task struct drives the task to completion:

fn run(self) {
    let Self { mut future, wake_handle, mut exec } = self;
    let waker = waker_ref(&wake_handle);
    let mut cx = Context::from_waker(&waker);

    unsafe {
        wake_handle.mutex.start_poll();

        loop {
            let res = future.poll_unpin(&mut cx);
            match res {
                Poll::Pending => {}
                Poll::Ready(()) => return wake_handle.mutex.complete(),
            }
            let task = Self { future, wake_handle: wake_handle.clone(), exec };
            match wake_handle.mutex.wait(task) {
                Ok(()) => return,
                Err(task) => {
                    future = task.future;
                    exec = task.exec;
                }
            }
        }
    }
}

Breakdown of the run() Method:

  1. Polling the Future:

    • The task is polled using poll_unpin, which tries to make progress on the future. If Poll::Pending is returned, the task will wait. If Poll::Ready(()), the task has completed, and resources are cleaned up.
  2. Creating a New Task Instance:

    • A new instance of the task is created with the same future, wake_handle, and exec. This new task is ready to be parked until the future can make progress.
  3. Waiting for Notification:

    • The task waits for an external event (e.g., I/O completion) to wake it up. If woken before being parked, it continues polling immediately. Otherwise, it waits for the next opportunity to make progress.

Example: Simulating Async/Await in Rust with a Thread Pool and an I/O Operation

In this example, we will simulate how Rust's async/await system works with asynchronous I/O operations by building a simple example where a future completes after 5 seconds. We’ll use a thread pool to execute the future, and an additional thread will act like an I/O event loop that completes the operation.

The example will demonstrate the core mechanism of how async I/O works: the event loop sleeps for a while, wakes the future, and the thread pool continues processing the future once it’s woken.

Step 1: Define the Shared State and I/O Operation

First, we define a SharedState struct, which will hold the waker used to notify the future when it can be polled again. The IOOperation struct simulates an asynchronous I/O task.

use std::sync::{Arc, RwLock};
use std::task::Waker;

// Shared state between the IO operation and the event loop that will wake it
struct SharedState {
    waker: Option<Waker>,
}

// Simulates an asynchronous I/O operation that waits for some external event to complete
struct IOOperation {
    shared_state: Arc<RwLock<SharedState>>,
}

Here, we use RwLock to safely access and modify the shared state across multiple threads. The Waker is stored in SharedState so that it can be used later by the simulated event loop to notify the future when the I/O operation is complete.

Step 2: Implement the Future Trait for IOOperation

Next, we implement the Future trait for IOOperation. The poll method is responsible for registering the waker in the shared state. The event loop will use this waker to wake the future, allowing it to be polled again and complete its operation.

use std::task::{Context, Poll};
use std::future::Future;

impl Future for IOOperation {
    type Output = String;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self.shared_state.write().unwrap();

        if shared_state.waker.is_none() {
            // Store the waker so the other thread can wake it later
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // Once woken by the separate thread, return ready
            Poll::Ready("I/O Operation completed!".to_string())
        }
    }
}

When the poll function is first called, it checks if the waker has been set. If not, it stores the waker and returns Poll::Pending, indicating that the future is not yet ready. When the event loop wakes the future, it returns Poll::Ready, signalling that the operation has completed.

Step 3: Simulate an I/O Event Loop

We now simulate an I/O event loop that waits for 5 seconds before waking the future. In real-world scenarios, this would represent an actual I/O task such as a network request or file read operation being completed.

use std::thread;
use std::time::Duration;

// This function simulates an I/O event loop that "completes" the I/O operation after a delay
fn simulate_io_event_loop(shared_state: Arc<RwLock<SharedState>>) {
    let io_event_loop_thread = thread::spawn(move || {
        // Simulate the time it takes for an I/O operation to complete (e.g., reading from disk or a network)
        thread::sleep(Duration::from_secs(5));

        // Once the "I/O operation" is complete, wake the future to allow it to finish
        let shared_state = shared_state.read().unwrap();
        if let Some(waker) = &shared_state.waker {
            waker.wake_by_ref(); // Wake the I/O operation to allow it to complete
        }
    });

    io_event_loop_thread
        .join()
        .expect("The thread did not execute correctly");
}

The function simulate_io_event_loop spawns a new thread to simulate an asynchronous I/O event that completes after 5 seconds. Once the "I/O operation" is complete, it wakes the future by calling waker.wake_by_ref().

Step 4: Using the Thread Pool to Run the Future

Next, we create the main function, which sets up the thread pool, creates the IOOperation future, and schedules the future on the thread pool. We also spawn the simulated I/O event loop to wake the future after a delay.

use futures::executor::ThreadPool;

fn main() {
    // Create a thread pool for executing futures
    let pool = ThreadPool::new().expect("Failed to create thread pool");

    let shared_state = Arc::new(RwLock::new(SharedState { waker: None }));
    // Create our custom future
    let io_operation = IOOperation {
        shared_state: shared_state.clone(),
    };

    // Schedule the future on the thread pool
    pool.spawn_ok(async {
        let result = io_operation.await;
        println!("Result: {}", result);
    });

    // Simulate the I/O event loop that will "complete" the I/O operation after a delay
    simulate_io_event_loop(shared_state);
}
  1. ThreadPool Initialization:

    The thread pool is initialized using ThreadPool::new(), and it will be responsible for managing and executing our asynchronous tasks.

  2. Shared State Creation:

    The shared_state is created as an Arc<RwLock>, which will be shared between the I/O operation and the event loop.

  3. IOOperation Scheduling:

    We schedule the IOOperation future to run on the thread pool using pool.spawn_ok(). The future will wait until the I/O event loop wakes it up, at which point it will complete and print the result.

  4. Simulating the I/O Event Loop:

    The simulate_io_event_loop() function is called to simulate a 5-second delay and then wake the future.

Final Output

When the program is executed, the following happens:

  1. The future is initially polled and registers its waker.

  2. The event loop sleeps for 5 seconds.

  3. After 5 seconds, the event loop wakes the future.

  4. The thread pool continues to process the future, and the future completes with the message "I/O Operation completed!

Compiling async_rust v0.1.0 (/Users/faisal/rust_articles/async_rust)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.30s
Running `target/debug/async_rust`
Result: I/O Operation completed!

Summary

In this article, we explored the fundamentals of asynchronous I/O in Rust and how it parallels with OS scheduling principles. We demonstrated how Rust’s async/await system works by simulating an asynchronous I/O operation using a custom future. The future is managed by a thread pool, and a separate thread acts as an I/O event loop, waking the future after a simulated delay.

By breaking down how futures, wakers, and executors work together, we gained insight into the core mechanics of Rust's async runtime, similar to how operating systems schedule and manage tasks efficiently without blocking. Our example walked through the process of scheduling a future in a thread pool, simulating an external event to wake it, and then processing the result.

In the next article, we will take this a step further by using an event loop like Mio to build an asynchronous HTTP client, offering more practical insight into working with real-world async systems in Rust.