Rust 中的 Pin UnPin Async Await 实现机制下

2023年 10月 10日 62.0k 0

原文地址

为了保证概念的严谨性,翻译时保留了英文原文。

由于文章内容超过编辑器最大长度,所以拆成了上下部分

Rust 中的 Pin UnPin Async Await 实现机制上

3. Implementation 实现

Now that we understand how cooperative multitasking based on futures and async/await works in Rust, it’s time to add support for it to our kernel. Since the Future trait is part of the core library and async/await is a feature of the language itself, there is nothing special we need to do to use it in our #![no_std] kernel. The only requirement is that we use at least nightly 2020-03-25 of Rust because async/await was not no_std compatible before.

现在我们了解了 Rust 中基于 Future 和 async/await 的协作多任务如何工作,是时候在我们的内核中添加对其的支持了。由于 Future 特征是 core 库的一部分,并且 async/await 是语言本身的一个功能,因此我们不需要做任何特殊的事情就可以在 #![no_std] 内核中使用。唯一的要求是我们至少使用 Rust 的 nightly 2020-03-25 ,因为 async/await 之前与 no_std 不兼容。

With a recent-enough nightly, we can start using async/await in our main.rs:

使用最近的 Rust nightly 版本,我们可以开始在 main.rs 中使用 async/await :

// in src/main.rs

async fn async_number() -> u32 {
    42
}

async fn example_task() {
    let number = async_number().await;
    println!("async number: {}", number);
}

The async_number function is an async fn, so the compiler transforms it into a state machine that implements Future. Since the function only returns 42, the resulting future will directly return Poll::Ready(42) on the first poll call. Like async_number, the example_task function is also an async fn. It awaits the number returned by async_number and then prints it using the println macro.

async_number 函数是一个 async fn ,因此编译器将其转换为实现 Future 的状态机。由于该函数仅返回 42 ,因此最终的 future 将在第一次 poll 调用时直接返回 Poll::Ready(42) 。与 async_number 一样, example_task 函数也是 async fn 。它等待 async_number 返回的数字,然后使用 println 宏打印它。

To run the future returned by example_task, we need to call poll on it until it signals its completion by returning Poll::Ready. To do this, we need to create a simple executor type.

要运行 example_task 返回的 Future,我们需要对其调用 poll ,直到它通过返回 Poll::Ready 发出完成信号。为此,我们需要创建一个简单的执行器类型。

3.1 Task 任务

Before we start the executor implementation, we create a new task module with a Task type:

在开始执行器实现之前,我们创建一个具有 Task 类型的新 task 模块:

// in src/lib.rs

pub mod task;
// in src/task/mod.rs

use core::{future::Future, pin::Pin};
use alloc::boxed::Box;

pub struct Task {
    future: Pin,
}

The Task struct is a newtype wrapper around a pinned, heap-allocated, and dynamically dispatched future with the empty type () as output. Let’s go through it in detail:

Task 结构是一个新类型包装器,封装了 固定的 、堆分配的和动态分派的 future,以空类型 () 作为输出。让我们详细了解一下:

  • We require that the future associated with a task returns (). This means that tasks don’t return any result, they are just executed for their side effects. For example, the example_task function we defined above has no return value, but it prints something to the screen as a side effect.

  • 我们要求与任务关联的 future 返回 () 。这意味着任务不会返回任何结果,它们只是因其副作用而被执行。例如,我们上面定义的 example_task 函数没有返回值,但它会在屏幕上打印一些内容作为副作用。

  • The dyn keyword indicates that we store a trait object in the Box. This means that the methods on the future are dynamically dispatched, allowing different types of futures to be stored in the Task type. This is important because each async fn has its own type and we want to be able to create multiple different tasks.

  • dyn 关键字表示我们在 Box 中存储一个特征对象。这意味着 Future 的方法是动态派发的,允许不同类型的 Future 存储在 Task 类型中。这很重要,因为每个 async fn 都有自己的类型,我们希望能够创建多个不同的任务。

  • As we learned in the section about pinning, the Pin type ensures that a value cannot be moved in memory by placing it on the heap and preventing the creation of &mut references to it. This is important because futures generated by async/await might be self-referential, i.e., contain pointers to themselves that would be invalidated when the future is moved.

  • 正如我们在有关固定(Pin)的部分中了解到的, Pin 类型通过将值放置在堆上并防止创建对其的 &mut 引用来确保值无法在内存中移动。这很重要,因为由 async/await 生成的 Future 可能是自引用的,即包含指向自身的指针,当 future 移动时,这些指针将失效。

To allow the creation of new Task structs from futures, we create a new function:

为了允许从 Future 创建新的 Task 结构,我们创建一个 new 函数:

// in src/task/mod.rs

impl Task {
    pub fn new(future: impl Future + 'static) -> Task {
        Task {
            future: Box::pin(future),
        }
    }
}

The function takes an arbitrary future with an output type of () and pins it in memory through the Box::pin function. Then it wraps the boxed future in the Task struct and returns it. The 'static lifetime is required here because the returned Task can live for an arbitrary time, so the future needs to be valid for that time too.

该函数采用输出类型为 () 的任意 Future,并通过 Box::pin 函数将其固定在内存中。然后它将装箱的 Future 包装在 Task 结构中并返回它。这里需要 'static 生命周期,因为返回的 Task 可以存活任意时间(即Task 可以长时间运行),因此 Future 也需要在该时间内有效。

We also add a poll method to allow the executor to poll the stored future:

我们还添加了一个 poll 方法来允许执行器轮询存储的 Future:

// in src/task/mod.rs

use core::task::{Context, Poll};

impl Task {
    fn poll(&mut self, context: &mut Context) -> Poll {
        self.future.as_mut().poll(context)
    }
}

Since the poll method of the Future trait expects to be called on a Pin type, we use the Pin::as_mut method to convert the self.future field of type Pin first. Then we call poll on the converted self.future field and return the result. Since the Task::poll method should only be called by the executor that we’ll create in a moment, we keep the function private to the task module.

由于 Future 特征的 poll 方法期望在 Pin 类型上调用,因此我们使用 Pin::as_mut 方法来转换 类型为 Pin 的字段。然后我们对转换后的 self.future 字段调用 poll 并返回结果。由于 Task::poll 方法只能由我们稍后创建的执行器调用,因此我们将该函数保留为 task 模块的私有函数。

3.2 Simple Executor 简单执行器

Since executors can be quite complex, we deliberately start by creating a very basic executor before implementing a more featureful executor later. For this, we first create a new task::simple_executor submodule:

由于执行器可能非常复杂,因此我们特意从创建一个非常基本的执行器开始,然后再实现功能更强大的执行器。为此,我们首先创建一个新的 task::simple_executor 子模块:

// in src/task/mod.rs

pub mod simple_executor;
// in src/task/simple_executor.rs

use super::Task;
use alloc::collections::VecDeque;

pub struct SimpleExecutor {
    task_queue: VecDeque,
}

impl SimpleExecutor {
    pub fn new() -> SimpleExecutor {
        SimpleExecutor {
            task_queue: VecDeque::new(),
        }
    }

    pub fn spawn(&mut self, task: Task) {
        self.task_queue.push_back(task)
    }
}

The struct contains a single task_queue field of type VecDeque, which is basically a vector that allows for push and pop operations on both ends. The idea behind using this type is that we insert new tasks through the spawn method at the end and pop the next task for execution from the front. This way, we get a simple FIFO queue (“first in, first out”).

该结构体包含一个 VecDeque 类型的 task_queue 字段,它基本上是一个允许两端进行入栈和出栈操作的 Vec 。使用这种类型背后的想法是,我们通过末尾的 spawn 方法 push 新任务,并从前面 pop 下一个任务来执行。这样,我们就得到了一个简单的 FIFO 队列(“先进先出”)。

3.2.1 Dummy Waker

In order to call the poll method, we need to create a Context type, which wraps a Waker type. To start simple, we will first create a dummy waker that does nothing. For this, we create a RawWaker instance, which defines the implementation of the different Waker methods, and then use the Waker::from_raw function to turn it into a Waker:

为了调用 poll 方法,我们需要创建一个 Context 类型,它包装一个 Waker 类型。首先简单地开始,我们将首先创建一个不执行任何操作的虚拟唤醒器。为此,我们创建一个 RawWaker 实例,它定义了不同 Waker 方法的实现,然后使用 Waker::from_raw 函数将其转换为 Waker

// in src/task/simple_executor.rs

use core::task::{Waker, RawWaker};

fn dummy_raw_waker() -> RawWaker {
    todo!();
}

fn dummy_waker() -> Waker {
    unsafe { Waker::from_raw(dummy_raw_waker()) }
}

The from_raw function is unsafe because undefined behavior can occur if the programmer does not uphold the documented requirements of RawWaker. Before we look at the implementation of the dummy_raw_waker function, we first try to understand how the RawWaker type works.

