假设您没有在 Linux 上使用 C 运行时,而是针对其系统调用 API 进行编程。毕竟是长期稳定的。内存管理和缓冲 I/O很容易解决,但很多软件都受益于并发性。如果还具有线程生成功能,那就太好了。本文将演示一种简单、实用且强大的方法,仅使用原始系统调用来生成和管理线程。它只需要大约十几行 C 语言,包括一些内联汇编指令。
问题是没有办法避免使用一些汇编。clone
系统调用和系统调用都没有clone3
与 C 兼容的线程语义,因此您需要为每个体系结构使用一些内联汇编来掩盖它。本文将重点讨论 x86-64,但基本概念应该适用于 Linux 支持的所有体系结构。glibc 包装器在原始系统调用之上安装了一个与 C 兼容的接口,但我们不会在这里使用它。clone(2)
在开始之前,先看一下完整的、可运行的演示:stack_head.c
克隆系统调用
在 Linux 上,线程是使用clone
系统调用生成的,其语义类似于经典的 unix fork(2)
。一个进程进入,两个进程出来时状态几乎相同。对于线程来说,这些进程几乎共享所有内容,仅在两个寄存器上有所不同:返回值(新线程中为零)和堆栈指针。与典型的线程生成 API 不同,应用程序不提供入口点。它只为新线程提供一个堆栈。原始克隆 API 的简单形式如下所示:
long clone(long flags, void *stack);
听起来很优雅,但它有一个恼人的问题:新线程在没有任何已建立的堆栈框架的函数中间开始生命。它的堆栈是一张白纸。除了跳转到建立堆栈帧的函数序言之外,它还没有准备好执行任何操作。所以除了系统调用本身的汇编之外,还需要更多的汇编来让线程进入C兼容状态。换句话说,通用系统调用包装器无法可靠地生成线程。
void brokenclone(void (*threadentry)(void *), void *arg)
{
// ...
long r = syscall(SYS_clone, flags, stack);
// DANGER: new thread may access non-existant stack frame here
if (!r) {
threadentry(arg);
}
}
由于奇怪的历史原因,每个架构的clone
界面都略有不同。较新的版本clone3
统一了这些差异,但它遇到了上面相同的线程生成问题,因此在这里没有帮助。
堆栈“标头”
八年前我想出了一个巧妙的技巧,至今仍在使用。当新线程启动时,父线程和子线程处于几乎相同的状态,但直接目标是发散。如前所述,一个区别是它们的堆栈指针。为了分散它们的执行,我们可以使它们的执行依赖于堆栈。一个明显的选择是将不同的返回指针压入堆栈,然后让ret
指令完成工作。
提前仔细准备新堆栈是一切的关键,有一种简单的技术,我喜欢将其称为 ,这是stack_head
一种放置在新堆栈高端的结构。它的第一个元素必须是入口点指针,并且该入口点将接收一个指向其自身的指针stack_head
。
struct __attribute((aligned(16))) stack_head {
void (*entry)(struct stack_head *);
// ...
};
该结构在所有体系结构上都必须具有 16 字节对齐。sizeof
我使用了一个属性来帮助保持这一点,并且在使用它来放置结构时会有所帮助,正如我稍后将演示的那样。
现在最酷的部分是:...
可以是任何你想要的!使用该区域为新堆栈添加任何必要的线程本地数据。这是标准线程生成接口无法提供的一项巧妙功能。如果我打算稍后“加入”一个线程——等到它完成工作——我会在这个空间中放置一个加入 futex:
struct __attribute((aligned(16))) stack_head {
void (*entry)(struct stack_head *);
int join_futex;
// ...
};
稍后会提供有关该 futex 的更多详细信息。
克隆包装器
我称之为clone
包装器newthread
。它具有用于系统调用的内联汇编,并且由于它包含ret
用于发散线程的函数,因此它是一个“裸”函数,就像 with 一样setjmp
。编译器不会生成序言或尾声,并且函数体仅限于没有输入/输出操作数的内联汇编。它甚至无法通过名称可靠地引用其参数。与 一样clone
,它不接受线程入口点。相反,它接受stack_head
带有入口点的种子。整个包装器只有六个指令:
__attribute((naked))
static long newthread(struct stack_head *stack)
{
__asm volatile (
"mov %%rdi, %%rsi\n" // arg2 = stack
"mov $0x50f00, %%edi\n" // arg1 = clone flags
"mov $56, %%eax\n" // SYS_clone
"syscall\n"
"mov %%rsp, %%rdi\n" // entry point argument
"ret\n"
: : : "rax", "rcx", "rsi", "rdi", "r11", "memory"
);
}
在 x86-64 上,函数调用和系统调用都使用rdi
andrsi
作为前两个参数。根据上面的参考clone(2)
原型:第一个系统调用参数是flags
,第二个参数是 new stack
,它将直接指向stack_head
. 然而,堆栈指针到达rdi
。因此,我复制stack
到第二个参数寄存器 ,rsi
然后将标志 ( 0x50f00
) 加载到第一个参数寄存器rdi
。系统调用号码进入rax
。
这是从哪里来0x50f00
的?这是以十六进制设置的最低线程生成标志。如果缺少任何标志,则线程将无法可靠地生成 - 正如通过不同系统配置的反复试验而不是从文档中发现的那样。通常计算如下:
long flags = 0;
flags |= CLONE_FILES;
flags |= CLONE_FS;
flags |= CLONE_SIGHAND;
flags |= CLONE_SYSVSEM;
flags |= CLONE_THREAD;
flags |= CLONE_VM;
当系统调用返回时,它将堆栈指针复制到rdi
入口点的第一个参数中。stack
当然,在新线程中,堆栈指针的值将与 相同。在旧线程中,这是无害的无操作,因为rdi
在此 ABI 中是易失性寄存器。最后 ret
将栈顶地址弹出并跳转。在旧线程中,它会向调用者返回系统调用结果,或者是错误(负 errno),或者是新线程 ID。在新线程中, 它弹出第一个元素stack_head
,当然,它是入口点。这就是为什么它必须是第一!
线程无处可从入口点返回,因此当它完成时,它必须无限期地阻塞或使用exit
(not exit_group
)系统调用来终止自身。
来电者的观点
调用方看起来像这样:
static void threadentry(struct stack_head *stack)
{
// ... do work ...
__atomic_store_n(&stack->join_futex, 1, __ATOMIC_SEQ_CST);
futex_wake(&stack->join_futex);
exit(0);
}
__attribute((force_align_arg_pointer))
void _start(void)
{
struct stack_head *stack = newstack(1<<16);
stack->entry = threadentry;
// ... assign other thread data ...
stack->join_futex = 0;
newthread(stack);
// ... do work ...
futex_wait(&stack->join_futex, 0);
exit_group(0);
}
尽管有极简的 6 指令克隆包装器,但它采用的是传统线程 API 的形式。隐藏 futex 也只需要多一点时间。说到这里,到底是怎么回事呢?与 WaitGroup 的主体相同。futex 是一个整数,初始化为零,表示线程正在运行(“未完成”)。连接器告诉内核等待整数非零,它可能已经是非零了,因为我懒得先检查。当子线程完成时,它会自动将 futex 设置为非零并唤醒所有等待者,其中可能没有人。
警告:成功加入后释放/重用堆栈并不安全。它仅表示线程已完成其工作,而不表示它已退出。您需要等待它SIGCHLD
(或使用CLONE_CHILD_CLEARTID
)。如果这听起来像是一个问题,请更仔细地考虑您的上下文:为什么您觉得需要释放堆栈?当进程退出时它将被释放。担心堆栈泄漏吗?为什么要启动和退出无限数量的线程?在最坏的情况下,将线程停放在线程池中,直到您再次需要它。仅当您正在构建像 pthreads 这样的通用线程 API 时才需要担心此类事情。我知道这很诱人,但除非绝对必要,否则请避免这样做。
是怎么回事force_align_arg_pointer
?Linux 不会像 System V ABI 函数调用那样为进程入口点对齐堆栈。进程以未对齐的堆栈开始生命。该属性告诉 GCC 在入口点序言中修复堆栈对齐,就像在 Windows 上一样。如果你想访问argc
,,argv
你envp
将需要更多的组装。(我希望在 Linux 上不用 libc 做真正基本的事情不需要那么多汇编。)
__asm (
".global _start\n"
"_start:\n"
" movl (%rsp), %edi\n"
" lea 8(%rsp), %rsi\n"
" lea 8(%rsi,%rdi,8), %rdx\n"
" call main\n"
" movl %eax, %edi\n"
" movl $60, %eax\n"
" syscall\n"
);
int main(int argc, char **argv, char **envp)
{
// ...
}
回到示例用法,它有一些看起来很常规的系统调用包装器。这些从哪里来?从这个 6 参数通用系统调用包装器开始。
long syscall6(long n, long a, long b, long c, long d, long e, long f)
{
register long ret;
register long r10 asm("r10") = d;
register long r8 asm("r8") = e;
register long r9 asm("r9") = f;
__asm volatile (
"syscall"
: "=a"(ret)
: "a"(n), "D"(a), "S"(b), "d"(c), "r"(r10), "r"(r8), "r"(r9)
: "rcx", "r11", "memory"
);
return ret;
}
我可以定义syscall5
、syscall4
等,但我只是将其包装在宏中。前者会更有效,因为后者无缘无故地浪费指令将寄存器归零,但现在我专注于压缩实现源。
#define SYSCALL1(n, a) \
syscall6(n,(long)(a),0,0,0,0,0)
#define SYSCALL2(n, a, b) \
syscall6(n,(long)(a),(long)(b),0,0,0,0)
#define SYSCALL3(n, a, b, c) \
syscall6(n,(long)(a),(long)(b),(long)(c),0,0,0)
#define SYSCALL4(n, a, b, c, d) \
syscall6(n,(long)(a),(long)(b),(long)(c),(long)(d),0,0)
#define SYSCALL5(n, a, b, c, d, e) \
syscall6(n,(long)(a),(long)(b),(long)(c),(long)(d),(long)(e),0)
#define SYSCALL6(n, a, b, c, d, e, f) \
syscall6(n,(long)(a),(long)(b),(long)(c),(long)(d),(long)(e),(long)(f))
现在我们可以有一些出口:
__attribute((noreturn))
static void exit(int status)
{
SYSCALL1(SYS_exit, status);
__builtin_unreachable();
}
__attribute((noreturn))
static void exit_group(int status)
{
SYSCALL1(SYS_exit_group, status);
__builtin_unreachable();
}
简化的 futex 包装器:
static void futex_wait(int *futex, int expect)
{
SYSCALL4(SYS_futex, futex, FUTEX_WAIT, expect, 0);
}
static void futex_wake(int *futex)
{
SYSCALL3(SYS_futex, futex, FUTEX_WAKE, 0x7fffffff);
}
等等。
终于可以说说这个newstack
功能了。它只是一个匿名内存映射的包装,从内核分配页面。我已经对标准 mmap 分配的常量进行了硬编码,因为它们没有什么特殊或不寻常的。返回值检查有点棘手,因为负值范围的很大一部分是有效的,所以我只想检查小范围的负值 errnos。(分配竞技场看起来基本相同。)
static struct stack_head *newstack(long size)
{
unsigned long p = SYSCALL6(SYS_mmap, 0, size, 3, 0x22, -1, 0);
if (p > -4096UL) {
return 0;
}
long count = size / sizeof(struct stack_head);
return (struct stack_head *)p + count - 1;
}
该aligned
属性在这里发挥作用:我将结果视为数组stack_head
并返回最后一个元素。该属性确保每个单独的元素都是对齐的。
就是这样!除了一些深思熟虑的组装