XREAD组

语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
  [NOACK] STREAMS key [key ...] id [id ...]
从以下位置开始可用:
5.0.0
时间复杂度:
对于提到的每个流:O(M),其中 M 是返回的元素数。如果 M 是常数(例如,总是用 COUNT 请求前 10 个元素),你可以认为它是 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将支付 O(N) 时间,以便为流上阻塞的 N 个客户端提供新数据。
ACL 类别:
@write, @stream, @slow, @blocking,

XREADGROUPcommand 是XREAD命令 支持使用者组。可能您必须了解XREAD命令才有意义。

此外,如果您不熟悉流,我们建议您阅读我们的 Redis Streams 简介。 请务必了解引言中 Consumer Group 的概念 这样遵循这个命令的工作原理会更简单。

30 秒消费组

这个命令和原版XREAD是这个 一个支持使用者组。

没有消费组,只需使用XREAD,则所有客户端都将与流中到达的所有条目一起提供。而是使用 Consumer groups 和XREADGROUP,可以创建客户端组,这些客户端使用到达给定流的消息的不同部分。例如,如果流获取新条目 A、B 和 C,并且有两个使用者通过使用者组读取数据,则一个客户端将获取消息 A 和 C,另一个客户端将获取消息 B,依此类推。

在使用者组中,给定的使用者(即,仅使用流中消息的客户端)必须使用唯一的使用者名称进行标识。这只是一个字符串。

使用者组的保证之一是,给定使用者只能查看已传送给它的消息的历史记录,因此一条消息只有一个所有者。但是,有一个称为消息声明的特殊功能,它允许其他使用者在某个使用者出现不可恢复的故障时声明消息。为了实现此类语义,消费者组需要通过XACK命令。这是必需的,因为流将跟踪每个使用者组谁正在处理什么消息。

以下是了解是否要使用 Consumer Group 的方法:

  1. 如果您有一个流和多个客户端,并且希望所有客户端都能获取所有消息,则不需要使用者组。
  2. 如果您有一个流和多个客户端,并且希望在客户端之间对流进行分区分片,以便每个客户端都能获得到达流中的消息的子集,则需要一个使用者组。

XREAD 和 XREADGROUP 之间的区别

从语法的角度来看,命令几乎相同, 然而XREADGROUP 需要一个特殊的强制性选项:

GROUP <group-name> <consumer-name>

组名称只是与流关联的使用者组的名称。 该组是使用XGROUP命令。使用者名称是 客户端用于在组内标识自身的字符串。 使用者首次在使用者组内自动创建 被看到。不同的客户端应选择不同的使用者名称。

当您使用XREADGROUP,服务器会记住给定的 message was delivered to you (消息已发送给您):消息将存储在 使用者组,即 Pending Entries List (PEL) 中的 已送达但尚未确认的消息 ID 列表。

客户端必须使用XACK以便从 PEL 中删除待处理条目。The PEL 可以使用XPENDING命令。

NOACK子命令可用于避免将消息添加到 不需要可靠性且偶尔会丢失消息的情况 是可以接受的。这相当于在读取消息时确认消息。

使用XREADGROUP能 成为以下两项之一:

  • 特殊 ID,这意味着使用者只想接收从未传送给任何其他使用者的消息。它只是意味着,给我新消息。>
  • 任何其他 ID,即 0 或任何其他有效 ID 或不完整的 ID(只是毫秒时间部分),将具有返回等待使用者发送 ID 大于所提供 ID 的命令的条目的效果。所以基本上如果 ID 不是,那么该命令将只允许客户端访问其待处理的条目:已送达但尚未确认的消息。请注意,在这种情况下,两者>BLOCKNOACK被忽略。

喜欢XREADXREADGROUPcommand 可以以阻塞方式使用。那里 在这方面没有区别。

将消息传送给使用者时会发生什么情况?

两件事:

  1. 如果该消息从未发送给任何人,也就是说,如果我们谈论的是新消息,则会创建一个 PEL(待处理条目列表)。
  2. 相反,如果消息已传送给此使用者,并且它只是再次重新获取同一消息,则最后一个传送计数器将更新为当前时间,并且传送次数将递增 1。您可以使用XPENDING命令。

使用示例

通常,您使用这样的命令来获取新消息和 处理它们。在伪代码中:

WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

这样,示例消费者代码将仅获取新消息,处理 他们,并通过以下方式确认他们XACK.但是,上面的示例代码是 未完成,因为它不处理崩溃后的恢复。什么 如果我们在处理消息的过程中崩溃,则会发生,这是因为我们的 消息将保留在 Pending entries 列表中,因此我们可以访问我们的 历史XREADGROUP最初为 ID 为 0,并执行相同的 圈。一旦提供 ID 为 0,回复就是一组空的消息,我们 知道我们处理并确认了所有待处理的消息:我们 可以开始使用 as ID,以便获取新消息并重新加入 正在处理新事物的消费者。>

要查看命令的实际回复方式,请检查XREAD命令页面。

删除待处理消息时会发生什么情况?

由于修剪或显式调用XDEL随时。 根据设计,Redis 不会阻止删除流的 PEL 中存在的条目。 发生这种情况时,PEL 会保留已删除条目的 ID,但实际的条目有效负载不再可用。 因此,在读取此类 PEL 条目时,Redis 将返回一个 null 值来代替它们各自的数据。

例:

> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "myfield"
            2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) (nil)

阅读 Redis Streams 简介 建议,以便更多地了解流的整体行为 和语义。

RESP2 回复

以下选项之一:

  • Array reply:一个数组,其中每个元素都是一个数组,由两个元素组成,其中包含键名称和为该键报告的条目。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按添加顺序报告XADD.
  • Nil 回复:如果给出了 BLOCK 选项并且发生超时,或者没有可以提供的流。

RESP3 回复

以下选项之一:

  • Map reply:键值元素的映射,其中每个元素都由键名称和为该键报告的条目组成。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按添加顺序报告XADD.
  • 空回复:如果给出了 BLOCK 选项并且发生超时,或者没有可以提供的流。

为本页评分
返回顶部 ↑