from_raw 函数是 unsafe 的,因为如果程序员不遵守 RawWaker 记录的要求,则可能会发生未定义的行为。在查看 dummy_raw_waker 函数的实现之前,我们首先尝试了解 RawWaker 类型的工作原理。

3.2.1.1 RawWaker

The RawWaker type requires the programmer to explicitly define a virtual method table (vtable) that specifies the functions that should be called when the RawWaker is cloned, woken, or dropped. The layout of this vtable is defined by the RawWakerVTable type. Each function receives a *const () argument, which is a type-erased pointer to some value. The reason for using a *const () pointer instead of a proper reference is that the RawWaker type should be non-generic but still support arbitrary types. The pointer is provided by putting it into the data argument of RawWaker::new, which just initializes a RawWaker. The Waker then uses this RawWaker to call the vtable functions with data.

RawWaker 类型要求程序员显式定义一个虚拟方法表 (vtable),该表指定在克隆、唤醒或删除 RawWaker 时应调用的函数。该虚函数表的布局由 RawWakerVTable 类型定义。每个函数接收一个 *const () 参数,它是一个指向某个值的类型擦除指针。使用 *const () 指针而不是正确的引用的原因是 RawWaker 类型应该是非泛型的,但仍支持任意类型。通过将指针放入 RawWaker::newdata 参数中来提供指针,该参数仅初始化 RawWaker 。然后 Waker 使用此 RawWaker 通过 data 调用 vtable 函数。

Typically, the RawWaker is created for some heap-allocated struct that is wrapped into the Box or Arc type. For such types, methods like Box::into_raw can be used to convert the Box to a *const T pointer. This pointer can then be cast to an anonymous *const () pointer and passed to RawWaker::new. Since each vtable function receives the same *const () as an argument, the functions can safely cast the pointer back to a Box or a &T to operate on it. As you can imagine, this process is highly dangerous and can easily lead to undefined behavior on mistakes. For this reason, manually creating a RawWaker is not recommended unless necessary.

通常, RawWaker 是为包装到 BoxArc 类型中的某些堆分配结构创建的。对于此类类型,可以使用 Box::into_raw 等方法将 Box 转换为 *const T 指针。然后可以将该指针转换为匿名 *const () 指针并传递给 RawWaker::new 。由于每个 vtable 函数都接收相同的 *const () 作为参数,因此函数可以安全地将指针强制转换回 Box&T 来对其进行操作。可以想象,这个过程是非常危险的,很容易导致错误的未定义行为。因此,除非必要,否则不建议手动创建 RawWaker

3.2.1.2 A Dummy RawWaker

While manually creating a RawWaker is not recommended, there is currently no other way to create a dummy Waker that does nothing. Fortunately, the fact that we want to do nothing makes it relatively safe to implement the dummy_raw_waker function:

虽然不建议手动创建 RawWaker ,但目前没有其他方法可以创建不执行任何操作的虚拟 Waker 。幸运的是,我们不想做任何事情,这使得实现 dummy_raw_waker 函数相对安全:

// in src/task/simple_executor.rs

use core::task::RawWakerVTable;

