APCu高速共享缓存插件分享,性能超越Redis达10倍!

2024年 5月 29日 52.9k 0

前言

今年接触了一个策略类手游相关的项目,后端本身计划是使用skynet进行开发的,后来结合项目的时间紧急程度和客户端开发组讨论后决定使用PHP进行快速开发,后期再使用其他语言框架进行拆分业务;综合考虑最后选用了webman作为主要开发框架。

整体项目分为配置服务、HTTP-API服务、websocket服务三大部分,其中配置管理主要是兼容客户端生成的配置数据进行导入导出转换加载,底层使用MySQL进行储存,多服务间使用Redis进行一级缓存,服务进程间使用了基于APCu的共享缓存,后期我将该共享缓存组件化也贡献给了社区。

【workbunny】共享高速缓存 https://www.workerman.net/plugin/133

Redis

在游戏开发界实际上使用Redis的情况还是比较多的,我们使用Redis主要还是为了将一些数据缓存共享给各个服务器实例:

┌─────┐                                       ┌─────┐
     |  A  | ────────────>  service    instance   process   microtime(true),
            'method'    => $func,
            'params'    => $params,
            'result'    => null
        ];
    }, true);
    return $result;
}

比如上述代码,就是一个Hash key的自增操作,我们需要在读取Hash后在写入,读取和写入应为一体的;

原子性执行函数Atomic的实现如下:

/**
     * 原子操作
     *  - 无法对锁本身进行原子性操作
     *  - 只保证handler是否被原子性触发,对其逻辑是否抛出异常不负责
     *  - handler尽可能避免超长阻塞
     *  - lockKey会被自动设置特殊前缀#lock#,可以通过Cache::LockInfo进行查询
     *
     * @param string $lockKey
     * @param Closure $handler
     * @param bool $blocking
     * @return bool
     */
    protected static function _Atomic(string $lockKey, Closure $handler, bool $blocking = false): bool
    {
        $func = __FUNCTION__;
        $result = false;
        if ($blocking) {
            $startTime = time();
            while ($blocking) {
                // 阻塞保险
                if (time() >= $startTime + self::$fuse) {return false;}
                // 创建锁
                apcu_entry($lock = self::GetLockKey($lockKey), function () use (
                    $lockKey, $handler, $func, &$result, &$blocking
                ) {
                    $res = call_user_func($handler);
                    $result = true;
                    $blocking = false;
                    return [
                        'timestamp' => microtime(true),
                        'method'    => $func,
                        'params'    => [$lockKey, '\Closure'],
                        'result'    => $res
                    ];
                });
            }
        } else {
            // 创建锁
            apcu_entry($lock = self::GetLockKey($lockKey), function () use (
                $lockKey, $handler, $func, &$result
            ) {
                $res = call_user_func($handler);
                $result = true;
                return [
                    'timestamp' => microtime(true),
                    'method'    => $func,
                    'params'    => [$lockKey, '\Closure'],
                    'result'    => $res
                ];
            });
        }
        if ($result) {
            apcu_delete($lock);
        }
        return $result;
    }

当使用阻塞模式的时候,我们会在当前进程内使用一个while循环来进行阻塞抢占,为了不将当前进程阻塞死,我们还加入了一个保险,由self::$fuse提供;

注意

这里在实践过程中需要注意的是,Atomic在传入回调函数时切勿再使用匿名函数作为参数值或者是通过use传入一个匿名函数,如:

$fuc = function() {
    // do something
}
Cache::Atomic('test', function () use ($fuc) {
    // do anything
})

APCu底层会对函数参数值或引用参数进行序列化储存,但匿名函数不可以被序列化,所以会抛出一个异常;但你可以通过当前对象的属性值或者静态属性来保存一个匿名函数,然后在Atomic的回调内调用使用。

0.4.x版本

由于目前我使用Webman基于SQLite和共享内存在自行实现一个具备RAFT的轻调度服务插件和服务注册与发现插件,所以特此为其完善增加了Channel特性;

Channel可以辅助实现类似Redis-List、Redis-stream、Redis-Pub/Sub的功能。

Channel

Channel是个特殊的数据格式,他的格式是固定如下的:

[
    '--default--' => [
        'futureId' => null,
        'value'    => []
    ],
    workerId_1 => [
        'futureId' => 1,
        'value'    => []
    ],
    workerId_2 => [
        'futureId' => 1,
        'value'    => []
    ],
    ......
]

它在共享内存中的键默认以**#Channel#**开头。

  • --default--是默认储存空间,workerId_1/workerId_2 等是子通道储存空间,命名是由用户代码传入的,这里建议使用workerman自带的workerId即可。
  • 默认储存空间和子通道储存空间是互斥的,也就是说当存在子通道储存空间时,是不存在--default--的,反之亦然;子通道储存空间是当当前通道存在监听器时生成的,而在监听器产生前,消息会暂存在--default--空间,当监听器创建时,--default--的数据value会被同步到子通道储存空间内,加入value的队头。
  • 每一个子通道储存空间的value都是拷贝的,存在相同的数据,各自监听器监听各自的子通道储存空间;消息的发布支持向所有子通道发布,也可以指定子通道进行发布。
  • 监听器的底层使用了workerman的定时器,区别与workerman的timer,在event驱动下定时器的间隔是0,也就是一个future,而其他的事件驱动是0.001s为间隔。

实现一个List

由于监听器创建消费是基于workerId的,我们可以通过不同进程创建相同的workerId的监听器来对同一个子通道进行监听:

  • A进程使用list作为workerId:
  • Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  • B进程也同样创建list的workerId监听器:
  • Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  • 此时Channel test的数据如下:
  • [
    'list' => [
        'futureId' => 1,
        'value'    => []
    ],
    ......
    ]

    注意:共享内存中储存的futureId为最后一个监听器创建的futureId;当当前进程需要对监听器进行移除时,请勿使用该数据,对应进程内可以通过Cache::ChCreateListener()的返回值获取到当前进程创建的futureId用于移除监听器,不使用共享内存中储存的futureId即可

  • 这时任意进程通过Cache::ChPublish('test', '这是一个测试消息', true);发送消息,或者指定workerIdCache::ChPublish('test', '这是一个测试消息', true, 'list');。
  • 实现一个Pub/Sub

  • A进程使用workerman的workerId作为workerId:
  • Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  • B进程使用workerman的workerId作为workerId:
  • Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  • 此时Channel test的数据可能如下:
  • [
    1 => [
        'futureId' => 1,
        'value'    => []
    ],
    2 => [
        'futureId' => 1,
        'value'    => []
    ]
    ]
  • 这时,任意进程通过Cache::ChPublish('test', '这是一个测试消息', false);发送消息即可。
  • 注:发送消息第三个参数使用false时,如发送时还未创建监听器,消息则不会储存至Channel,即监听后才可存在消息

    实现类似Redis-stream

    与Pub/Sub相同,只不过发布消息使用Cache::ChPublish('test', '这是一个测试消息', true);, 当发布消息指定workerId时,可以实现类似Redis-Stream Group的功能。

    注:这里更复杂的功能可能需要对workerId进行变通,不能简单使用workerman自带的workerId,只需要自行规划好即可

    相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论