Redis 模块和阻塞命令
如何在 Redis 模块中实现阻塞命令
Redis 在内置命令集中有一些阻塞命令。
最常用的之一是BLPOP
(或对称的BRPOP
) 哪些块
等待元素到达列表。
关于阻塞命令的有趣事实是它们不会阻塞
整个服务器,但只是调用它们的客户端。通常是
block 是我们预期会发生一些外部事件:这可以是
Redis 数据结构中的一些变化,如BLPOP
case、
长时间计算,以便从
network 等。
Redis 模块也能够实现阻塞命令, 本文档介绍了 API 的工作原理并介绍了一些模式 可用于对阻塞命令进行建模。
阻止和恢复的工作原理。
注意:您可能需要检查helloblock.c
Redis 源代码树中的示例
在src/modules
目录中,有关简单易懂的示例
关于如何应用阻止 API。
在 Redis 模块中,命令由回调函数实现,这些回调函数 在调用特定命令时由 Redis 核心调用 由用户。通常,回调会终止其执行发送 有些人回复客户。使用以下函数, 函数中实现 module 命令时,可能会请求客户端 被置于 blocked 状态:
RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
该函数返回一个RedisModuleBlockedClient
object,即稍后的
用于取消阻止客户端。参数如下
意义:
ctx
是命令执行上下文,通常在 API 的其余部分中。reply_callback
是回调,具有与普通命令函数相同的原型,在客户端被解除阻塞时调用,以便向客户端返回回复。timeout_callback
是回调,具有与客户端到达ms
超时。free_privdata
是为释放私有数据而调用的回调。私有数据是一个指针,指向在用于取消阻止客户端的 API 之间传递的一些数据,该数据指向将回复发送到客户端的回调。我们将在本文档后面看到这种机制是如何工作的。ms
是超时(以毫秒为单位)。当达到超时时,将调用超时回调并自动中止客户端。
客户端被阻止后,可以使用以下 API 取消阻止它:
int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);
该函数将
对RedisModule_BlockClient()
,然后取消阻止客户端。
在客户端被取消阻塞之前,reply_callback
功能
指定 Client 端被阻塞时调用:此函数将
可以访问privdata
指针。
重要提示:上述函数是线程安全的,可以从内部调用 一个线程正在执行一些工作,以实现阻止 客户端。
这privdata
数据将使用free_privdata
Client 端解封时的 callback。这很有用,因为回复
callback 在客户端超时或断开连接时,可能永远不会调用
从服务器,因此由外部函数决定是很重要的
负责释放传递的数据(如果需要)。
为了更好地理解 API 的工作原理,我们可以想象编写一个命令 这会阻止客户端一秒钟,然后发送回复 “Hello!”。
注意:arity 检查和其他不重要的事情没有实现 int 命令,以 simple 为例。
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
pthread_t tid;
pthread_create(&tid,NULL,threadmain,bc);
return REDISMODULE_OK;
}
void *threadmain(void *arg) {
RedisModuleBlockedClient *bc = arg;
sleep(1); /* Wait one second and unblock. */
RedisModule_UnblockClient(bc,NULL);
}
上述命令会尽快阻止客户端,从而生成一个线程,该线程将 请稍等片刻,然后取消阻止客户端。让我们检查一下回复和 timeout 回调,在我们的例子中非常相似,因为它们 只需使用不同的回复类型回复客户端即可。
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
}
int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
return RedisModule_ReplyWithNull(ctx);
}
回复回调只是将 “Hello!” 字符串发送到客户端。 这里重要的一点是,当 客户端已从线程中取消阻塞。
timeout 命令返回NULL
,就像实际
Redis 阻止命令超时。
取消阻止时传递回复数据
上面的例子简单易懂,但缺少一个重要的 实际阻塞命令实现的真实方面:通常 reply 函数需要知道要回复客户端什么, 此信息通常在客户端解除阻止时提供。
我们可以修改上面的示例,以便线程生成一个 等待 1 秒后的 random number。你可以把它看作是一个 实际上是某种扩张性的行动。然后这个随机数 可以传递给 reply 函数,以便我们将其返回给命令 访客。为了使其正常工作,我们按如下方式修改函数:
void *threadmain(void *arg) {
RedisModuleBlockedClient *bc = arg;
sleep(1); /* Wait one second and unblock. */
long *mynumber = RedisModule_Alloc(sizeof(long));
*mynumber = rand();
RedisModule_UnblockClient(bc,mynumber);
}
如您所见,现在 unblocking 调用正在传递一些私有数据
那就是mynumber
指针,指向 Reply 回调。为了
获取此私有数据,Reply 回调将使用以下
功能:
void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);
所以我们的回复回调是这样修改的:
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
/* IMPORTANT: don't free mynumber here, but in the
* free privdata callback. */
return RedisModule_ReplyWithLongLong(ctx,mynumber);
}
请注意,我们还需要传递一个free_privdata
阻塞时的功能
客户端RedisModule_BlockClient()
,由于分配的
long 值。我们的回调将如下所示:
void free_privdata(void *privdata) {
RedisModule_Free(privdata);
}
注意:需要强调的是,私有数据最好在free_privdata
callback,因为 reply 函数可能无法被调用
如果客户端断开连接或超时。
另请注意,私有数据也可以从超时中访问
callback 中,始终使用GetBlockedClientPrivateData()
应用程序接口。
中止客户端的阻塞
有时出现的一个问题是我们需要分配资源
为了实现 non blocking 命令。所以我们阻止了客户端,
然后,例如,尝试创建一个线程,但线程创建函数
返回错误。在这种情况下该怎么办才能恢复?我们
不想拿 client 被阻止,我们也不想调用UnblockClient()
因为这会触发 reply 回调被调用。
在这种情况下,最好的办法是使用以下函数:
int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);
实际上,这是如何使用它:
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
pthread_t tid;
if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
RedisModule_AbortBlock(bc);
RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
}
return REDISMODULE_OK;
}
客户端将被解封,但不会调用 reply 回调。
使用单个函数实现 command、reply 和 timeout 回调
可以使用以下函数来实现回复和 callback 替换为实现 primary 命令的相同函数 功能:
int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);
因此,我可以重写示例命令,而无需使用单独的 回复和超时回调:
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
if (RedisModule_IsBlockedReplyRequest(ctx)) {
long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx,mynumber);
} else if (RedisModule_IsBlockedTimeoutRequest) {
return RedisModule_ReplyWithNull(ctx);
}
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
pthread_t tid;
if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
RedisModule_AbortBlock(bc);
RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
}
return REDISMODULE_OK;
}
功能是相同的,但有些人会喜欢较少的 详细实现,它将大部分命令逻辑集中在 单一功能。
在线程中处理数据副本
一个有趣的模式,以便与实现 slow 部分,则使用数据的副本,以便 在键中执行某些作时,用户继续看到 旧版本。但是,当线程终止其工作时, 交换表示,并使用新的已处理版本。
这种方法的一个例子是 Neural Redis 模块,其中神经网络在不同的线程中训练,而 用户仍然可以执行和检查其旧版本。
未来的工作
目前正在开发一个 API,以允许 Redis 模块 API 以安全的方式从线程中调用,以便 THREADED 命令 可以访问数据空间并执行增量作。
此功能没有 ETA,但它可能会出现在 Redis 4.0 版本即将发布。