fn dummy_raw_waker() -> RawWaker {
    fn no_op(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker {
        dummy_raw_waker()
    }

    let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
    RawWaker::new(0 as *const (), vtable)
}

First, we define two inner functions named no_op and clone. The no_op function takes a *const () pointer and does nothing. The clone function also takes a *const () pointer and returns a new RawWaker by calling dummy_raw_waker again. We use these two functions to create a minimal RawWakerVTable: The clone function is used for the cloning operations, and the no_op function is used for all other operations. Since the RawWaker does nothing, it does not matter that we return a new RawWaker from clone instead of cloning it.

首先,我们定义两个名为 no_opclone 的内部函数。 no_op 函数参数是 *const () 指针,但不执行任何操作。 clone 函数还接受一个 *const () 指针,并通过再次调用 dummy_raw_waker 返回一个新的 RawWaker 。我们使用这两个函数创建一个最小的 RawWakerVTableclone 函数用于克隆操作, no_op 函数用于所有其他操作。由于 RawWaker 不执行任何操作,因此我们从 clone 返回新的 RawWaker 而不是克隆它并不重要。

After creating the vtable, we use the RawWaker::new function to create the RawWaker. The passed *const () does not matter since none of the vtable functions use it. For this reason, we simply pass a null pointer.

创建 vtable 后,我们使用 RawWaker::new 函数创建 RawWaker 。传递的 *const () 并不重要,因为没有 vtable 函数使用它。因此,我们只需传递一个空指针即可。

3.2.3 A run Method

Now we have a way to create a Waker instance, we can use it to implement a run method on our executor. The most simple run method is to repeatedly poll all queued tasks in a loop until all are done. This is not very efficient since it does not utilize the notifications of the Waker type, but it is an easy way to get things running:

现在我们有了创建 Waker 实例的方法,我们可以使用它在执行器上实现 run 方法。最简单的 run 方法是在循环中重复轮询所有排队的任务,直到所有任务完成。这不是很有效,因为它不利用 Waker 类型的通知,但这是一种让事情运行的简单方法:

// in src/task/simple_executor.rs

use core::task::{Context, Poll};

impl SimpleExecutor {
    pub fn run(&mut self) {
        while let Some(mut task) = self.task_queue.pop_front() {
            let waker = dummy_waker();
            let mut context = Context::from_waker(&waker);
            match task.poll(&mut context) {
                Poll::Ready(()) => {} // task done
                Poll::Pending => self.task_queue.push_back(task),
            }
        }
    }
}

The function uses a while let loop to handle all tasks in the task_queue. For each task, it first creates a Context type by wrapping a Waker instance returned by our dummy_waker function. Then it invokes the Task::poll method with this context. If the poll method returns Poll::Ready, the task is finished and we can continue with the next task. If the task is still Poll::Pending, we add it to the back of the queue again so that it will be polled again in a subsequent loop iteration.

该函数使用 while let 循环来处理 task_queue 中的所有任务。对于每个任务,它首先通过包装 dummy_waker 函数返回的 Waker 实例来创建 Context 类型。然后它使用 context 调用 Task::poll 方法。如果 poll 方法返回 Poll::Ready ,则任务完成,我们可以继续下一个任务。如果任务仍然是 Poll::Pending ,我们再次将其添加到队列后面,以便在后续循环迭代中再次轮询它。

3.2.4 Trying It 尝试一下

With our SimpleExecutor type, we can now try running the task returned by the example_task function in our main.rs:

使用我们的 SimpleExecutor 类型,我们现在可以尝试运行 main.rs 中的 example_task 函数返回的任务:

// in src/main.rs

use blog_os::task::{Task, simple_executor::SimpleExecutor};

fn kernel_main(boot_info: &'static BootInfo) -> ! {
    // […] initialization routines, including `init_heap`

    let mut executor = SimpleExecutor::new();
    executor.spawn(Task::new(example_task()));
    executor.run();

    // […] test_main, "it did not crash" message, hlt_loop
}


// Below is the example_task function again so that you don't have to scroll up

async fn async_number() -> u32 {
    42
}

async fn example_task() {
    let number = async_number().await;
    println!("async number: {}", number);
}

When we run it, we see that the expected “async number: 42” message is printed to the screen:

当我们运行它时,我们看到预期的“async number:42”消息被打印到屏幕上:

QEMU printing “Hello World”, “async number: 42”, and “It did not crash!”

Let’s summarize the various steps that happen in this example:

让我们总结一下此示例中发生的各个步骤:

  • First, a new instance of our SimpleExecutor type is created with an empty task_queue.

    首先,使用空的 task_queue 创建 SimpleExecutor 类型的新实例。

  • Next, we call the asynchronous example_task function, which returns a future. We wrap this future in the Task type, which moves it to the heap and pins it, and then add the task to the task_queue of the executor through the spawn method.

    接下来,我们调用异步 example_task 函数,它返回一个 future。我们将这个 future 包装在 Task 类型中,将其移动到堆中并将其固定,然后通过 spawn 中> 方法。

  • We then call the run method to start the execution of the single task in the queue. This involves:

    然后我们调用 run 方法来开始执行队列中的单个任务。这涉及:

    • Popping the task from the front of the task_queue.

      task_queue 前面 pop 任务。

    • Creating a RawWaker for the task, converting it to a Waker instance, and then creating a Context instance from it.

      为任务创建一个 RawWaker ,将其转换为 Waker 实例,然后从中创建一个 Context 实例。

    • Calling the poll method on the future of the task, using the Context we just created.

      使用我们刚刚创建的 Context 调用Future任务的 poll 方法。

    • Since the example_task does not wait for anything, it can directly run till its end on the first poll call. This is where the “async number: 42” line is printed.

      由于 example_task 不等待任何内容,因此它可以直接运行到第一次 poll 调用时结束。这是打印“async number: 42”行的地方。

    • Since the example_task directly returns Poll::Ready, it is not added back to the task queue.

      由于 example_task 直接返回 Poll::Ready ,因此不会将其添加回任务队列。

  • The run method returns after the task_queue becomes empty. The execution of our kernel_main function continues and the “It did not crash!” message is printed.

    run 方法在 task_queue 变空后返回。我们的 kernel_main 函数继续执行,并且 消息“It did not crash!”被打印。

3.3 Async Keyboard Input 异步键盘输入

Our simple executor does not utilize the Waker notifications and simply loops over all tasks until they are done. This wasn’t a problem for our example since our example_task can directly run to finish on the first poll call. To see the performance advantages of a proper Waker implementation, we first need to create a task that is truly asynchronous, i.e., a task that will probably return Poll::Pending on the first poll call.

我们的简单执行器不使用 Waker 通知,而是简单地循环所有任务直到完成。对于我们的示例来说这不是问题,因为我们的 example_task 可以直接运行以完成第一个 poll 调用。要了解正确 Waker 实现的性能优势,我们首先需要创建一个真正异步的任务,即可能在第一个 poll 的任务 调用。

We already have some kind of asynchronicity in our system that we can use for this: hardware interrupts. As we learned in the Interrupts post, hardware interrupts can occur at arbitrary points in time, determined by some external device. For example, a hardware timer sends an interrupt to the CPU after some predefined time has elapsed. When the CPU receives an interrupt, it immediately transfers control to the corresponding handler function defined in the interrupt descriptor table (IDT).

我们的系统中已经有了某种可以用于此目的的异步性:硬件中断。正如我们在中断帖子中了解到的,硬件中断可以在任意时间点发生,由某些外部设备决定。例如,硬件定时器在经过某个预定义时间后向 CPU 发送中断。当CPU收到中断时,它立即将控制权转移到中断描述符表(IDT)中定义的相应处理函数。

In the following, we will create an asynchronous task based on the keyboard interrupt. The keyboard interrupt is a good candidate for this because it is both non-deterministic and latency-critical. Non-deterministic means that there is no way to predict when the next key press will occur because it is entirely dependent on the user. Latency-critical means that we want to handle the keyboard input in a timely manner, otherwise the user will feel a lag. To support such a task in an efficient way, it will be essential that the executor has proper support for Waker notifications.

下面我们将创建一个基于键盘中断的异步任务。键盘中断是一个很好的选择,因为它既不确定又对延迟至关重要。不确定性意味着无法预测下一次按键何时发生,因为它完全取决于用户。延迟关键意味着我们要及时处理键盘输入,否则用户会感到延迟。为了以有效的方式支持此类任务,执行器必须对 Waker 通知提供适当的支持。

3.3.1 Scancode Queue 输入队列

Currently, we handle the keyboard input directly in the interrupt handler. This is not a good idea for the long term because interrupt handlers should stay as short as possible as they might interrupt important work. Instead, interrupt handlers should only perform the minimal amount of work necessary (e.g., reading the keyboard scancode) and leave the rest of the work (e.g., interpreting the scancode) to a background task.

目前,我们直接在中断处理程序中处理键盘输入。从长远来看,这不是一个好主意,因为中断处理程序应该尽可能短,因为它们可能会中断重要的工作。相反,中断处理程序应该只执行最少量的必要工作(例如,读取键盘扫描码),并将其余工作(例如,解释扫描码)留给后台任务。

A common pattern for delegating work to a background task is to create some sort of queue. The interrupt handler pushes units of work to the queue, and the background task handles the work in the queue. Applied to our keyboard interrupt, this means that the interrupt handler only reads the scancode from the keyboard, pushes it to the queue, and then returns. The keyboard task sits on the other end of the queue and interprets and handles each scancode that is pushed to it:

将工作委派给后台任务的常见模式是创建某种队列。中断处理程序将工作单元推送到队列,后台任务处理队列中的工作。应用于我们的键盘中断,这意味着中断处理程序仅从键盘读取扫描码,将其推送到队列,然后返回。键盘任务位于队列的另一端,解释并处理推送给它的每个扫描码:

在这里插入图片描述

A simple implementation of that queue could be a mutex-protected VecDeque. However, using mutexes in interrupt handlers is not a good idea since it can easily lead to deadlocks. For example, when the user presses a key while the keyboard task has locked the queue, the interrupt handler tries to acquire the lock again and hangs indefinitely. Another problem with this approach is that VecDeque automatically increases its capacity by performing a new heap allocation when it becomes full. This can lead to deadlocks again because our allocator also uses a mutex internally. Further problems are that heap allocations can fail or take a considerable amount of time when the heap is fragmented.

该队列的一个简单实现可以是受互斥锁保护的 VecDeque 。然而,在中断处理程序中使用互斥体并不是一个好主意,因为它很容易导致死锁。例如,当用户在键盘任务锁定队列时按下某个键时,中断处理程序会尝试再次获取锁定并无限期挂起。此方法的另一个问题是 VecDeque 在变满时通过执行新的堆分配来自动增加其容量。这可能会再次导致死锁,因为我们的分配器也在内部使用互斥体。进一步的问题是,当堆碎片时,堆分配可能会失败或花费大量时间。

To prevent these problems, we need a queue implementation that does not require mutexes or allocations for its push operation. Such queues can be implemented by using lock-free atomic operations for pushing and popping elements. This way, it is possible to create push and pop operations that only require a &self reference and are thus usable without a mutex. To avoid allocations on push, the queue can be backed by a pre-allocated fixed-size buffer. While this makes the queue bounded (i.e., it has a maximum length), it is often possible to define reasonable upper bounds for the queue length in practice, so that this isn’t a big problem.

为了防止这些问题,我们需要一个队列实现,其 push 操作不需要互斥体或分配。此类队列可以通过使用无锁原子操作来推送和弹出元素来实现。这样,就可以创建仅需要 &self 引用的 pushpop 操作,因此无需互斥体即可使用。为了避免 push 上的分配,队列可以由预先分配的固定大小缓冲区支持。虽然这使得队列有界(即它具有最大长度),但在实践中通常可以为队列长度定义合理的上限,因此这不是一个大问题。

3.3.1.1 The crossbeam Crate

Implementing such a queue in a correct and efficient way is very difficult, so I recommend sticking to existing, well-tested implementations. One popular Rust project that implements various mutex-free types for concurrent programming is crossbeam. It provides a type named ArrayQueue that is exactly what we need in this case. And we’re lucky: the type is fully compatible with no_std crates with allocation support.

以正确且有效的方式实现这样的队列非常困难,因此我建议坚持使用现有的、经过充分测试的实现。 crossbeam 是一个流行的 Rust 项目,它为并发编程实现了各种无互斥类型。它提供了一个名为 ArrayQueue 的类型,这正是我们在本例中所需要的。我们很幸运:该类型与具有分配支持的 no_std crate 完全兼容。

To use the type, we need to add a dependency on the crossbeam-queue crate:

要使用该类型,我们需要添加对 crossbeam-queue 包的依赖:

# in Cargo.toml

[dependencies.crossbeam-queue]
version = "0.2.1"
default-features = false
features = ["alloc"]

By default, the crate depends on the standard library. To make it no_std compatible, we need to disable its default features and instead enable the alloc feature. (Note that we could also add a dependency on the main crossbeam crate, which re-exports the crossbeam-queue crate, but this would result in a larger number of dependencies and longer compile times.)

默认情况下,crate 依赖于标准库。为了使其 no_std 兼容,我们需要禁用其默认功能(default-features = false)并启用 alloc 功能。 (请注意,我们还可以添加对主 crossbeam 包的依赖项,它会重新导出 crossbeam-queue 包,但这会导致更多的依赖项和更长的编译时间。 )

3.3.1.2 Queue Implementation 队列实现

Using the ArrayQueue type, we can now create a global scancode queue in a new task::keyboard module:

使用 ArrayQueue 类型,我们现在可以在新的 task::keyboard 模块中创建全局扫描代码队列:

// in src/task/mod.rs

pub mod keyboard;
// in src/task/keyboard.rs

use conquer_once::spin::OnceCell;
use crossbeam_queue::ArrayQueue;

static SCANCODE_QUEUE: OnceCell = OnceCell::uninit();

Since ArrayQueue::new performs a heap allocation, which is not possible at compile time (yet), we can’t initialize the static variable directly. Instead, we use the OnceCell type of the conquer_once crate, which makes it possible to perform a safe one-time initialization of static values. To include the crate, we need to add it as a dependency in our Cargo.toml:

由于 ArrayQueue::new 执行堆分配,这在编译时是不可能的,所以我们不能直接初始化静态变量。相反,我们使用 conquer_once 包的 OnceCell 类型,这使得可以对静态值执行安全的一次性初始化。要包含该包,我们需要将其添加为 Cargo.toml 中的依赖项:

# in Cargo.toml

[dependencies.conquer-once]
version = "0.2.0"
default-features = false

Instead of the OnceCell primitive, we could also use the lazy_static macro here. However, the OnceCell type has the advantage that we can ensure that the initialization does not happen in the interrupt handler, thus preventing the interrupt handler from performing a heap allocation.

我们还可以在这里使用 lazy_static 宏来代替 OnceCell 原语。然而, OnceCell 类型的优点是我们可以确保初始化不会发生在中断处理程序中,从而阻止中断处理程序执行堆分配。

3.3.2 Filling the Queue 填充队列

To fill the scancode queue, we create a new add_scancode function that we will call from the interrupt handler:

为了填充扫描码队列,我们​​创建一个新的 add_scancode 函数,我们将从中断处理程序中调用该函数:

// in src/task/keyboard.rs

use crate::println;

/// Called by the keyboard interrupt handler
///
/// Must not block or allocate.
pub(crate) fn add_scancode(scancode: u8) {
    if let Ok(queue) = SCANCODE_QUEUE.try_get() {
        if let Err(_) = queue.push(scancode) {
            println!("WARNING: scancode queue full; dropping keyboard input");
        }
    } else {
        println!("WARNING: scancode queue uninitialized");
    }
}

We use OnceCell::try_get to get a reference to the initialized queue. If the queue is not initialized yet, we ignore the keyboard scancode and print a warning. It’s important that we don’t try to initialize the queue in this function because it will be called by the interrupt handler, which should not perform heap allocations. Since this function should not be callable from our main.rs, we use the pub(crate) visibility to make it only available to our lib.rs.

我们使用 OnceCell::try_get 来获取对初始化队列的引用。如果队列尚未初始化,我们将忽略键盘扫描码并打印警告。重要的是,我们不要尝试在此函数中初始化队列,因为它将由中断处理程序调用,而中断处理程序不应执行堆分配。由于该函数不应从 main.rs 调用,因此我们使用 pub(crate) 可见性使其仅可用于 lib.rs

The fact that the ArrayQueue::push method requires only a &self reference makes it very simple to call the method on the static queue. The ArrayQueue type performs all the necessary synchronization itself, so we don’t need a mutex wrapper here. In case the queue is full, we print a warning too.

ArrayQueue::push 方法仅需要 &self 引用这一事实使得在静态队列上调用该方法变得非常简单。 ArrayQueue 类型本身执行所有必要的同步,因此我们在这里不需要互斥锁包装器。如果队列已满,我们也会打印一条警告。

To call the add_scancode function on keyboard interrupts, we update our keyboard_interrupt_handler function in the interrupts module:

要在键盘中断时调用 add_scancode 函数,我们更新 interrupts 模块中的 keyboard_interrupt_handler 函数:

// in src/interrupts.rs

extern "x86-interrupt" fn keyboard_interrupt_handler(
    _stack_frame: InterruptStackFrame
) {
    use x86_64::instructions::port::Port;

    let mut port = Port::new(0x60);
    let scancode: u8 = unsafe { port.read() };
    crate::task::keyboard::add_scancode(scancode); // new

    unsafe {
        PICS.lock()
            .notify_end_of_interrupt(InterruptIndex::Keyboard.as_u8());
    }
}

We removed all the keyboard handling code from this function and instead added a call to the add_scancode function. The rest of the function stays the same as before.

我们从此函数中删除了所有键盘处理代码,而是添加了对 add_scancode 函数的调用。其余功能与以前相同。

As expected, keypresses are no longer printed to the screen when we run our project using cargo run now. Instead, we see the warning that the scancode queue is uninitialized for every keystroke.

正如预期的那样,当我们现在使用 cargo run 运行项目时,按键不再打印到屏幕上。相反,我们看到警告:每次击键扫描码队列都未初始化。

3.3.3 Scancode Stream

To initialize the SCANCODE_QUEUE and read the scancodes from the queue in an asynchronous way, we create a new ScancodeStream type:

为了初始化 SCANCODE_QUEUE 并以异步方式从队列中读取扫描码,我们创建一个新的 ScancodeStream 类型:

// in src/task/keyboard.rs

pub struct ScancodeStream {
    _private: (),
}

impl ScancodeStream {
    pub fn new() -> Self {
        SCANCODE_QUEUE.try_init_once(|| ArrayQueue::new(100))
            .expect("ScancodeStream::new should only be called once");
        ScancodeStream { _private: () }
    }
}

The purpose of the _private field is to prevent construction of the struct from outside of the module. This makes the new function the only way to construct the type. In the function, we first try to initialize the SCANCODE_QUEUE static. We panic if it is already initialized to ensure that only a single ScancodeStream instance can be created.

_private 字段的目的是防止从模块外部构造结构。这使得 new 函数成为构造该类型的唯一方法。在函数中,我们首先尝试初始化 SCANCODE_QUEUE 静态。如果它已经初始化,我们会感到恐慌,以确保只能创建一个 ScancodeStream 实例。

To make the scancodes available to asynchronous tasks, the next step is to implement a poll-like method that tries to pop the next scancode off the queue. While this sounds like we should implement the Future trait for our type, this does not quite fit here. The problem is that the Future trait only abstracts over a single asynchronous value and expects that the poll method is not called again after it returns Poll::Ready. Our scancode queue, however, contains multiple asynchronous values, so it is okay to keep polling it.

为了使扫描码可用于异步任务,下一步是实现类似 poll 的方法,尝试从队列中弹出下一个扫描码。虽然这听起来像是我们应该为我们的类型实现 Future 特征,但这不太适合这里。问题在于 Future 特征仅抽象单个异步值,并期望 poll 方法在返回 Poll::Ready 后不会再次调用。然而,我们的扫描码队列包含多个异步值,因此可以继续轮询它。

3.3.3.1The Stream Trait Stream 特征

Since types that yield multiple asynchronous values are common, the futures crate provides a useful abstraction for such types: the Stream trait. The trait is defined like this:

由于产生多个异步值的类型很常见,因此 futures 包为此类类型提供了有用的抽象: Stream 特征。该特征的定义如下:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin, cx: &mut Context)
        -> Poll;
}

