Redis 模块和阻塞命令

如何在 Redis 模块中实现阻塞命令

Redis 在内置命令集中有一些阻塞命令。 最常用的之一是BLPOP(或对称的BRPOP) 哪些块 等待元素到达列表。

关于阻塞命令的有趣事实是它们不会阻塞 整个服务器,但只是调用它们的客户端。通常是 block 是我们预期会发生一些外部事件:这可以是 Redis 数据结构中的一些变化,如BLPOPcase、 长时间计算,以便从 network 等。

Redis 模块也能够实现阻塞命令, 本文档介绍了 API 的工作原理并介绍了一些模式 可用于对阻塞命令进行建模。

阻止和恢复的工作原理。

注意:您可能需要检查helloblock.cRedis 源代码树中的示例 在src/modules目录中,有关简单易懂的示例 关于如何应用阻止 API。

在 Redis 模块中,命令由回调函数实现,这些回调函数 在调用特定命令时由 Redis 核心调用 由用户。通常,回调会终止其执行发送 有些人回复客户。使用以下函数, 函数中实现 module 命令时,可能会请求客户端 被置于 blocked 状态:

RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);

该函数返回一个RedisModuleBlockedClientobject,即稍后的 用于取消阻止客户端。参数如下 意义:

  • ctx是命令执行上下文,通常在 API 的其余部分中。
  • reply_callback是回调,具有与普通命令函数相同的原型,在客户端被解除阻塞时调用,以便向客户端返回回复。
  • timeout_callback是回调,具有与客户端到达ms超时。
  • free_privdata是为释放私有数据而调用的回调。私有数据是一个指针,指向在用于取消阻止客户端的 API 之间传递的一些数据,该数据指向将回复发送到客户端的回调。我们将在本文档后面看到这种机制是如何工作的。
  • ms是超时(以毫秒为单位)。当达到超时时,将调用超时回调并自动中止客户端。

客户端被阻止后,可以使用以下 API 取消阻止它:

int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);

该函数将 对RedisModule_BlockClient(),然后取消阻止客户端。 在客户端被取消阻塞之前,reply_callback功能 指定 Client 端被阻塞时调用:此函数将 可以访问privdata指针。

重要提示:上述函数是线程安全的,可以从内部调用 一个线程正在执行一些工作,以实现阻止 客户端。

privdata数据将使用free_privdataClient 端解封时的 callback。这很有用,因为回复 callback 在客户端超时或断开连接时,可能永远不会调用 从服务器,因此由外部函数决定是很重要的 负责释放传递的数据(如果需要)。

为了更好地理解 API 的工作原理,我们可以想象编写一个命令 这会阻止客户端一秒钟,然后发送回复 “Hello!”。

注意:arity 检查和其他不重要的事情没有实现 int 命令,以 simple 为例。

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    pthread_create(&tid,NULL,threadmain,bc);

    return REDISMODULE_OK;
}

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */
    RedisModule_UnblockClient(bc,NULL);
}

上述命令会尽快阻止客户端,从而生成一个线程,该线程将 请稍等片刻,然后取消阻止客户端。让我们检查一下回复和 timeout 回调,在我们的例子中非常相似,因为它们 只需使用不同的回复类型回复客户端即可。

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
}

int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithNull(ctx);
}

回复回调只是将 “Hello!” 字符串发送到客户端。 这里重要的一点是,当 客户端已从线程中取消阻塞。

timeout 命令返回NULL,就像实际 Redis 阻止命令超时。

取消阻止时传递回复数据

上面的例子简单易懂,但缺少一个重要的 实际阻塞命令实现的真实方面:通常 reply 函数需要知道要回复客户端什么, 此信息通常在客户端解除阻止时提供。

我们可以修改上面的示例,以便线程生成一个 等待 1 秒后的 random number。你可以把它看作是一个 实际上是某种扩张性的行动。然后这个随机数 可以传递给 reply 函数,以便我们将其返回给命令 访客。为了使其正常工作,我们按如下方式修改函数:

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */

    long *mynumber = RedisModule_Alloc(sizeof(long));
    *mynumber = rand();
    RedisModule_UnblockClient(bc,mynumber);
}

如您所见,现在 unblocking 调用正在传递一些私有数据 那就是mynumber指针,指向 Reply 回调。为了 获取此私有数据,Reply 回调将使用以下 功能:

void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);

所以我们的回复回调是这样修改的:

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
    /* IMPORTANT: don't free mynumber here, but in the
     * free privdata callback. */
    return RedisModule_ReplyWithLongLong(ctx,mynumber);
}

请注意,我们还需要传递一个free_privdata阻塞时的功能 客户端RedisModule_BlockClient(),由于分配的 long 值。我们的回调将如下所示:

void free_privdata(void *privdata) {
    RedisModule_Free(privdata);
}

注意:需要强调的是,私有数据最好在free_privdatacallback,因为 reply 函数可能无法被调用 如果客户端断开连接或超时。

另请注意,私有数据也可以从超时中访问 callback 中,始终使用GetBlockedClientPrivateData()应用程序接口。

中止客户端的阻塞

有时出现的一个问题是我们需要分配资源 为了实现 non blocking 命令。所以我们阻止了客户端, 然后,例如,尝试创建一个线程,但线程创建函数 返回错误。在这种情况下该怎么办才能恢复?我们 不想拿 client 被阻止,我们也不想调用UnblockClient()因为这会触发 reply 回调被调用。

在这种情况下,最好的办法是使用以下函数:

int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);

实际上,这是如何使用它:

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

客户端将被解封,但不会调用 reply 回调。

使用单个函数实现 command、reply 和 timeout 回调

可以使用以下函数来实现回复和 callback 替换为实现 primary 命令的相同函数 功能:

int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);

因此,我可以重写示例命令,而无需使用单独的 回复和超时回调:

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    if (RedisModule_IsBlockedReplyRequest(ctx)) {
        long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
        return RedisModule_ReplyWithLongLong(ctx,mynumber);
    } else if (RedisModule_IsBlockedTimeoutRequest) {
        return RedisModule_ReplyWithNull(ctx);
    }

    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

功能是相同的,但有些人会喜欢较少的 详细实现,它将大部分命令逻辑集中在 单一功能。

在线程中处理数据副本

一个有趣的模式,以便与实现 slow 部分,则使用数据的副本,以便 在键中执行某些作时,用户继续看到 旧版本。但是,当线程终止其工作时, 交换表示,并使用新的已处理版本。

这种方法的一个例子是 Neural Redis 模块,其中神经网络在不同的线程中训练,而 用户仍然可以执行和检查其旧版本。

未来的工作

目前正在开发一个 API,以允许 Redis 模块 API 以安全的方式从线程中调用,以便 THREADED 命令 可以访问数据空间并执行增量作。

此功能没有 ETA,但它可能会出现在 Redis 4.0 版本即将发布。

为本页评分
返回顶部 ↑