XREAD 读取
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- 从以下位置开始可用:
- 5.0.0
- 时间复杂度:
- ACL 类别:
-
@read
,@stream
,@slow
,@blocking
,
从一个或多个流中读取数据,仅返回具有
ID 大于调用方报告的最后收到的 ID。
此命令有一个选项,可以在项目不可用时阻止,类似于
fashion 到BRPOP
或BZPOPMIN
和其他。
请注意,在阅读此页面之前,如果您是 streams 的新手, 我们建议您阅读我们的 Redis Streams 简介。
非阻塞使用
如果未使用 BLOCK 选项,则命令是同步的,并且可以
被认为与XRANGE
:它将返回一系列项目
inside streams 中,它与XRANGE
即使我们只考虑同步用法:
- 如果我们想在
同时从多个键。这是
XREAD
因为 尤其是在使用 BLOCK 进行阻塞时,为了能够使用单个 连接到多个密钥是一项重要功能。 - 而
XRANGE
返回 ID 范围内的项目,XREAD
更适合 order 使用从第一个大于 比我们迄今为止看到的任何其他条目都要多。那么我们传递给XREAD
is,对于每个 stream,我们从该流接收的最后一个元素的 ID。
例如,如果我有两个流mystream
和writers
,而我想
从两个流中读取数据,从它们包含的第一个元素开始,
我可以调用XREAD
如以下示例所示。
注意:我们在示例中使用 COUNT 选项,以便对于每个流 调用将返回每个流最多两个元素。
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
STREAMS 选项是必需的,并且必须是最后一个选项,因为 此类选项按以下格式获取可变长度的参数:
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
因此,我们从 key 列表开始,然后继续处理所有关联的 ID 的 ID 表示我们收到的该流的最后一个 ID,因此 call 将仅提供来自同一流的更大 ID。
例如,在上面的示例中,我们收到的最后 Item
对于流mystream
具有 ID1526999352406-0
,而对于
流writers
具有 ID1526985685298-0
.
要继续迭代我将调用的两个流:
> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
2) 1) 1) 1526999626221-0
2) 1) "duration"
2) "911"
3) "event-id"
4) "7"
5) "user-id"
6) "9488232"
2) 1) "writers"
2) 1) 1) 1526985691746-0
2) 1) "name"
2) "Toni"
3) "surname"
4) "Morrison"
2) 1) 1526985712947-0
2) 1) "name"
2) "Agatha"
3) "surname"
4) "Christie"
等等。最终,调用不会返回任何项目,而只会返回一个 empty 数组,那么我们知道没有更多的东西可以从我们的 stream 中(我们将不得不重试该作,因此此命令 还支持 A 阻塞模式)。
不完整的 ID
使用不完整的 ID 是有效的,就像它对XRANGE
.然而
此处,如果缺少 ID 的序列部分,则始终解释为
zero,因此命令:
> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等同于
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
阻止数据
在其同步形式中,该命令可以获取新数据,只要存在
有更多项目可用。然而,在某些时候,我们将不得不等待
要使用的数据的创建者XADD
在流中推送新条目
我们正在消耗。为了避免以固定或自适应间隔轮询
根据
添加到指定的流和 ID 中,并在
请求的键接受数据。
请务必了解,此命令扇出到所有 等待相同 ID 范围的客户端,因此每个使用者都会 获取数据的副本,这与阻止列表弹出时发生的情况不同 作。
为了阻止,使用 BLOCK 选项以及数字 的毫秒数。通常 Redis 阻塞 命令需要几秒钟的超时,但这个命令需要一毫秒 timeout,即使通常服务器的超时分辨率接近 设置为 0.1 秒。这一次可以在 某些用例,如果 Server 内部会随着时间的推移而改进,那将是 超时的解决能力可能会提高。
当传递 BLOCK 命令,但有数据要返回时 至少在传递的其中一个流中,命令将同步执行,就像缺少 BLOCK 选项时一样。
这是一个阻塞调用的示例,其中命令稍后返回 空回复,因为超时已过,没有新数据到达:
> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)
特殊 ID。$
当阻塞时,有时我们只想接收添加的条目
通过XADD
从我们阻止的那一刻开始。在这种情况下
我们对已添加条目的历史记录不感兴趣。为
在这个用例中,我们必须检查流顶部元素 ID,并使用
此类 ID 在XREAD
命令行。这是不干净的,需要
调用其他命令,因此可以使用特殊 ID 来向流发出信号,表明我们只需要新事物。$
请务必了解,您应该仅在第一次调用$
XREAD
.稍后 ID 应为 ID
流中最后一个报告的项,否则可能会错过所有
在两者之间添加的条目。
这就是典型的XREAD
call 看起来像第一次迭代中一样
的消费者只消费新条目:
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们收到一些回复,下一个电话将是这样的:
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
等等。
特殊 ID+
您可以使用XREVRANGE
命令,如下所示:
> XREVRANGE stream + - COUNT 1
但是,随着您添加更多流,此方法会变得很慢,因为您必须为每个流发出单独的命令。
相反,从 Redis 7.4 开始,您可以将该标志用作特殊 ID。
这将请求流中的最后一个可用条目。例如:+
> XREAD STREAMS streamA streamB streamC streamD + + + +
请注意,当对流使用此特殊 ID 时,COUNT 选项将 被忽略(对于特定流),因为只能返回最后一个条目。
如何为在单个流上阻止的多个客户端提供服务
对列表或排序集的阻止列表作具有 pop 行为。 基本上,元素按顺序从列表或排序集中删除 以返回到客户端。在这种情况下,您需要 以公平的方式消费,具体取决于客户端阻止的时刻 在给定的键到达时。通常,Redis 在此 使用案例。
但是请注意,对于 streams,这不是问题:stream entries
不会从流中删除,因此每个
client waiting 将在XADD
command 提供
data 添加到流中。
阅读 Redis Streams 简介 建议,以便更多地了解流的整体行为 和语义。
RESP2 回复
以下选项之一:
- Array reply:一个数组,其中每个元素都是一个数组,由两个元素组成,其中包含键名称和为该键报告的条目。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按添加顺序报告
XADD
. - Nil 回复:如果给出了 BLOCK 选项并且发生超时,或者没有可以提供的流。
RESP3 回复
以下选项之一:
- Map reply:键值元素的映射,其中每个元素都由键名称和为该键报告的条目组成。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按添加顺序报告
XADD
. - 空回复:如果给出了 BLOCK 选项并且发生超时,或者没有可以提供的流。