This definition is quite similar to the Future trait, with the following differences:

这个定义与 Future 特征非常相似,但有以下区别:

  • The associated type is named Item instead of Output.

关联的类型被命名为 Item 而不是 Output

  • Instead of a poll method that returns Poll, the Stream trait defines a poll_next method that returns a Poll (note the additional Option).

    Stream 特征定义了一个返回 Pollpoll_next 方法,而不是返回 Pollpoll 方法。 (注意附加的 Option )。

There is also a semantic difference: The poll_next can be called repeatedly, until it returns Poll::Ready(None) to signal that the stream is finished. In this regard, the method is similar to the Iterator::next method, which also returns None after the last value.

还有一个语义差异: poll_next 可以重复调用,直到它返回 Poll::Ready(None) 以表示流已完成。在这方面,该方法类似于 Iterator::next 方法,它也在最后一个值之后返回 None

3.3.3.2 Implementing Stream

Let’s implement the Stream trait for our ScancodeStream to provide the values of the SCANCODE_QUEUE in an asynchronous way. For this, we first need to add a dependency on the futures-util crate, which contains the Stream type:

让我们为 ScancodeStream 实现 Stream 特征,以异步方式提供 SCANCODE_QUEUE 的值。为此,我们首先需要添加对 futures-util 包的依赖,其中包含 Stream 类型:

# in Cargo.toml

[dependencies.futures-util]
version = "0.3.4"
default-features = false
features = ["alloc"]

We disable the default features to make the crate no_std compatible and enable the alloc feature to make its allocation-based types available (we will need this later). (Note that we could also add a dependency on the main futures crate, which re-exports the futures-util crate, but this would result in a larger number of dependencies and longer compile times.)

我们禁用默认功能以使包 no_std 兼容,并启用 alloc 功能以使其基于分配的类型可用(稍后我们将需要它)。 (请注意,我们还可以添加对主 futures 包的依赖项,它会重新导出 futures-util 包,但这会导致更多的依赖项和更长的编译时间。 )

Now we can import and implement the Stream trait:

现在我们可以导入并实现 Stream 特征:

// in src/task/keyboard.rs

use core::{pin::Pin, task::{Poll, Context}};
use futures_util::stream::Stream;

impl Stream for ScancodeStream {
    type Item = u8;

    fn poll_next(self: Pin, cx: &mut Context) -> Poll {
        let queue = SCANCODE_QUEUE.try_get().expect("not initialized");
        match queue.pop() {
            Ok(scancode) => Poll::Ready(Some(scancode)),
            Err(crossbeam_queue::PopError) => Poll::Pending,
        }
    }
}

