Redis 模块和阻塞命令

如何在 Redis 模块中实现阻塞命令

Redis 在内置命令集中有一些阻塞命令。 最常用的之一是BLPOP(或对称的BRPOP) 哪些块 等待元素到达列表。

关于阻塞命令的有趣事实是它们不会阻塞 整个服务器,但只是调用它们的客户端。通常是 block 是我们预期会发生一些外部事件:这可以是 Redis 数据结构中的一些变化,如BLPOPcase、 长时间计算,以便从 network 等。

Redis 模块也能够实现阻塞命令, 本文档介绍了 API 的工作原理并介绍了一些模式 可用于对阻塞命令进行建模。

阻止和恢复的工作原理。

注意:您可能需要检查helloblock.cRedis 源代码树中的示例 在src/modules目录中,有关简单易懂的示例 关于如何应用阻止 API。

在 Redis 模块中,命令由回调函数实现,这些回调函数 在调用特定命令时由 Redis 核心调用 由用户。通常,回调会终止其执行发送 有些人回复客户。使用以下函数, 函数中实现 module 命令时,可能会请求客户端 被置于 blocked 状态:

RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);

该函数返回一个RedisModuleBlockedClientobject,即稍后的 用于取消阻止客户端。参数如下 意义:

  • ctx是命令执行上下文,通常在 API 的其余部分中。
  • reply_callback是回调,具有与普通命令函数相同的原型,在客户端被解除阻塞时调用,以便向客户端返回回复。
  • timeout_callback是回调,具有与客户端到达ms超时。
  • free_privdata是为释放私有数据而调用的回调。私有数据是一个指针,指向在用于取消阻止客户端的 API 之间传递的一些数据,该数据指向将回复发送到客户端的回调。我们将在本文档后面看到这种机制是如何工作的。
  • ms是超时(以毫秒为单位)。当达到超时时,将调用超时回调并自动中止客户端。

客户端被阻止后,可以使用以下 API 取消阻止它:

int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);

该函数将 对RedisModule_BlockClient(),然后取消阻止客户端。 在客户端被取消阻塞之前,reply_callback功能 指定 Client 端被阻塞时调用:此函数将 可以访问privdata指针。

重要提示:上述函数是线程安全的,可以从内部调用 一个线程正在执行一些工作,以实现阻止 客户端。

privdata数据将使用free_privdataClient 端解封时的 callback。这很有用,因为回复 callback 在客户端超时或断开连接时,可能永远不会调用 从服务器,因此由外部函数决定是很重要的 负责释放传递的数据(如果需要)。

为了更好地理解 API 的工作原理,我们可以想象编写一个命令 这会阻止客户端一秒钟,然后发送回复 “Hello!”。

注意:arity 检查和其他不重要的事情没有实现 int 命令,以 simple 为例。

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    pthread_create(&tid,NULL,threadmain,bc);

    return REDISMODULE_OK;
}

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */
    RedisModule_UnblockClient(bc,NULL);
}

上述命令会尽快阻止客户端,从而生成一个线程,该线程将 请稍等片刻,然后取消阻止客户端。让我们检查一下回复和 timeout 回调,在我们的例子中非常相似,因为它们 只需使用不同的回复类型回复客户端即可。

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
}

int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithNull(ctx);
}

回复回调只是将 “Hello!” 字符串发送到客户端。 这里重要的一点是,当 客户端已从线程中取消阻塞。

timeout 命令返回NULL,就像实际 Redis 阻止命令超时。

取消阻止时传递回复数据

上面的例子简单易懂,但缺少一个重要的 实际阻塞命令实现的真实方面:通常 reply 函数需要知道要回复客户端什么, 此信息通常在客户端解除阻止时提供。

我们可以修改上面的示例,以便线程生成一个 等待 1 秒后的 random number。你可以把它看作是一个 实际上是某种扩张性的行动。然后这个随机数 可以传递给 reply 函数,以便我们将其返回给命令 访客。为了使其正常工作,我们按如下方式修改函数:

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */

    long *mynumber = RedisModule_Alloc(sizeof(long));
    *mynumber = rand();
    RedisModule_UnblockClient(bc,mynumber);
}

如您所见,现在 unblocking 调用正在传递一些私有数据 那就是mynumber指针,指向 Reply 回调。为了 获取此私有数据,Reply 回调将使用以下 功能:

void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);

所以我们的回复回调是这样修改的:

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
    /* IMPORTANT: don't free mynumber here, but in the
     * free privdata callback. */
    return RedisModule_ReplyWithLongLong(ctx,mynumber);
}

请注意,我们还需要传递一个free_privdata阻塞时的功能 客户端RedisModule_BlockClient(),由于分配的 long 值。我们的回调将如下所示:

void free_privdata(void *privdata) {
    RedisModule_Free(privdata);
}

注意:需要强调的是,私有数据最好在free_privdatacallback,因为 reply 函数可能无法被调用 如果客户端断开连接或超时。

另请注意,私有数据也可以从超时中访问 callback 中,始终使用GetBlockedClientPrivateData()应用程序接口。

中止客户端的阻塞

有时出现的一个问题是我们需要分配资源 为了实现 non blocking 命令。所以我们阻止了客户端, 然后,例如,尝试创建一个线程,但线程创建函数 返回错误。在这种情况下该怎么办才能恢复?我们 不想拿 client 被阻止,我们也不想调用UnblockClient()因为这会触发 reply 回调被调用。

在这种情况下,最好的办法是使用以下函数:

int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);

实际上,这是如何使用它:

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

客户端将被解封,但不会调用 reply 回调。

使用单个函数实现 command、reply 和 timeout 回调

可以使用以下函数来实现回复和 callback 替换为实现 primary 命令的相同函数 功能:

int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);

因此,我可以重写示例命令,而无需使用单独的 回复和超时回调:

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    if (RedisModule_IsBlockedReplyRequest(ctx)) {
        long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
        return RedisModule_ReplyWithLongLong(ctx,mynumber);
    } else if (RedisModule_IsBlockedTimeoutRequest) {
        return RedisModule_ReplyWithNull(ctx);
    }

    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

功能是相同的,但有些人会喜欢较少的 详细实现,它将大部分命令逻辑集中在 单一功能。

在线程中处理数据副本

一个有趣的模式,以便与实现 slow 部分,则使用数据的副本,以便 在键中执行某些作时,用户继续看到 旧版本。但是,当线程终止其工作时, 交换表示,并使用新的已处理版本。

这种方法的一个例子是 Neural Redis 模块,其中神经网络在不同的线程中训练,而 用户仍然可以执行和检查其旧版本。

未来的工作

目前正在开发一个 API,以允许 Redis 模块 API 以安全的方式从线程中调用,以便 THREADED 命令 可以访问数据空间并执行增量作。

此功能没有 ETA,但它可能会出现在 Redis 4.0 版本即将发布。