在 Redis Insight 中管理流和使用者组

了解如何在 Redis Insight 中管理流和使用者组

是仅追加的日志文件。 向其添加数据时,无法对其进行更改。 这似乎是一个缺点;但是,流用作日志或单一事实来源。 它还可以用作以不同速度工作且不需要相互了解的进程之间的缓冲区。 有关流的更多概念信息,请参阅 Redis 流

在本主题中,您将学习如何在 Redis Insight 中添加和使用流以及使用者组。

这是一个对温度和湿度传感器进行建模的流。与流交互的进程执行以下两个角色之一:使用者生成者。 流的重点是它不会结束,因此您无法捕获整个数据集并对其进行一些处理。

在此流中,传感器被视为生产者,它们广播数据。 使用者从流中读取数据并对其进行一些作。 例如,如果温度高于某个阈值,它会发出一条消息以打开该单元的空调或通知维护。

可以让多个消费者执行不同的工作,一个测量湿度,另一个在一段时间内进行温度测量。 Redis 将整个数据集的副本存储在内存中,这是一种有限的资源。 为避免数据失控,您可以在向流添加内容时对其进行修剪。 当使用XADD,您可以选择指定流应修剪为特定或大致数量的最新条目,或仅包含 ID 高于指定 ID 的条目。 您还可以使用密钥过期来管理流数据所需的存储。例如,将每天的数据写入 Redis 中自己的流,并在一段时间(例如一周)后使每个流的密钥过期。 ID 可以是任何数字,但流中的每个新条目都必须具有一个 ID,其值高于添加到流中的最后一个 ID。

添加新条目

XADD替换为 ID,让 Redis 自动生成一个新 ID,该 ID 由毫秒精度时间戳、短划线和序列号组成。例如*1656416957625-0.然后提供字段名称和值以存储在新的 stream 条目中。

有几种方法可以检索内容。您可以按时间范围检索条目,也可以请求自您指定的时间戳或 ID 以来发生的所有事情。使用单个命令,您可以在给定日期的上午 10:30 到 11:15 之间请求任何内容。

消费群体

更现实的使用案例是具有许多温度传感器的系统,Redis 将其数据放入流中,记录它们到达的时间并对其进行排序。

在右侧,我们有两个读取流的使用者。其中一个是如果温度超过某个数字,并向维护人员发送短信,告诉他们需要做点什么,另一个是数据仓库,用于获取数据并将其放入数据库。

它们彼此独立运行。 在右边,我们有另一种任务。 我们假设警报和数据仓库的速度非常快。 如果温度大于特定值,您会收到一条消息,这可能需要一毫秒。 警报可以跟上数据流。 扩展使用者的一种方法是使用者组,它允许同一使用者或相同代码的多个实例作为一个团队来处理流。

在 Redis Insight 中管理流

您可以通过两种方式在 Redis Insight 中添加流:创建新流或添加到现有流。

要创建流,请先选择键类型 (stream)。 您不能设置生存时间 (TTL),因为它不能放在流中的消息上;它只能在 Redis 密钥上完成。将流命名为 mystream。 然后,将 Entry ID (条目 ID) 设置为 default to timestamp (时间戳)。 如果您有自己的 ID 生成策略,请输入序列中的下一个 ID。请记住,该 ID 必须高于流中任何其他条目的 ID。*

然后,使用 + 输入字段和值以添加多个字段和值(例如,name 和 location)。 现在,您有一个显示在 Streams (流) 视图中的流,您可以继续向其添加字段和值。

Redis Insight 为您运行读取命令,以便您可以在 Streams (流) 视图中查看流条目。 Consumer Groups (使用者组) 视图显示给定使用者组中的每个使用者以及 Redis 上次分配消息的时间、消息的 ID 以及该过程发生的次数,以及使用者是否已告知 Redis 您已使用XACK命令。

通过 Redis Insight 中的传感器监控温度和湿度

此示例说明如何将现有流引入 Redis Insight 并使用它。

设置

  1. 安装 Redis Insight
  2. 下载并安装 Node.js(LTS 版本)。
  3. 安装 Redis。在 Docker 中,检查 Redis 是否在默认端口 6379(未设置密码)上本地运行。
  4. 克隆此示例的代码存储库。 有关此示例和安装提示的更多信息,请参阅 README
  5. 在命令行中,导航到包含代码存储库的文件夹,然后安装 Node.js 包管理器 (npm)。
npm install

Run the producer

To start the producer, which will add a new entry to the stream every few seconds, enter:

npm run producer

> streams@1.0.0 producer
> node producer.js

Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...

The producer runs indefinitely. Select Ctrl+C to stop it. You can start multiple instances of the producer if you want to add entries to the stream faster.

Run the consumer

To start the consumer, which reads from the stream every few seconds, enter:

npm run consumer

> streams@1.0.0 consumer
> node consumer.js

Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]

The consumer stores the last entry ID that it read in a Redis string at the key consumer:lastid. It uses this string to pick up from where it left off after it is restarted. Try this out by stopping it with Ctrl+C and restarting it.

Once the consumer has processed every entry in the stream, it will wait indefinitely for instances of the producer to add more:

Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...

Stop it using Ctrl+C.

Run a consumer group

A consumer group consists of multiple consumer instances working together. Redis manages allocation of entries read from the stream to members of a consumer group. A consumer in a group will receive a subset of the entries, with the group as a whole receiving all of them. When working in a consumer group, a consumer process must acknowledge receipt/processing of each entry.

Using multiple terminal windows, start three instances of the consumer group consumer, giving each a unique name:

npm run consumergroup consumer1

> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"

Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...

In a second terminal:

npm run consumergroup consumer2

And in a third:

npm run consumergroup consumer3

The consumers will run indefinitely, waiting for new messages to be added to the stream by a producer instance when they have collectively consumed the entire stream. Note that in this model, each consumer instance does not receive all of the entries from the stream, but the three members of the group each receive a subset.

View the stream in Redis Insight

  1. Launch Redis Insight.
  2. Select localhost:6379
  3. Select STREAM. Optionally, select full screen from the upper right corner to expand the view.

You can now toggle between Stream and Consumer Groups views to see your data. As mentioned earlier in this topic, a stream is an append-only log so you can't modify the contents of an entry, but you can delete an entire entry. A case when that's useful is in the event of a so-called poison-pill message that can cause consumers to crash. You can physically remove such messages in the Streams view or use the XDEL command at the command-line interface (CLI).

You can continue interacting with your stream at the CLI. For example, to get the current length of a stream, use the XLEN command:

XLEN ingest:temphumidity

Use streams for auditing and processing events in banking, gaming, supply chain, IoT, social media, and so on.

RATE THIS PAGE
Back to top ↑