We first use the OnceCell::try_get method to get a reference to the initialized scancode queue. This should never fail since we initialize the queue in the new function, so we can safely use the expect method to panic if it’s not initialized. Next, we use the ArrayQueue::pop method to try to get the next element from the queue. If it succeeds, we return the scancode wrapped in Poll::Ready(Some(…)). If it fails, it means that the queue is empty. In that case, we return Poll::Pending.

我们首先使用 OnceCell::try_get 方法来获取对初始化的扫描码队列的引用。这应该永远不会失败,因为我们在 new 函数中初始化了队列,因此我们可以安全地使用 expect 方法来恐慌如果它没有初始化。接下来,我们使用 ArrayQueue::pop 方法尝试从队列中获取下一个元素。如果成功,我们返回包裹在 Poll::Ready(Some(…)) 中的扫描码。如果失败,则说明队列为空。在这种情况下,我们返回 Poll::Pending

3.3.4 Waker Support

Like the Futures::poll method, the Stream::poll_next method requires the asynchronous task to notify the executor when it becomes ready after Poll::Pending is returned. This way, the executor does not need to poll the same task again until it is notified, which greatly reduces the performance overhead of waiting tasks.

Futures::poll 方法一样, Stream::poll_next 方法要求异步任务在返回 Poll::Pending 后准备就绪时通知执行器。这样,执行器就不需要再次轮询同一个任务,直到收到通知为止,这大大降低了等待任务的性能开销。

To send this notification, the task should extract the Waker from the passed Context reference and store it somewhere. When the task becomes ready, it should invoke the wake method on the stored Waker to notify the executor that the task should be polled again.

要发送此通知,任务应从传递的 Context 引用中提取 Waker 并将其存储在某处。当任务准备就绪时,它应该调用存储的 Waker 上的 wake 方法来通知执行器应该再次轮询该任务。

3.3.4.1 AtomicWaker

To implement the Waker notification for our ScancodeStream, we need a place where we can store the Waker between poll calls. We can’t store it as a field in the ScancodeStream itself because it needs to be accessible from the add_scancode function. The solution to this is to use a static variable of the AtomicWaker type provided by the futures-util crate. Like the ArrayQueue type, this type is based on atomic instructions and can be safely stored in a static and modified concurrently.

为了实现 ScancodeStreamWaker 通知,我们需要一个可以在 poll 调用之间存储 Waker 的地方。我们无法将其作为字段存储在 ScancodeStream 本身中,因为它需要可以从 add_scancode 函数访问。解决方案是使用 futures-util 包提供的 AtomicWaker 类型的静态变量。与 ArrayQueue 类型一样,该类型基于原子指令,可以安全地存储在 static 中并并发修改。

Let’s use the AtomicWaker type to define a static WAKER:

让我们使用 AtomicWaker 类型来定义静态 WAKER

// in src/task/keyboard.rs

use futures_util::task::AtomicWaker;

static WAKER: AtomicWaker = AtomicWaker::new();

The idea is that the poll_next implementation stores the current waker in this static, and the add_scancode function calls the wake function on it when a new scancode is added to the queue.

这个想法是 poll_next 实现将当前唤醒器存储在此静态中,并且当将新的扫描码添加到时, add_scancode 函数调用其上的 wake 函数队列。

3.3.4.2 Storing a Waker 存储唤醒器

The contract defined by poll/poll_next requires the task to register a wakeup for the passed Waker when it returns Poll::Pending. Let’s modify our poll_next implementation to satisfy this requirement:

poll / poll_next 定义的协定要求任务在返回 Poll::Pending 时为传递的 Waker 注册唤醒。让我们修改 poll_next 实现来满足此要求:

// in src/task/keyboard.rs

impl Stream for ScancodeStream {
    type Item = u8;

    fn poll_next(self: Pin, cx: &mut Context) -> Poll {
        let queue = SCANCODE_QUEUE
            .try_get()
            .expect("scancode queue not initialized");

        // fast path
        if let Ok(scancode) = queue.pop() {
            return Poll::Ready(Some(scancode));
        }

        WAKER.register(&cx.waker());
        match queue.pop() {
            Ok(scancode) => {
                WAKER.take();
                Poll::Ready(Some(scancode))
            }
            Err(crossbeam_queue::PopError) => Poll::Pending,
        }
    }
}

Like before, we first use the OnceCell::try_get function to get a reference to the initialized scancode queue. We then optimistically try to pop from the queue and return Poll::Ready when it succeeds. This way, we can avoid the performance overhead of registering a waker when the queue is not empty.

和之前一样,我们首先使用 OnceCell::try_get 函数来获取对初始化的扫描码队列的引用。然后,我们乐观地尝试从队列中取出 pop ,并在成功时返回 Poll::Ready 。这样,我们就可以避免在队列不为空时注册唤醒程序的性能开销。

If the first call to queue.pop() does not succeed, the queue is potentially empty. Only potentially because the interrupt handler might have filled the queue asynchronously immediately after the check. Since this race condition can occur again for the next check, we need to register the Waker in the WAKER static before the second check. This way, a wakeup might happen before we return Poll::Pending, but it is guaranteed that we get a wakeup for any scancodes pushed after the check.

如果对 queue.pop() 的第一次调用不成功,则队列可能为空。唯一可能的原因是中断处理程序可能在检查后立即异步填充队列。由于这种竞争条件可能会在下一次检查中再次发生,因此我们需要在第二次检查之前在 WAKER 静态中注册 Waker 。这样,唤醒可能会在我们返回 Poll::Pending 之前发生,但可以保证我们在检查后推送的任何扫描码都会得到唤醒。

After registering the Waker contained in the passed Context through the AtomicWaker::register function, we try to pop from the queue a second time. If it now succeeds, we return Poll::Ready. We also remove the registered waker again using AtomicWaker::take because a waker notification is no longer needed. In case queue.pop() fails for a second time, we return Poll::Pending like before, but this time with a registered wakeup.

通过 AtomicWaker::register 函数注册了传递的 Context 中包含的 Waker 后,我们尝试第二次从队列中弹出。如果现在成功,我们返回 Poll::Ready 。我们还使用 AtomicWaker::take 再次删除已注册的唤醒器,因为不再需要唤醒器通知。如果 queue.pop() 第二次失败,我们会像以前一样返回 Poll::Pending ,但这次带有注册的唤醒。

Note that there are two ways that a wakeup can happen for a task that did not return Poll::Pending (yet). One way is the mentioned race condition when the wakeup happens immediately before returning Poll::Pending. The other way is when the queue is no longer empty after registering the waker, so that Poll::Ready is returned. Since these spurious wakeups are not preventable, the executor needs to be able to handle them correctly.

请注意,对于尚未返回 Poll::Pending 的任务,可以通过两种方式进行唤醒。一种方法是在返回 Poll::Pending 之前立即发生唤醒时提到的竞争条件。另一种方式是注册唤醒器后队列不再为空时,返回 Poll::Ready 。由于这些虚假唤醒是无法预防的,因此执行器需要能够正确处理它们。

3.3.4.3 Waking the Stored Waker 唤醒存储的唤醒器

To wake the stored Waker, we add a call to WAKER.wake() in the add_scancode function:

为了唤醒存储的 Waker ,我们在 add_scancode 函数中添加对 WAKER.wake() 的调用:

// in src/task/keyboard.rs

pub(crate) fn add_scancode(scancode: u8) {
    if let Ok(queue) = SCANCODE_QUEUE.try_get() {
        if let Err(_) = queue.push(scancode) {
            println!("WARNING: scancode queue full; dropping keyboard input");
        } else {
            WAKER.wake(); // new
        }
    } else {
        println!("WARNING: scancode queue uninitialized");
    }
}

The only change that we made is to add a call to WAKER.wake() if the push to the scancode queue succeeds. If a waker is registered in the WAKER static, this method will call the equally-named wake method on it, which notifies the executor. Otherwise, the operation is a no-op, i.e., nothing happens.

我们所做的唯一更改是,如果推送到扫描码队列成功,则添加对 WAKER.wake() 的调用。如果唤醒程序在 WAKER 静态中注册,则此方法将调用其上同名的 wake 方法,该方法会通知执行程序。否则,该操作是无操作,即什么也不会发生。

It is important that we call wake only after pushing to the queue because otherwise the task might be woken too early while the queue is still empty. This can, for example, happen when using a multi-threaded executor that starts the woken task concurrently on a different CPU core. While we don’t have thread support yet, we will add it soon and don’t want things to break then.

重要的是,我们仅在推送到队列后才调用 wake ,否则任务可能会在队列仍为空时过早被唤醒。例如,当使用多线程执行器在不同的 CPU 内核上同时启动唤醒任务时,可能会发生这种情况。虽然我们还没有线程支持,但我们很快就会添加它,并且不希望事情到那时就被破坏。

3.3.5 Keyboard Task 键盘任务

Now that we implemented the Stream trait for our ScancodeStream, we can use it to create an asynchronous keyboard task:

现在我们已经为 ScancodeStream 实现了 Stream 特征,我们可以使用它来创建异步键盘任务:

