作者 | lionleeli
曾看过很多并发模型相关的文章,但是这些文章大部分只讲了并发模型的实现原理,并没有给出具体的示例代码,看完总觉得对并发模型这个知识点是一知半解,不得要领。为了掌握高并发模型,我这里抛砖引玉,实现了20种常见的并发模型,并对每种并发模型进行了性能压测和分析。由于本人水平有限,文章中难免有一些不准确或者纰漏的地方,欢迎大家批评指正。
一、缘起
最近看了好友推荐的一本新书《Linux后端开发工程实践》 ,该书RPC框架和微服务集群的部分甚是不错,其中的“第10章-I/O模型与并发”中介绍了 17 种不同的并发模型,看完之后更是感觉受益匪浅。
但美中不足的是这 17 种并发模型只支持短连接,配套的 BenchMark 工具不支持发起指定的请求负载,给出的性能指标也不够丰富。
受到该内容的启发,我在该内容的基础上实现了 20 种常见的支持长连接的并发模型,完善了协议解析效率和BenchMark 工具。
二、前置说明
因为这 20 种并发模型的代码量已经达到了 1万2千多行,是不适合在一篇文章中全部展示的。所以我把相关的代码开源在github上,方便大家查看。
在文中介绍到相关代码时都会给出具体的代码位置,但只会在文章中贴出关键的代码,即便如此,本文的代码量依然不少,强烈建议收藏后阅读。
github上的项目是MyEchoServer,项目链接为:github.com/newland2024
项目的目录结构
MyEchoServer 项目的目录结构如下所示。
.
├── BenchMark
│ ├── benchmark.cpp
│ ├── client.hpp
│ ├── clientmanager.hpp
│ ├── makefile
│ ├── percentile.hpp
│ ├── stat.hpp
│ └── timer.hpp
├── common
│ ├── cmdline.cpp
│ ├── cmdline.h
│ ├── codec.hpp
│ ├── conn.hpp
│ ├── coroutine.cpp
│ ├── coroutine.h
│ ├── epollctl.hpp
│ ├── packet.hpp
│ └── utils.hpp
├── ConcurrencyModel
│ ├── Epoll
│ ├── EpollReactorProcessPoolCoroutine
│ ├── EpollReactorProcessPoolMS
│ ├── EpollReactorSingleProcess
│ ├── EpollReactorSingleProcessCoroutine
│ ├── EpollReactorSingleProcessET
│ ├── EpollReactorThreadPool
│ ├── EpollReactorThreadPoolHSHA
│ ├── EpollReactorThreadPoolMS
│ ├── LeaderAndFollower
│ ├── MultiProcess
│ ├── MultiThread
│ ├── Poll
│ ├── PollReactorSingleProcess
│ ├── ProcessPool1
│ ├── ProcessPool2
│ ├── Select
│ ├── SelectReactorSingleProcess
│ ├── SingleProcess
│ └── ThreadPool
├── readme.md
└── test
├── codectest.cpp
├── coroutinetest.cpp
├── makefile
├── packettest.cpp
├── unittestcore.hpp
└── unittestentry.cpp
相关的目录说明如下:
- BenchMark是基准性能压测工具的代码目录。
- ConcurrencyModel是20种不同并发模型的代码目录,这个目录下有 20 个不同的子目录,每个子目录都代表着一种并发模型的实现示例。
- common是公共代码的目录。
- test目录为单元测试代码的目录。
三、预备工作
因为I/O模型是并发模型涉及到的关键技术点,所以我们也不会免俗,也会介绍一下常见的I/O模型。
为了降低实现难度,这里我们实现了一个简单的应用层协议,并实现一些通用的基础代码,以便后续高效的实现不同的并发实例。
1. 常见I/O模型
常见的I/O模型有五种:阻塞I/O、非阻塞I/O、多路I/O复用、信号驱动I/O、异步I/O。其中的阻塞I/O、非阻塞I/O、多路I/O复用、信号驱动I/O都是同步IO。
同步I/O和异步I/O的区别在于,是否需要进程自己再调用I/O读写函数。同步I/O需要,异步I/O不需要。
(1) 阻塞I/O
在阻塞IO模式下,只要I/O暂不可用,读写操作就会被阻塞,直到I/O可用为止,在被阻塞期间,当前进程是被挂起的,这样就无法充分的使用CPU,导致并发效率低下。
(2) 非阻塞I/O
在非阻塞IO模式下,读写操作都是立即返回,此时当前进程并不会被挂起,这样就可以充分的使用CPU,非阻塞I/O通常会和多路I/O复用配合着一起使用,从而实现多个客户端请求的并发处理。
(3) 多路I/O复用
多路I/O复用实现了多个客户端连接的同时监听,大大提升了程序感知客户端连接可读写状态变化的效率。在Linux下多路I/O复用的系统调用为select、poll、epoll。
(4) 信号驱动I/O
通过注册SIGIO信号的处理函数,实现了一个I/O就绪的通知机制,在SIGIO信号的处理函数再进行读写操作,从而避免了低效的I/O是否就绪的轮询操作。
但是在信号处理函数中是不能调用异步信号不安全的函数,例如,有锁操作的函数就是异步信号不安全的,故信号驱动I/O应用的并不多。
(5) 异步I/O
前面的4种I/O模型都是同步IO,最后一种I/O模型是异步IO。异步I/O就是先向操作系统注册读写述求,然后就立马返回,进程不会被挂起。操作系统在完成读写操作之后,再调用进程之前注册读写述求时指定的回调函数,或者触发指定的信号。
2. 应用层协议
20种并发示例实现的是最常见的Echo(回显)服务,这里我们设计了一个简单的应用层协议,格式如下图所示。
协议由两部分组成,第一部分是固定长度(4字节)的协议头部,协议头部用于标识后面的变长协议体的长度,第二部分就是是具体的变长协议体。
(1) 协议实现
协议的编解码在common目录的codec.hpp文件中实现,其中DeCode函数用于实现协议的流式解析。
采用流式解析,能避免拒绝服务攻击。例如,攻击者创建大量的连接,然后每个连接上只发送一个字节的数据,如果采用常见的解析方式,一直在socket上读取数据,直到完成一个完整协议请求的解析。
在不采用协程的情况下,不管是阻塞IO、非阻塞IO、IO复用,当前的工作进程或者线程不是被挂起(阻塞IO),就是CPU使用率飙升(非阻塞IO),服务可用的工作进程或者线程会快速被消耗完,导致服务无法对正常的客户端提供服务,从而形成拒绝服务攻击。
流式解析(来多少字节,就解析多少字节)+ 协程切换(IO不可用时切换到其他协程)+ Reactor定时器实现非阻塞IO的超时机制,就可以很好的解决这种拒绝服务攻击。
(2) 共享的二进制缓冲区
这里特别说明一下二进制缓冲区的实现。在实现协议的时候,通常会「存储读取到的网络数据缓冲区」和「协议解析的缓冲区」这两份独立的缓冲区。
而我这里思考后,发现其实不用多申请一块二进制缓冲区,写入读取到的网络数据和解析读取到的网络数据可以共享同一个二进制缓冲区,进而减少了内存的分配和两块内存之间的拷贝。共享的二进制缓冲区的示意图如下图所示。
共享的二进制缓冲区在common目录的packet.hpp文件中实现。
3. 命令行参数解析
不管是BenchMark工具,还是不同的并发模型程序,都需要支持从命令行中读取动态参数的能力。因为参数解析的getopt系列函数并不易用,故参考Go语言的flag包实现,独立封装了一套易用的命令行参数解析函数。
具体的实现在common目录的cmdline.h和cmdline.cpp文件中。
4. 协程池实现
因为有协程池相关的并发模型,所以需要实现协程池。协程池的实现在common目录的coroutine.h和coroutine.cpp文件中。
特别提一下,协程池这里通过getcontext、makecontext、swapcontext这三个库函数来实现,并且通过C++11的模版函数和可变参数模板的特性,实现了支持变参列表的协程创建函数。协程创建函数的实现如下所示。
template
int CoroutineCreate(Schedule& schedule, Function&& f, Args&&... args) {
int id = 0;
for (id = 0; id state == Idle) break;
}
if (id >= schedule.coroutineCnt) {
return kInvalidRoutineId;
}
Coroutine* routine = schedule.coroutines[id];
std::function entry = std::bind(std::forward(f), std::forward(args)...);
CoroutineInit(schedule, routine, entry);
return id;
}
四、BenchMark 工具
正所谓工欲善其事,必先利其器。为了评估不同并发模型的性能,需要构建一个BenchMark工具来实现请求的发压。
1. 并发模型
在看压测工具实现之前,需要思考一个问题,「如果压测工具本身并发能力不足,则无法产生足够的流量负载,也就无法测试出不同并模型的性能极限」,所以也需要设计好压测工具使用的并发模型。
我们使用多线程+Reactor的并发模型来实现压测工具。发起请求的每个线程都是一个单独的Reactor模型。Reactor模型简图如下图所示。
Reactor是一种事件监听和分发模型,配合epoll可以实现高效的并发处理,从而能充分的利用CPU,即使是单线程也能产生足够大的请求负载。
2. 支持的特性
发压工具BenchMark的usage输出如下所示。
root@centos BenchMark $ ./BenchMark -h
BenchMark -ip 0.0.0.0 -port 1688 -thread_count 1 -max_req_count 100000 -pkt_size 1024 -client_count 200 -run_time 60 -rate_limit 10000 -debug
options:
-h,--help print usage
-ip,--ip service listen ip
-port,--port service listen port
-thread_count,--thread_count run thread count
-max_req_count,--max_req_count one connection max req count
-pkt_size,--pkt_size size of send packet, unit is byte
-client_count,--client_count count of client
-run_time,--run_time run time, unit is second
-rate_limit,--rate_limit rate limit, unit is qps/second
-debug,--debug debug mode, more info print
发压工具实现了以下的特性:
- 支持对监听在指定的ip和port的服务发起压测。
- 支持多线程压测,并可以指定使用的线程数。
- 支持指定客户端连接建立成功之后,最多可以发起多少次请求。(ps:这个选项值如果设置为1,则请求就退化成通过短连接来完成)
- 支持指定请求包的大小,单位为字节。
- 支持指定每个线程下发起请求的客户端并发连接数。
- 支持指定总的压测时间,单位秒。
- 支持指定压测能产生的最大的流量负载,单位qps。
- 支持debug模式。
4.3 代码实现
在单独的BenchMark目录中实现了这个压测工具,大家可以自行查看相关代码。这里特别说明一下,我使用了状态机来实现客户端请求的持续发送,代码会稍显复杂。
4. 执行效果
这里展示一下,压测其中一个单进程并发模型(EpollReactorSingleProcess)的效果,执行结果如下图所示。
从上图的压测结果可以看出,在请求数据长度为1k的情况下,EpollReactorSingleProcess的并发模型就能对外提供高达22万qps的并发处理能力。
上图中红色框出的内容就是最后的压测结果数据,这里的数据分为4部分:
- 接口的pct50、pct95、pct99和pct999的耗时数据。
- 请求成功数、请求失败数、尝试建立连接数、连接失败数、读失败数和写失败数。
- 客户端连接数和请求成功的qps数。
- 请求失败率和连接失败率。
五、20种不同的并发模型
在本节,我们将展示20种不同的并发模型的具体实现。
ConcurrencyModel目录下的每一个子目录都对应一种并发模型的实现。例如,SingleProcess子目录就是单进程并发模型的实现。
ConcurrencyModel目录下的每个子目录下都只有一个cpp文件和一个makefile文件,而这个cpp文件就是这种并发模型的主流程代码,而makefile文件是用于编译的。例如,SingleProcess子目录下只有一个singleprocess.cpp文件。
1. 简单模型
(1) SingleProcess
单进程的并发模型是最简单的,每次只为一个客户端连接服务,直到读写失败或者客户端关闭了连接。对应的代码如下所示。
#include
#include
#include
#include "../../common/cmdline.h"
#include "../../common/utils.hpp"
using namespace std;
using namespace MyEcho;
void handlerClient(int client_fd) {
string msg;
while (true) {
if (not RecvMsg(client_fd, msg)) {
return;
}
if (not SendMsg(client_fd, msg)) {
return;
}
}
}
void usage() {
cout