浅谈协程
在开始展开协程前,我们先来看一下一些非 C++语言中的协程实现。
(一)其他语言中的协程实现
很多语言里面,协程是作为 "一类公民" 直接加入到语言特性中的, 比如:
- Dart1.9示例代码
Future getPage(t) async {
var c = new http.Client();
try {
var r = await c.get('http://xxx');
print(r);
return r.length();
} finally {
await c.close();
}
}
- Python示例代码
async def abinary(n):
if n 线程 这些, 基本上各种语言通过操作系统提供的 Api, 都能直接获取操作系统提供的这些能力了. 其实操作系统按任务的调度方式来区分, 有以下两种模式:- 协作式多任务操作系统
- 抢占式多任务操作系统
抢占式多任务操作系统我们刚刚说过了, 而协程本身的特性, 跟协作式多任务操作系统所提供的机制基本一致, 对于每个 Task, 我们可以多次的中断和继续执行, 说到这里, 熟悉 Dos 开发的同学肯定就会想到 "INT 21H"了, 这个其实就是我们早期利用相关机制来实现多任务协同目的的一种方式了, 我们也可以看成这是协程最早的雏形。
聊到中断, 其中比较重要的就是执行环境的保存和恢复了, 而上下文的保存能力可以是操作系统直接提供的, 也可以是程序机制自身所提供的了, 综上所述, 我们大致可以将 c++中的协程的实现方案的迭代看成如下情况:
- 最早利用 setjump 来实现的协作式任务调度器
- 系统级实现, 如 linux 提供的 ucontext 相关 API, Windows 提供的 Fiber 相关的 Api
- 由系统级实现所衍生出的高性能方案, 一般是借签系统级的实现, 移除一些非必须的操作所达成的, 代表的方案有大家熟知的libco和 boost::context, 也就是我们通常所说的有栈协程实现
- 无栈实现, 最开始是纯粹使用 duff device hack 出来的方案, 后续被 MS 规整, 部分特性依赖 compiler 实现, 逐步演化成现在的 c++20 coroutine 机制了。
(三)协程的执行简介
了解了协程在 C++中的部分历史, 我们来简单了解一下协程的执行机制, 这里我们直接以 C++20 为例, 先来看一下概览图:
关于协程的执行, 我们主要关注以下这些地方:
- 中断点和重入点的定义
有栈协程和无栈协程定义中断点和重入点的方式和机制略有差异, 执行到中断点和重入点的时候大家使用的保存和恢复机制不太一样, 但以 Host App 的视角来看, 整体的执行过程其实是比较一致的。
这里我们是以 C++20 的无栈协程来举例的, 通过图中的关键字co_await,我们定义了 point1 和 point2 两个成对的中断点和重入点。
我们来看一下协程执行到中断点和重入点的时候具体发生的事情:中断点:协程中断执行的时候, 我们需要对当前的执行状态:
- 协程执行到哪了。
- 协程当前使用的 context 进行保存, 并将程序的执行权归还给外界. 此时我们也可以返回必要的值给外界, 方便外界更好的对协程的后续执行进行控制。
重入点:重入点是由中断点带出来的概念, 既然函数的执行能够被中断(suspend), 那我们肯定也需要提供机制相关的机制恢复协程的执行了, 在复杂执行的时候, 我们需要对协程保存的执行状态进行恢复:
- 恢复到上次挂起执行的地方继续执行
- 恢复保存的 context
- 传递必要的值到协程
整个协程的执行区别于普通函数的单次执行返回结果,一般都会有多次的中断与重入,直到协程执行完成或者被外界强行中止。
而有栈协程和无栈协程的实现,差异最大的地方就是如下两点了:
- 怎么保存和恢复当前的执行位置。
- 怎么保存和恢复当前协程引用到的内存(变量等) 本篇主要侧重无栈协程, 无栈协程相关的机制后续会具体展开. 对有栈协程相关机制感兴趣的可以翻阅 libco 或 boost.context 相关的内容进行了解。
(四) 小议无栈协程的出现
其实之前介绍 C++协程历史的时候, 我们有一个问题没有展开, 为啥有了像 libco, 与 boost.context 这样的高性能有栈协程实现机制后, 标准委员会还会继续寻求无栈协程的解决方案, 并最终将其作为 C++协程的实现机制呢, 这里分析主要的原因是为了解决有栈协程天然存在的限制:
- 业务复杂度膨胀带来的爆栈问题
- 使用过大的栈, 又会导致协程本身的切换开销上升或者占用内存过多.
而无栈协程解决这些问题的方式也非常直接, 既然栈会导致问题, 那么我们就直接去除对栈的依赖, 通过其他方式来解决数据存储访问的问题。
目前主要的方案是如下两种:
- Duff Device Hack实现
我们后面介绍的 C++17 的实现就是基于这种方案, 因为仅仅是框架级的实现, 我们能够使用的实现方式会受到限制, 方案本身存在如栈变量的使用有严格的限制等问题, 但对于一些特殊的场合, 如基于寄存器实现的 lua vm, 这种方式会比较契合。
- C++20 的 Coroutine
通过后面的分析, 我们其实会发现这与 Duff Device Hack 实现是一脉相承的, 只是通过 compiler 的配合, 像栈变量的自动处理等机制, 保证了用户可以低心智负担的使用它. 但同时, 相对其他语言的实现, 因为相关特性的设计是"面向库作者的实现", 实际使用基本都需要二次封装, 也就带来了社区很多负面的声音。
(五) 小结
前面我们对 C++中协程的历史做了简单的铺垫, 接下来我们将对 C++17 中基于 Duff Device Hack 的无栈协程实现, 以及 C++20 中的无栈协程做更深入的介绍。
C++17 Stackless Coroutine 实现
在异步操作比较多的情况下, 我们就考虑用协程来取代原来的 Callback 设计. 但当时的 GCC 用的是 8.3 版本, 并不支持 coroutine20, 所以我们最终采用的是一个基于 C++17 的无栈协程实现方案, 也就是使用前面介绍的 Duff Device Hack 方式实现的无栈协程. 我们先来看下当时的项目背景。
(一)项目的背景介绍
当时的情况也比较简单, R 工作室内有多个项目处于预研的状态, 所以大家打算协同共建一个工作室内的后台 C++ Framework, 用于工作室内几个预研项目中. 其中比较重要的一部分就是协程了, 当时引入协程的方式和目的都比较直接, 首先是使用 Duff Device Hack 的机制来实现整个无栈协程. 另外就是整个核心目标是希望通过引入协程和相关的调度器来帮助简化多节点的异步编程支持. 整个框架包含的几大部分如下图所示, Coroutine 机制以及相关的 Scheduler 封装是在 app_service 中作为 C++微服务的基础设施存在的。
实际使用下来, 协程和调度器主要带来了以下这些优点:
- 避免大量中间类的定义和使用。
- 基于逻辑过程本身用串行的方式实现相关代码即可(可参考后续切场景的例子)
- 更容易写出数据驱动向的实现。
- 还有比较关键的一点, 可以有效避免过多的异步 Callback 导致的逻辑混乱和难于跟踪调试的问题。
(二)为何从 C++17 说起
我们为什么先从 C++17 的无栈协程开始介绍, 这是因为 C++17 的实现与 20 的实现一脉相承. 如果我们分析 C++ 20 通过 Compiler 加工后的代码, 就会发现这点. 相比于 C++20 协程大量的细节隐藏在 Compiler 的处理中(当然我们后面也会介绍怎么查看 Compiler 处理的这部分逻辑), C++17 的方案, 整个组织都在我们自己的代码层面, 用于理解无栈协程的整体实现显然是更合适的. 另外, 相关的调度器的实现, 与 C++17 和 C++20 都是兼容的, 像我们项目当时的实现, 是可以很好的做到 C++20 与 C++17 的协程混用的, 也样也方便在过渡阶段, 项目可以更平滑的从 C++17 向 C++20 迁移. 另外, 对于一些不支持 C++20 的受限使用场景, C++17 依然具有它的实用性.
(三)实现概述
我们先来看一下整个机制的概览图:
从上图中我们能够了解到, 整个基于 Duff Device Hack 的无栈协程实现的方式. 首先我们通过 CoPromise 对象来保存用作协程的 std::function 对象, 另外我们也会记录协程当前的执行状态, 其次, 我们还会在 CoPromise 中内置一个 std::tuple用于保存我们需要在协程挂起和恢复时保存的状态值。
另外, 整个核心的执行机制是依赖于几个核心宏所组成的 switch case 状态机来驱动的. 结合上特殊的LINE宏, 我们可以在每个co_await()对象调用的时候, 设置 CoPromise 对象当前的执行状态为LINE**, 而下次跳转的时候, 通过 switch(state)就能正确跳转到上次执行中断的地方继续往下执行了. 当然, 我们会看到我们的case **LINE**其实被穿插到了do{ } while(0)中间, 这个其实就利用到了 duff device 特性, 允许你通过 case 快速的跳转到 for 循环或者 while 循环的内部, C 语言一个很特殊的特性. 利用这点, 首先我们可以完成**co_awiat()宏的封装, 其次, 我们也能在逻辑代码的 for 循环以及 while 循环中, 正确的应用 co_await(), 所以说 Duff Device 特性对于整个机制来说, 还是比较关键的.
如上例中所述的 Test Code 代码, co_begin()和co_end()展开后构成了 switch() {}的开始和结束部分, 而中间我们加入的__co_await()宏, 则会展开成用于完成中断点和重入点的 case 逻辑, 整体的封装还是很巧妙的.
(四)执行流程概述
整体的执行流程通过上面的分析我们也能比较简单的整理出来:
- 宏展开形成一个跨越协程函数首尾的大的 swith case 状态机。
- 协程执行时构建新的 CoPromise 对象, 正确的处理输入参数, 输入参数会被存储在 CoPromise 对象的 std::tuple上, 并且每次重入时作为函数的入口参数以引用的方式转入函数内部
- 每次 Resume()时根据当前 CoPromise 记录的 state, 跳转到正确的 case label 继续往下执行.
- 执行到下一个挂起点返回控制权到调度器
重复上述操作直到执行结束。
从整体机制上, 我们也能简单看到 C++17 对应实现的一些限制:
__co_begin()前不能有逻辑代码, 相关的代码会因为函数的重新执行被反复调用.
栈变量的使用, 因为本身机制的原因, 并不能正确的保存栈变量的值, 我们需要透过机制本身提供的机制来处理状态值 - 这个指的是被当成 std::tuple成员存储在 CoPromise 对象中的那些值, 每次函数执行会以引用的方式作为参数传递给协程函数.
(五) 另外一个示例代码
mScheduler.CreateTask([](int& c, LocalStruct& locals) -> logic::CoTaskForScheduler {
rco_begin();
{
locals.local_i = 1024;
auto* task = rco_self_task();
printf("step1 %dn", locals.local_i);
}
rco_yield_next_frame();
{
c = 0;
while(c < 5) {
printf("in while loop c = %dn", c);
rco_yield_sleep(1000);
c++;
}
rco_yield_next_frame();
}
rco_end();
}, 3, LocalStruct{});
从上例可以看出, 虽然存在上一节中我们提到的一些限制, 依照设定的规则进行编码实现, 整体使用还是比较简单易懂的。上面的 rco_yield_next_frame()和 rco_yield_sleep()是利用 Scheduler 的调度能力封装出来的挂起到下一帧继续恢复执行和休眠这两个异步操作语义。
(六)绕开栈变量限制的方法
提到栈变量的限制, 肯定有同学会想到, 是否有方法绕开栈变量的限制, 用一种更灵活的方式处理协程中临时值的存取, 使其在跨越中断点和重入点的情况依然有效?
答案是肯定的. 因为我们有明确的与协程关联的状态存储对象 CoPromise, 所以如果框架中有实现反射或者适应任意类型值存取的类型擦除机制, 我们当然能够很简单的对原有的实现进行扩展.
在 rstudio 的框架实现中, 我们通过在 CoPromise 对象上多存储一个额外的std::map的成员, 再配合适当的包装, 就很容易实现如下示例代码所展示的功能了:
rco_begin();
{
rco_set_value("id", 35567);
}
rco_yield_next_frame();
{
{
int64_t& val = rco_ref_value("id", int64_t);
val = 5;
}
locals.local_i = rco_to_value("id", int);
}
rco_end();
通过额外扩展的rco_set_value(), rco_ref_value(), rco_to_value(), 我们即完成了一个比较简单易用的通过 name 对各类型值进行存取的实现, 当然, 实际操作的其实都是在 CoPromise 上存储的std::map成员。
这块是反射的一个简单应用, 关于类型擦除的细节, 与本篇关联不大, 这里不进行具体的展开了。
(七)一个内部项目中后台切场景的代码示例
本章的结尾我们以一个具体的业务实例作为参考, 方便大家了解相关的实现在具体业务中的大致工作情形。
一个原来参与的项目的后台服务器是多节点的设计, 对于切场景来说, 需要访问多个节点来完成相关的操作, 大致的切场景时序图如下所示:
删减细节代码之后的主要异步代码如下图所示:
rco_begin();
{
locals.clientReq = req;
locals.session = CServerUtil::GetSessionObj(sessionId);
// ...
SSTbusppInstanceKey emptyInstKey;
emptyInstKey.Init();
if (locals.session->GetTargetGameSvrID() != emptyInstKey) {
// ...
rco_await(locals.gameSceneService->CheckChangeScene(locals.playerId, locals.checkChangeSceneReq));
// ...
// 保存大世界信息
// ...
rco_await(locals.gameSceneService->ExitMainland(locals.playerId, locals.exitMainlandReq));
// ...
}
auto gameMgrClient = GServer->GetRpcClient(TbusppInstanceKey{TBUSPP_SERVER_GAMEMGRSVR, ""});
locals.gameMgrService = rstudio::rpc_proxy::GameMgrService_Proxy::Create(gameMgrClient, GServer->GetRpcScheduler());
// ...
LOG_DEBUG(locals.playerId, "[CHANGE SCENE] ready to Queryline group");
}
rco_await(locals.gameMgrService->QueryMainland2(locals.playerId, locals.querySpaceReq));
{
// ...
rco_await(locals.gameSceneService->ChangeMainland(locals.playerId, locals.localInstanceKey, locals.changeMainlandReq));
// ...
}
// ...
LOG_DEBUG(locals.playerId, "[CHANGE SCENE] send change mainland_conf");
rco_emit_finish_event(rstudio::logic::CoRpcFinishEvent(rstudio::reflection::Value(locals.clientRes)));
rco_return;
rco_end();
通过 rco_await()发起的多个异步 Rpc 调用, 我们很好的完成了上述时序图对应的逻辑功能实现。
Rpc 相关的协程化封装在 C++20 中会有个相关的示例, 此处就不重复展开 C++17 的实现了。
C++20 Coroutine 机制简介
了解了 C++17 的 Stackless Coroutine 实现机制后, 我们接着来看一下 C++20 Coroutine 的实现. 首先我们先来通过核心对象概览图来简单了解一下 C++20 Coroutine:
如图所示, C++ Coroutine20 的核心对象有如下这些:
- Function Body: 通常普通函数添加 co_await 等协程关键字处理返回值就可以作为一个协程函数。
- coroutine_handle: 对协程的生命周期进行控制。
- promise_type: 异常处理, 结果接收, 同时也可以对协程部分行为进行配置, 如协程初始化时的状态, 结束时的状态等。
- Awaitable 对象: 业务侧的中断重入点定义和数据传输定制点, 结合 co_await 关键字, 我们就能借助 compiler 实现正确的中断, 重入语义了。
从图上也能看到, 对比其它语言较精简的 Coroutine 实现, C++20 这套实现, 还是偏复杂的, 这也是我们常调侃的 "库作者向" 实现, 虽然整体使用很灵活, 也能跟泛型很好的搭配, 但我们还是需要在框架层做大量的包装, 同时业务一般需要一个地方对应用中所有的协程做管理, 方便监控应用的整体运行情况等, 这也使得 C++这套特性没法很简单的直接在业务侧进行使用, 后续我们讲到 Coroutine Scheduler 的时候会进一步展开相关的内容。
此处我们只需要对 Coroutine 的核心对象的构成和作用有个简单的认知, 接下来我们会结合相关的示例代码来深入了解 C++20 Coroutine 的整体运作机制, 了解更多细节。
结合代码理解 Coroutine
(一) 一个简单的示例 - 并不简单
#include
#include
using namespace std;
struct resumable_thing
{
struct promise_type
{
resumable_thing get_return_object()
{
return resumable_thing(coroutine_handle::from_promise(*this));
}
auto initial_suspend() { return suspend_never{}; }
auto final_suspend() noexcept { return suspend_never{}; }
void return_void() {}
void unhandled_exception() {}
};
coroutine_handle _coroutine = nullptr;
resumable_thing() = default;
resumable_thing(resumable_thing const&) = delete;
resumable_thing& operator=(resumable_thing const&) = delete;
resumable_thing(resumable_thing&& other)
: _coroutine(other._coroutine) {
other._coroutine = nullptr;
}
resumable_thing& operator = (resumable_thing&& other) {
if (&other != this) {
_coroutine = other._coroutine;
other._coroutine = nullptr;
}
}
explicit resumable_thing(coroutine_handle coroutine) : _coroutine(coroutine)
{
}
~resumable_thing()
{
if (_coroutine) { _coroutine.destroy(); }
}
void resume() { _coroutine.resume(); }
};
resumable_thing counter() {
cout