// in src/task/keyboard.rs

use futures_util::stream::StreamExt;
use pc_keyboard::{layouts, DecodedKey, HandleControl, Keyboard, ScancodeSet1};
use crate::print;

pub async fn print_keypresses() {
    let mut scancodes = ScancodeStream::new();
    let mut keyboard = Keyboard::new(layouts::Us104Key, ScancodeSet1,
        HandleControl::Ignore);

    while let Some(scancode) = scancodes.next().await {
        if let Ok(Some(key_event)) = keyboard.add_byte(scancode) {
            if let Some(key) = keyboard.process_keyevent(key_event) {
                match key {
                    DecodedKey::Unicode(character) => print!("{}", character),
                    DecodedKey::RawKey(key) => print!("{:?}", key),
                }
            }
        }
    }
}

The code is very similar to the code we had in our keyboard interrupt handler before we modified it in this post. The only difference is that, instead of reading the scancode from an I/O port, we take it from the ScancodeStream. For this, we first create a new Scancode stream and then repeatedly use the next method provided by the StreamExt trait to get a Future that resolves to the next element in the stream. By using the await operator on it, we asynchronously wait for the result of the future.

该代码与我们在本文中修改之前在键盘中断处理程序中的代码非常相似。唯一的区别是,我们不是从 I/O 端口读取扫描码,而是从 ScancodeStream 读取扫描码。为此,我们首先创建一个新的 Scancode 流,然后重复使用 StreamExt 特征提供的 next 方法来获取 Future 解析为流中的下一个元素。通过对其使用 await 运算符,我们异步等待Future的结果。

We use while let to loop until the stream returns None to signal its end. Since our poll_next method never returns None, this is effectively an endless loop, so the print_keypresses task never finishes.

我们使用 while let 进行循环,直到流返回 None 以表示其结束。由于我们的 poll_next 方法永远不会返回 None ,这实际上是一个无限循环,因此 print_keypresses 任务永远不会完成。

Let’s add the print_keypresses task to our executor in our main.rs to get working keyboard input again:

让我们将 print_keypresses 任务添加到 main.rs 中的执行器中,以再次获得有效的键盘输入:

// in src/main.rs

use blog_os::task::keyboard; // new

fn kernel_main(boot_info: &'static BootInfo) -> ! {

    // […] initialization routines, including init_heap, test_main

    let mut executor = SimpleExecutor::new();
    executor.spawn(Task::new(example_task()));
    executor.spawn(Task::new(keyboard::print_keypresses())); // new
    executor.run();

    // […] "it did not crash" message, hlt_loop
}

When we execute cargo run now, we see that keyboard input works again:

当我们现在执行 cargo run 时,我们看到键盘输入再次起作用:

QEMU printing “…..H…e…l…l..o….. …W..o..r….l…d…!”

If you keep an eye on the CPU utilization of your computer, you will see that the QEMU process now continuously keeps the CPU busy. This happens because our SimpleExecutor polls tasks over and over again in a loop. So even if we don’t press any keys on the keyboard, the executor repeatedly calls poll on our print_keypresses task, even though the task cannot make any progress and will return Poll::Pending each time.

如果您密切关注计算机的 CPU 利用率,您会发现 QEMU 进程现在持续使 CPU 处于繁忙状态。发生这种情况是因为我们的 SimpleExecutor 在循环中一遍又一遍地轮询任务。因此,即使我们没有按键盘上的任何键,执行器也会在我们的 print_keypresses 任务上重复调用 poll ,即使该任务无法取得任何进展并会返回 Poll::Pending 每次。

3.4 Executor with Waker Support

具有 Waker 支持的执行器

To fix the performance problem, we need to create an executor that properly utilizes the Waker notifications. This way, the executor is notified when the next keyboard interrupt occurs, so it does not need to keep polling the print_keypresses task over and over again.

为了解决性能问题,我们需要创建一个正确利用 Waker 通知的执行器。这样,当下一个键盘中断发生时,执行器就会收到通知,因此不需要一遍又一遍地不断轮询 print_keypresses 任务。

3.4.1 Task Id 任务编号

The first step in creating an executor with proper support for waker notifications is to give each task a unique ID. This is required because we need a way to specify which task should be woken. We start by creating a new TaskId wrapper type:

创建对唤醒通知提供适当支持的执行程序的第一步是为每个任务提供唯一的 ID。这是必需的,因为我们需要一种方法来指定应该唤醒哪个任务。我们首先创建一个新的 TaskId 包装类型:

// in src/task/mod.rs

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(u64);

The TaskId struct is a simple wrapper type around u64. We derive a number of traits for it to make it printable, copyable, comparable, and sortable. The latter is important because we want to use TaskId as the key type of a BTreeMap in a moment.

TaskId 结构是 u64 的简单包装类型。我们为其赋予了许多特征,使其可打印、可复制、可比较和可排序。后者很重要,因为我们稍后想使用 TaskId 作为 BTreeMap 的键类型。

To create a new unique ID, we create a TaskId::new function:

为了创建新的唯一 ID,我们创建一个 TaskId::new 函数:

use core::sync::atomic::{AtomicU64, Ordering};

impl TaskId {
    fn new() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
        TaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
    }
}

The function uses a static NEXT_ID variable of type AtomicU64 to ensure that each ID is assigned only once. The fetch_add method atomically increases the value and returns the previous value in one atomic operation. This means that even when the TaskId::new method is called in parallel, every ID is returned exactly once. The Ordering parameter defines whether the compiler is allowed to reorder the fetch_add operation in the instructions stream. Since we only require that the ID be unique, the Relaxed ordering with the weakest requirements is enough in this case.

该函数使用 AtomicU64 类型的静态 NEXT_ID 变量来确保每个 ID 仅分配一次。 fetch_add 方法在一个原子操作中原子地增加值并返回前一个值。这意味着即使并行调用 TaskId::new 方法,每个 ID 也只会返回一次。 Ordering 参数定义是否允许编译器对指令流中的 fetch_add 操作进行重新排序。由于我们只要求 ID 唯一,因此在这种情况下,要求最弱的 Relaxed 排序就足够了。

We can now extend our Task type with an additional id field:

我们现在可以使用附加的 id 字段来扩展 Task 类型:

// in src/task/mod.rs

pub struct Task {
    id: TaskId, // new
    future: Pin,
}

impl Task {
    pub fn new(future: impl Future + 'static) -> Task {
        Task {
            id: TaskId::new(), // new
            future: Box::pin(future),
        }
    }
}

The new id field makes it possible to uniquely name a task, which is required for waking a specific task.

新的 id 字段可以唯一地命名任务,这是唤醒特定任务所必需的。

3.4.2 The Executor Type

We create our new Executor type in a task::executor module:

我们在 task::executor 模块中创建新的 Executor 类型:

// in src/task/mod.rs

pub mod executor;
// in src/task/executor.rs

use super::{Task, TaskId};
use alloc::{collections::BTreeMap, sync::Arc};
use core::task::Waker;
use crossbeam_queue::ArrayQueue;

pub struct Executor {
    tasks: BTreeMap,
    task_queue: Arc,
    waker_cache: BTreeMap,
}

impl Executor {
    pub fn new() -> Self {
        Executor {
            tasks: BTreeMap::new(),
            task_queue: Arc::new(ArrayQueue::new(100)),
            waker_cache: BTreeMap::new(),
        }
    }
}

Instead of storing tasks in a VecDeque like we did for our SimpleExecutor, we use a task_queue of task IDs and a BTreeMap named tasks that contains the actual Task instances. The map is indexed by the TaskId to allow efficient continuation of a specific task.

我们不像 SimpleExecutor 那样将任务存储在 VecDeque 中,而是使用任务 ID 的 task_queue 和名为 BTreeMap 包含实际的 Task 实例。该映射由 TaskId 索引,以允许高效地继续特定任务。

The task_queue field is an ArrayQueue of task IDs, wrapped into the Arc type that implements reference counting. Reference counting makes it possible to share ownership of the value among multiple owners. It works by allocating the value on the heap and counting the number of active references to it. When the number of active references reaches zero, the value is no longer needed and can be deallocated.

task_queue 字段是任务 ID 的 ArrayQueue ,包装到实现引用计数的 Arc 类型中。引用计数使得在多个所有者之间共享该值的所有权成为可能。它的工作原理是在堆上分配值并计算对其的活动引用的数量。当活动引用的数量达到零时,不再需要该值并且可以释放该值。

We use this Arc type for the task_queue because it will be shared between the executor and wakers. The idea is that the wakers push the ID of the woken task to the queue. The executor sits on the receiving end of the queue, retrieves the woken tasks by their ID from the tasks map, and then runs them. The reason for using a fixed-size queue instead of an unbounded queue such as SegQueue is that interrupt handlers should not allocate on push to this queue.

