XREAD组
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
- 从以下位置开始可用:
- 5.0.0
- 时间复杂度:
- 对于提到的每个流:O(M),其中 M 是返回的元素数。如果 M 是常数(例如,总是用 COUNT 请求前 10 个元素),你可以认为它是 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将支付 O(N) 时间,以便为流上阻塞的 N 个客户端提供新数据。
- ACL 类别:
-
@write
,@stream
,@slow
,@blocking
,
这XREADGROUP
command 是XREAD
命令
支持使用者组。可能您必须了解XREAD
命令才有意义。
此外,如果您不熟悉流,我们建议您阅读我们的 Redis Streams 简介。 请务必了解引言中 Consumer Group 的概念 这样遵循这个命令的工作原理会更简单。
30 秒消费组
这个命令和原版XREAD
是这个
一个支持使用者组。
没有消费组,只需使用XREAD
,则所有客户端都将与流中到达的所有条目一起提供。而是使用 Consumer groups 和XREADGROUP
,可以创建客户端组,这些客户端使用到达给定流的消息的不同部分。例如,如果流获取新条目 A、B 和 C,并且有两个使用者通过使用者组读取数据,则一个客户端将获取消息 A 和 C,另一个客户端将获取消息 B,依此类推。
在使用者组中,给定的使用者(即,仅使用流中消息的客户端)必须使用唯一的使用者名称进行标识。这只是一个字符串。
使用者组的保证之一是,给定使用者只能查看已传送给它的消息的历史记录,因此一条消息只有一个所有者。但是,有一个称为消息声明的特殊功能,它允许其他使用者在某个使用者出现不可恢复的故障时声明消息。为了实现此类语义,消费者组需要通过XACK
命令。这是必需的,因为流将跟踪每个使用者组谁正在处理什么消息。
以下是了解是否要使用 Consumer Group 的方法:
- 如果您有一个流和多个客户端,并且希望所有客户端都能获取所有消息,则不需要使用者组。
- 如果您有一个流和多个客户端,并且希望在客户端之间对流进行分区或分片,以便每个客户端都能获得到达流中的消息的子集,则需要一个使用者组。
XREAD 和 XREADGROUP 之间的区别
从语法的角度来看,命令几乎相同,
然而XREADGROUP
需要一个特殊的强制性选项:
GROUP <group-name> <consumer-name>
组名称只是与流关联的使用者组的名称。
该组是使用XGROUP
命令。使用者名称是
客户端用于在组内标识自身的字符串。
使用者首次在使用者组内自动创建
被看到。不同的客户端应选择不同的使用者名称。
当您使用XREADGROUP
,服务器会记住给定的
message was delivered to you (消息已发送给您):消息将存储在
使用者组,即 Pending Entries List (PEL) 中的
已送达但尚未确认的消息 ID 列表。
客户端必须使用XACK
以便从 PEL 中删除待处理条目。The PEL
可以使用XPENDING
命令。
这NOACK
子命令可用于避免将消息添加到
不需要可靠性且偶尔会丢失消息的情况
是可以接受的。这相当于在读取消息时确认消息。
使用XREADGROUP
能
成为以下两项之一:
- 特殊 ID,这意味着使用者只想接收从未传送给任何其他使用者的消息。它只是意味着,给我新消息。
>
- 任何其他 ID,即 0 或任何其他有效 ID 或不完整的 ID(只是毫秒时间部分),将具有返回等待使用者发送 ID 大于所提供 ID 的命令的条目的效果。所以基本上如果 ID 不是,那么该命令将只允许客户端访问其待处理的条目:已送达但尚未确认的消息。请注意,在这种情况下,两者
>
BLOCK
和NOACK
被忽略。
喜欢XREAD
这XREADGROUP
command 可以以阻塞方式使用。那里
在这方面没有区别。
将消息传送给使用者时会发生什么情况?
两件事:
- 如果该消息从未发送给任何人,也就是说,如果我们谈论的是新消息,则会创建一个 PEL(待处理条目列表)。
- 相反,如果消息已传送给此使用者,并且它只是再次重新获取同一消息,则最后一个传送计数器将更新为当前时间,并且传送次数将递增 1。您可以使用
XPENDING
命令。
使用示例
通常,您使用这样的命令来获取新消息和 处理它们。在伪代码中:
WHILE true
entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
if entries == nil
puts "Timeout... try again"
CONTINUE
end
FOREACH entries AS stream_entries
FOREACH stream_entries as message
process_message(message.id,message.fields)
# ACK the message as processed
XACK mystream $GroupName message.id
END
END
END
这样,示例消费者代码将仅获取新消息,处理
他们,并通过以下方式确认他们XACK
.但是,上面的示例代码是
未完成,因为它不处理崩溃后的恢复。什么
如果我们在处理消息的过程中崩溃,则会发生,这是因为我们的
消息将保留在 Pending entries 列表中,因此我们可以访问我们的
历史XREADGROUP
最初为 ID 为 0,并执行相同的
圈。一旦提供 ID 为 0,回复就是一组空的消息,我们
知道我们处理并确认了所有待处理的消息:我们
可以开始使用 as ID,以便获取新消息并重新加入
正在处理新事物的消费者。>
要查看命令的实际回复方式,请检查XREAD
命令页面。
删除待处理消息时会发生什么情况?
由于修剪或显式调用XDEL
随时。
根据设计,Redis 不会阻止删除流的 PEL 中存在的条目。
发生这种情况时,PEL 会保留已删除条目的 ID,但实际的条目有效负载不再可用。
因此,在读取此类 PEL 条目时,Redis 将返回一个 null 值来代替它们各自的数据。
例:
> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1-0"
2) 1) "myfield"
2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1-0"
2) (nil)
阅读 Redis Streams 简介 建议,以便更多地了解流的整体行为 和语义。
RESP2 回复
以下选项之一:
- Array reply:一个数组,其中每个元素都是一个数组,由两个元素组成,其中包含键名称和为该键报告的条目。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按添加顺序报告
XADD
. - Nil 回复:如果给出了 BLOCK 选项并且发生超时,或者没有可以提供的流。
RESP3 回复
以下选项之一:
- Map reply:键值元素的映射,其中每个元素都由键名称和为该键报告的条目组成。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按添加顺序报告
XADD
. - 空回复:如果给出了 BLOCK 选项并且发生超时,或者没有可以提供的流。