我们将这个 Arc 类型用于 task_queue ,因为它将在执行器和唤醒器之间共享。这个想法是唤醒器将唤醒任务的 ID 推送到队列中。执行器位于队列的接收端,通过 ID 从 tasks 映射中检索唤醒的任务,然后运行它们。使用固定大小队列而不是无界队列(例如 SegQueue )的原因是中断处理程序不应在推送到此队列时进行分配。

In addition to the task_queue and the tasks map, the Executor type has a waker_cache field that is also a map. This map caches the Waker of a task after its creation. This has two reasons: First, it improves performance by reusing the same waker for multiple wake-ups of the same task instead of creating a new waker each time. Second, it ensures that reference-counted wakers are not deallocated inside interrupt handlers because it could lead to deadlocks (there are more details on this below).

除了 task_queuetasks 映射之外, Executor 类型还有一个 waker_cache 字段,它也是一个映射。该映射在任务创建后缓存其 Waker 。这有两个原因:首先,它通过重复使用同一个唤醒程序来多次唤醒同一任务,而不是每次创建一个新的唤醒程序,从而提高性能。其次,它确保引用计数唤醒器不会在中断处理程序内释放,因为它可能导致死锁(下面有更多详细信息)。

To create an Executor, we provide a simple new function. We choose a capacity of 100 for the task_queue, which should be more than enough for the foreseeable future. In case our system will have more than 100 concurrent tasks at some point, we can easily increase this size.

为了创建 Executor ,我们提供了一个简单的 new 函数。我们为 task_queue 选择容量 100,这对于可预见的Future来说应该足够了。如果我们的系统在某个时候有超过 100 个并发任务,我们可以轻松地增加这个大小。

3.4.3 Spawning Tasks 生成任务

As for the SimpleExecutor, we provide a spawn method on our Executor type that adds a given task to the tasks map and immediately wakes it by pushing its ID to the task_queue:

至于 SimpleExecutor ,我们在 Executor 类型上提供了 spawn 方法,该方法将给定任务添加到 tasks 映射中并立即唤醒通过将其 ID 推送到 task_queue 来实现:

// in src/task/executor.rs

impl Executor {
    pub fn spawn(&mut self, task: Task) {
        let task_id = task.id;
        if self.tasks.insert(task.id, task).is_some() {
            panic!("task with same ID already in tasks");
        }
        self.task_queue.push(task_id).expect("queue full");
    }
}

If there is already a task with the same ID in the map, the [BTreeMap::insert] method returns it. This should never happen since each task has a unique ID, so we panic in this case since it indicates a bug in our code. Similarly, we panic when the task_queue is full since this should never happen if we choose a large-enough queue size.

如果map中已经存在具有相同 ID 的任务,则 [ BTreeMap::insert ] 方法将返回该任务。这种情况永远不应该发生,因为每个任务都有一个唯一的 ID,因此在这种情况下我们会感到恐慌,因为它表明我们的代码中存在错误。同样,当 task_queue 已满时,我们会感到恐慌,因为如果我们选择足够大的队列大小,则永远不会发生这种情况。

3.4.5 Running Tasks 运行任务

To execute all tasks in the task_queue, we create a private run_ready_tasks method:

为了执行 task_queue 中的所有任务,我们创建一个私有 run_ready_tasks 方法:

// in src/task/executor.rs

use core::task::{Context, Poll};

impl Executor {
    fn run_ready_tasks(&mut self) {
        // destructure `self` to avoid borrow checker errors
        let Self {
            tasks,
            task_queue,
            waker_cache,
        } = self;

        while let Ok(task_id) = task_queue.pop() {
            let task = match tasks.get_mut(&task_id) {
                Some(task) => task,
                None => continue, // task no longer exists
            };
            let waker = waker_cache
                .entry(task_id)
                .or_insert_with(|| TaskWaker::new(task_id, task_queue.clone()));
            let mut context = Context::from_waker(waker);
            match task.poll(&mut context) {
                Poll::Ready(()) => {
                    // task done -> remove it and its cached waker
                    tasks.remove(&task_id);
                    waker_cache.remove(&task_id);
                }
                Poll::Pending => {}
            }
        }
    }
}

The basic idea of this function is similar to our SimpleExecutor: Loop over all tasks in the task_queue, create a waker for each task, and then poll them. However, instead of adding pending tasks back to the end of the task_queue, we let our TaskWaker implementation take care of adding woken tasks back to the queue. The implementation of this waker type will be shown in a moment.

这个函数的基本思想和我们的 SimpleExecutor 类似:循环遍历 task_queue 中的所有任务,为每个任务创建一个唤醒器,然后轮询它们。但是,我们不是将挂起的任务添加回 task_queue 的末尾,而是让 TaskWaker 实现负责将唤醒的任务添加回队列。稍后将展示此唤醒器类型的实现。

Let’s look into some of the implementation details of this run_ready_tasks method:

让我们看一下这个 run_ready_tasks 方法的一些实现细节:

  • We use destructuring to split self into its three fields to avoid some borrow checker errors. Namely, our implementation needs to access the self.task_queue from within a closure, which currently tries to borrow self completely. This is a fundamental borrow checker issue that will be resolved when RFC 2229 is implemented.

  • 我们使用解构将 self 拆分为三个字段,以避免一些借用检查器错误。也就是说,我们的实现需要从闭包内访问 self.task_queue ,当前它试图完全借用 self 。这是一个基本的借用检查器问题,将在实施 RFC 2229 时得到解决。

  • For each popped task ID, we retrieve a mutable reference to the corresponding task from the tasks map. Since our ScancodeStream implementation registers wakers before checking whether a task needs to be put to sleep, it might happen that a wake-up occurs for a task that no longer exists. In this case, we simply ignore the wake-up and continue with the next ID from the queue.

  • 对于每个弹出的任务 ID,我们从 tasks 映射中检索对相应任务的可变引用。由于我们的 ScancodeStream 实现在检查任务是否需要进入睡眠状态之前注册唤醒器,因此可能会发生唤醒不再存在的任务的情况。在这种情况下,我们只需忽略唤醒并继续处理队列中的下一个 ID。

  • To avoid the performance overhead of creating a waker on each poll, we use the waker_cache map to store the waker for each task after it has been created. For this, we use the BTreeMap::entry method in combination with Entry::or_insert_with to create a new waker if it doesn’t exist yet and then get a mutable reference to it. For creating a new waker, we clone the task_queue and pass it together with the task ID to the TaskWaker::new function (implementation shown below). Since the task_queue is wrapped into an Arc, the clone only increases the reference count of the value, but still points to the same heap-allocated queue. Note that reusing wakers like this is not possible for all waker implementations, but our TaskWaker type will allow it.

  • 为了避免在每个轮询上创建唤醒器的性能开销,我们在创建每个任务后使用 waker_cache 映射来存储每个任务的唤醒器。为此,我们使用 BTreeMap::entry 方法与 Entry::or_insert_with 结合使用来创建一个新的唤醒程序(如果它尚不存在),然后获取对其的可变引用。为了创建新的唤醒程序,我们克隆 task_queue 并将其与任务 ID 一起传递给 TaskWaker::new 函数(实现如下所示)。由于 task_queue 被包装到 Arc 中,因此 clone 仅增加值的引用计数,但仍然指向相同的堆分配队列。请注意,并非所有唤醒器实现都可以像这样重用唤醒器,但我们的 TaskWaker 类型允许这样做。

A task is finished when it returns Poll::Ready. In that case, we remove it from the tasks map using the BTreeMap::remove method. We also remove its cached waker, if it exists.

当任务返回 Poll::Ready 时,任务就完成了。在这种情况下,我们使用 BTreeMap::remove 方法将其从 tasks 映射中删除。我们还删除其缓存的唤醒程序(如果存在)。

3.4.6 Waker Design

The job of the waker is to push the ID of the woken task to the task_queue of the executor. We implement this by creating a new TaskWaker struct that stores the task ID and a reference to the task_queue:

唤醒器的工作是将被唤醒任务的ID推送到执行器的 task_queue 。我们通过创建一个新的 TaskWaker 结构来实现这一点,该结构存储任务 ID 和对 task_queue 的引用:

// in src/task/executor.rs

struct TaskWaker {
    task_id: TaskId,
    task_queue: Arc,
}

Since the ownership of the task_queue is shared between the executor and wakers, we use the Arc wrapper type to implement shared reference-counted ownership.

由于 task_queue 的所有权在执行器和唤醒器之间共享,因此我们使用 Arc 包装类型来实现共享引用计数所有权。

The implementation of the wake operation is quite simple:

唤醒操作的实现非常简单:

// in src/task/executor.rs

impl TaskWaker {
    fn wake_task(&self) {
        self.task_queue.push(self.task_id).expect("task_queue full");
    }
}

We push the task_id to the referenced task_queue. Since modifications to the ArrayQueue type only require a shared reference, we can implement this method on &self instead of &mut self.

我们将 task_id 推送到引用的 task_queue 。由于对 ArrayQueue 类型的修改只需要共享引用,因此我们可以在 &self 而不是 &mut self 上实现此方法。

3.4.6.1 The Wake Trait

In order to use our TaskWaker type for polling futures, we need to convert it to a Waker instance first. This is required because the Future::poll method takes a Context instance as an argument, which can only be constructed from the Waker type. While we could do this by providing an implementation of the RawWaker type, it’s both simpler and safer to instead implement the Arc-based Wake trait and then use the From implementations provided by the standard library to construct the Waker.

为了使用我们的 TaskWaker 类型来轮询 future,我们需要首先将其转换为 Waker 实例。这是必需的,因为 Future::poll 方法采用 Context 实例作为参数,该实例只能从 Waker 类型构造。虽然我们可以通过提供 RawWaker 类型的实现来做到这一点,但实现基于 ArcWake 特征然后使用标准库提供的 From 实现用于构造 Waker

The trait implementation looks like this:

特征实现如下所示:

// in src/task/executor.rs

use alloc::task::Wake;

impl Wake for TaskWaker {
    fn wake(self: Arc) {
        self.wake_task();
    }

    fn wake_by_ref(self: &Arc) {
        self.wake_task();
    }
}

Since wakers are commonly shared between the executor and the asynchronous tasks, the trait methods require that the Self instance is wrapped in the Arc type, which implements reference-counted ownership. This means that we have to move our TaskWaker to an Arc in order to call them.

由于唤醒程序通常在执行程序和异步任务之间共享,因此特征方法要求将 Self 实例包装在 Arc 类型中,该类型实现引用计数所有权。这意味着我们必须将 TaskWaker 移动到 Arc 才能调用它们。

The difference between the wake and wake_by_ref methods is that the latter only requires a reference to the Arc, while the former takes ownership of the Arc and thus often requires an increase of the reference count. Not all types support waking by reference, so implementing the wake_by_ref method is optional. However, it can lead to better performance because it avoids unnecessary reference count modifications. In our case, we can simply forward both trait methods to our wake_task function, which requires only a shared &self reference.

wakewake_by_ref 方法之间的区别在于,后者仅需要对 Arc 的引用,而前者则拥有 Arc 方法是可选的。但是,它可以带来更好的性能,因为它避免了不必要的引用计数修改。在我们的例子中,我们可以简单地将两个特征方法转发到我们的 wake_task 函数,该函数只需要一个共享的 &self 引用。

3.4.6.2 Creating Wakers 创造唤醒者

Since the Waker type supports From conversions for all Arc-wrapped values that implement the Wake trait, we can now implement the TaskWaker::new function that is required by our Executor::run_ready_tasks method:

由于 Waker 类型支持所有实现 Wake 特征的 Arc 包装值的 From 转换,我们现在可以实现 TaskWaker::new 我们的 Executor::run_ready_tasks 方法所需的函数:

// in src/task/executor.rs

impl TaskWaker {
    fn new(task_id: TaskId, task_queue: Arc) -> Waker {
        Waker::from(Arc::new(TaskWaker {
            task_id,
            task_queue,
        }))
    }
}

We create the TaskWaker using the passed task_id and task_queue. We then wrap the TaskWaker in an Arc and use the Waker::from implementation to convert it to a Waker. This from method takes care of constructing a RawWakerVTable and a RawWaker instance for our TaskWaker type. In case you’re interested in how it works in detail, check out the implementation in the alloc crate.

我们使用传递的 task_idtask_queue 创建 TaskWaker 。然后,我们将 TaskWaker 包装在 Arc 中,并使用 Waker::from 实现将其转换为 Waker 。这个 from 方法负责为我们的 TaskWaker 类型构造一个 RawWakerVTable 和一个 RawWaker 实例。如果您对它的详细工作原理感兴趣,请查看 alloc 箱中的实现。

3.4.7 A run Method

With our waker implementation in place, we can finally construct a run method for our executor:

完成唤醒器实现后,我们最终可以为执行器构造一个 run 方法:

// in src/task/executor.rs

impl Executor {
    pub fn run(&mut self) -> ! {
        loop {
            self.run_ready_tasks();
        }
    }
}

This method just calls the run_ready_tasks function in a loop. While we could theoretically return from the function when the tasks map becomes empty, this would never happen since our keyboard_task never finishes, so a simple loop should suffice. Since the function never returns, we use the ! return type to mark the function as diverging to the compiler.

该方法只是在循环中调用 run_ready_tasks 函数。虽然理论上我们可以在 tasks 映射变空时从函数返回,但这种情况永远不会发生,因为我们的 keyboard_task 永远不会完成,因此一个简单的 loop 就足够了。由于该函数永远不会返回,因此我们使用 ! 返回类型将该函数标记为与编译器不同。

We can now change our kernel_main to use our new Executor instead of the SimpleExecutor:

我们现在可以更改 kernel_main 以使用新的 Executor 而不是 SimpleExecutor

// in src/main.rs

use blog_os::task::executor::Executor; // new

fn kernel_main(boot_info: &'static BootInfo) -> ! {
    // […] initialization routines, including init_heap, test_main

    let mut executor = Executor::new(); // new
    executor.spawn(Task::new(example_task()));
    executor.spawn(Task::new(keyboard::print_keypresses()));
    executor.run();
}

We only need to change the import and the type name. Since our run function is marked as diverging, the compiler knows that it never returns, so we no longer need a call to hlt_loop at the end of our kernel_main function.

我们只需要更改导入和类型名称。由于我们的 run 函数被标记为发散,编译器知道它永远不会返回,因此我们不再需要在 kernel_main 末尾调用 hlt_loop 功能。

When we run our kernel using cargo run now, we see that keyboard input still works:

现在,当我们使用 cargo run 运行内核时,我们看到键盘输入仍然有效:

QEMU printing “…..H…e…l…l..o….. …a..g..a….i…n…!”

However, the CPU utilization of QEMU did not get any better. The reason for this is that we still keep the CPU busy the whole time. We no longer poll tasks until they are woken again, but we still check the task_queue in a busy loop. To fix this, we need to put the CPU to sleep if there is no more work to do.

然而,QEMU 的 CPU 利用率并没有得到任何改善。原因是我们仍然让 CPU 一直处于忙碌状态。我们不再轮询任务,直到它们再次被唤醒,但我们仍然在繁忙循环中检查 task_queue 。为了解决这个问题,如果没有更多的工作要做,我们需要让 CPU 进入睡眠状态。

3.4.8 Sleep If Idle

The basic idea is to execute the hlt instruction when the task_queue is empty. This instruction puts the CPU to sleep until the next interrupt arrives. The fact that the CPU immediately becomes active again on interrupts ensures that we can still directly react when an interrupt handler pushes to the task_queue.

基本思想是当 task_queue 为空时执行 hlt 指令。该指令使 CPU 进入睡眠状态,直到下一个中​​断到来。事实上,CPU 在发生中断时立即再次激活,这确保了当中断处理程序推送到 task_queue 时我们仍然可以直接做出反应。

To implement this, we create a new sleep_if_idle method in our executor and call it from our run method:

为了实现这一点,我们在执行器中创建一个新的 sleep_if_idle 方法,并从 run 方法中调用它:

// in src/task/executor.rs

impl Executor {
    pub fn run(&mut self) -> ! {
        loop {
            self.run_ready_tasks();
            self.sleep_if_idle();   // new
        }
    }

    fn sleep_if_idle(&self) {
        if self.task_queue.is_empty() {
            x86_64::instructions::hlt();
        }
    }
}

Since we call sleep_if_idle directly after run_ready_tasks, which loops until the task_queue becomes empty, checking the queue again might seem unnecessary. However, a hardware interrupt might occur directly after run_ready_tasks returns, so there might be a new task in the queue at the time the sleep_if_idle function is called. Only if the queue is still empty, do we put the CPU to sleep by executing the hlt instruction through the instructions::hlt wrapper function provided by the x86_64 crate.

由于我们在 run_ready_tasks 之后直接调用 sleep_if_idle ,循环直到 task_queue 变空,因此再次检查队列似乎没有必要。但是,硬件中断可能会在 run_ready_tasks 返回后立即发生,因此在调用 sleep_if_idle 函数时队列中可能有一个新任务。只有当队列仍然为空时,我们才会通过 x86_64 包提供的 instructions::hlt 包装函数执行 hlt 指令来让 CPU 进入睡眠状态。

Unfortunately, there is still a subtle race condition in this implementation. Since interrupts are asynchronous and can happen at any time, it is possible that an interrupt happens right between the is_empty check and the call to hlt:

不幸的是,这个实现中仍然存在微妙的竞争条件。由于中断是异步的并且可能随时发生,因此中断可能发生在 is_empty 检查和调用 hlt 之间:

if self.task_queue.is_empty() {
///

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论