Streams in Active-Active databases

Information about using streams with Active-Active databases.

Redis Enterprise Software, Redis Cloud

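A Redis Stream is a data structure that acts like an append-only log. Each stream entry consists of:

  • A unique, monotonically increasing ID
  • A payload consisting of a series of key-value pairs

You add entries to a stream with the XADD command. You access stream entries using the XRANGE, XREADGROUP, and XREAD commands (however, see the caveat about XREAD below).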

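Streams and Active-Active

Active-Active databases allow you to write to the same logical stream from more than one region. Streams are synchronized across the regions of an Active-Active database.

In the example below, we write to a stream concurrently from two regions. Notice that after syncing, both regions have identical streams: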

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XADD messages * text hello | XADD messages * text goodbye |
| t2 | XRANGE messages - + → [1589929244828-1] | XRANGE messages - + → [1589929246795-2] |
| t3 | -- Sync -- | -- Sync -- |
| t4 | XRANGE messages - + → [1589929244828-1, 1589929246795-2] | XRANGE messages - + → [1589929244828-1, 1589929246795-2] |

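Notice also that the synchronized streams contain no duplicate IDs. As long as you allow the database to generate your stream IDs, you'll never have more than one stream entry with the same ID.

Note:
Redis Community Edition uses one radix tree (referred to as rax) to implement each stream. However, Active-Active databases implement a single logical stream using one rax per region. Each region adds entries only to its associated rax (but can remove entries from all rax trees). This means that XREAD and XREADGROUP iterate simultaneously over all rax trees and return the appropriate entry by comparing the entry IDs from each rax.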
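The simultaneous iteration described in the note can be pictured as a k-way merge over per-region sorted lists. The following is a minimal sketch, not the actual implementation: plain Python lists stand in for the rax trees, and the names parse_id and merged_entries are hypothetical helpers introduced only for illustration.

```python
import heapq


def parse_id(entry_id):
    """Turn an 'MS-SEQ' stream ID into a sortable (ms, seq) tuple."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def merged_entries(per_region):
    """Yield (id, fields) pairs in ID order from several per-region sorted lists.

    Each list stands in for one region's rax. heapq.merge walks all inputs
    at once and always emits the entry with the smallest ID next.
    """
    return heapq.merge(*per_region.values(), key=lambda entry: parse_id(entry[0]))


# One sorted list per region, mirroring the example table above.
regions = {
    1: [("1589929244828-1", {"text": "hello"})],
    2: [("1589929246795-2", {"text": "goodbye"})],
}

merged_ids = [entry_id for entry_id, _ in merged_entries(regions)]
print(merged_ids)  # ['1589929244828-1', '1589929246795-2']
```

Because each region only appends to its own list, every input is already sorted, which is what makes a streaming merge sufficient here.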

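Conflict resolution

Active-Active databases use an "observed-remove" approach to automatically resolve potential conflicts.

With this approach, a delete only affects the locally observable data.

In the example below, the stream messages is created at t1. At t3, the stream exists in both regions.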

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XADD messages * text hello | |
| t2 | -- Sync -- | -- Sync -- |
| t3 | XRANGE messages - + → [1589929244828-1] | XRANGE messages - + → [1589929244828-1] |
| t4 | DEL messages | XADD messages * text goodbye |
| t5 | -- Sync -- | -- Sync -- |
| t6 | XRANGE messages - + → [1589929246795-2] | XRANGE messages - + → [1589929246795-2] |

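At t4, the stream is deleted from Region 1. At the same time, an entry with ID 1589929246795-2 is added to the same stream in Region 2. After the sync, at t6, that entry exists in both regions. This is because the entry was not visible in Region 1 when the local stream was deleted at t4.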
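The scenario above can be replayed with a toy model of observed-remove. This is only a sketch under simplifying assumptions: RegionStream is a hypothetical class, operations are exchanged as plain tuples, and the real CRDT machinery is far more involved. The key point it illustrates is that a delete carries only the IDs the deleting region had already seen.

```python
class RegionStream:
    """Toy model of one region's view of a stream (not the real CRDT)."""

    def __init__(self):
        self.entries = {}  # entry ID -> fields

    def xadd(self, entry_id, fields):
        """Add locally and return the operation to replicate."""
        self.entries[entry_id] = fields
        return ("add", entry_id, fields)

    def delete(self):
        """Delete locally; the replicated effect covers only observed IDs."""
        observed = set(self.entries)
        self.entries.clear()
        return ("remove", observed)

    def apply(self, op):
        """Apply an operation received from another region."""
        if op[0] == "add":
            self.entries[op[1]] = op[2]
        else:
            for entry_id in op[1]:
                self.entries.pop(entry_id, None)

    def xrange(self):
        """Return all entry IDs in ID order."""
        return sorted(self.entries, key=lambda i: tuple(map(int, i.split("-"))))


r1, r2 = RegionStream(), RegionStream()

r2.apply(r1.xadd("1589929244828-1", {"text": "hello"}))    # t1, then sync at t2
del_op = r1.delete()                                        # t4, Region 1
add_op = r2.xadd("1589929246795-2", {"text": "goodbye"})    # t4, Region 2
r2.apply(del_op)                                            # t5 sync
r1.apply(add_op)

print(r1.xrange(), r2.xrange())  # ['1589929246795-2'] ['1589929246795-2']
```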

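ID generation modes

Usually, you should allow Redis streams to generate their own stream entry IDs. You do this by specifying * as the ID in calls to XADD. However, you can provide your own custom ID when adding entries to a stream.

Because Active-Active databases replicate asynchronously, providing your own IDs can create streams with duplicate IDs. This can occur when you write to the same stream from multiple regions.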

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XADD x 100-1 f1 v1 | XADD x 100-1 f1 v1 |
| t2 | -- Sync -- | -- Sync -- |
| t3 | XRANGE x - + → [100-1, 100-1] | XRANGE x - + → [100-1, 100-1] |

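In this scenario, two entries with the ID 100-1 are added at t1. After syncing, the stream x contains two entries with the same ID.

Note:
Stream IDs in Redis Community Edition consist of two integers separated by a dash ('-'). When the server generates the ID, the first integer is the current time in milliseconds, and the second integer is a sequence number. So, the format for stream IDs is MS-SEQ.

To prevent duplicate IDs and to comply with the original Redis streams design, Active-Active databases provide three ID modes for XADD: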

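  1. Strict: In strict mode, XADD allows server-generated IDs (using the '*' ID specifier) or IDs consisting only of the millisecond (MS) portion. When the millisecond portion of the ID is provided, the ID's sequence number is calculated using the database's region ID. This prevents duplicate IDs in the stream. Strict mode rejects full IDs (that is, IDs containing both milliseconds and a sequence number).
  2. Semi-strict: Semi-strict mode is just like strict mode except that it allows full IDs (MS-SEQ). Because it allows full IDs, duplicate IDs are possible in this mode.
  3. Liberal: XADD allows any monotonically ascending ID. When given only the millisecond portion of the ID, the sequence number is set to 0. This mode may also lead to duplicate IDs.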

The default and recommended mode is strict, which prevents duplicate IDs.
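As a rough illustration of how the three modes treat a requested ID, here is a small sketch. The function resolve_xadd_id is hypothetical, and it assumes the sequence number is simply the region ID, which is what the examples on this page suggest (110 becomes 110-1 in Region 1 and 115 becomes 115-2 in Region 2); the database's actual calculation may differ. The monotonic-order check is omitted for brevity.

```python
def resolve_xadd_id(mode, requested, region_id, now_ms):
    """Return the entry ID a toy XADD would assign, or raise ValueError.

    Assumption (not confirmed by the docs): SEQ equals the region ID
    whenever the server fills in the sequence number.
    """
    if mode not in ("strict", "semi-strict", "liberal"):
        raise ValueError(f"unknown mode: {mode}")

    if requested == "*":
        # Fully server-generated ID.
        return f"{now_ms}-{region_id}"

    if "-" in requested:
        # Full MS-SEQ ID supplied by the client.
        if mode == "strict":
            raise ValueError("strict mode rejects full MS-SEQ IDs")
        return requested  # duplicates across regions become possible

    # Only the MS portion was supplied.
    if mode == "liberal":
        return f"{requested}-0"  # same ID in every region: duplicates possible
    return f"{requested}-{region_id}"  # unique per region


print(resolve_xadd_id("strict", "110", region_id=1, now_ms=0))         # 110-1
print(resolve_xadd_id("strict", "115", region_id=2, now_ms=0))         # 115-2
print(resolve_xadd_id("liberal", "100", region_id=2, now_ms=0))        # 100-0
print(resolve_xadd_id("semi-strict", "100-1", region_id=2, now_ms=0))  # 100-1
```

Only strict mode guarantees that two regions handed the same input end up with different IDs, which is why it is the only mode that rules out duplicates.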

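Warning:
Why do you want to prevent duplicate IDs? First, XDEL, XCLAIM, and other commands can affect more than one entry when duplicate IDs are present in a stream. Second, duplicate entries may be removed if a database is exported or renamed.

To change XADD's ID generation mode, use the rladmin command-line utility:

Set strict mode: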

rladmin tune db crdb crdt_xadd_id_uniqueness_mode strict

Set semi-strict mode:

rladmin tune db crdb crdt_xadd_id_uniqueness_mode semi-strict

Set liberal mode:

rladmin tune db crdb crdt_xadd_id_uniqueness_mode liberal

Iterating a stream with XREAD

In Redis Community Edition and in non-Active-Active databases, you can use XREAD to iterate over the entries in a Redis Stream. However, with an Active-Active database, XREAD may skip entries. This can happen when multiple regions write to the same stream.

In the example below, XREAD skips entry 115-2.

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XADD x 110 f1 v1 | XADD x 115 f1 v1 |
| t2 | XADD x 120 f1 v1 | |
| t3 | XADD x 130 f1 v1 | |
| t4 | XREAD COUNT 2 STREAMS x 0 → [110-1, 120-1] | |
| t5 | -- Sync -- | -- Sync -- |
| t6 | XREAD COUNT 2 STREAMS x 120-1 → [130-1] | |
| t7 | XREAD STREAMS x 0 → [110-1, 115-2, 120-1, 130-1] | XREAD STREAMS x 0 → [110-1, 115-2, 120-1, 130-1] |

You can use XREAD to reliably consume a stream only if all writes to the stream originate from a single region. Otherwise, you should use XREADGROUP, which always guarantees reliable stream consumption.
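The skipped entry in the table above follows directly from cursor-based iteration: once the reader's cursor has moved past 120-1, an entry that arrives later with a smaller ID can never be returned. The sketch below replays Region 1's view with a toy xread function (a hypothetical helper operating on a plain set of IDs, not a Redis client call). It uses 0-0 where the table uses the shorthand 0.

```python
def parse_id(entry_id):
    """Turn an 'MS-SEQ' stream ID into a sortable (ms, seq) tuple."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def xread(stream_ids, last_id, count=None):
    """Toy XREAD: return IDs strictly greater than last_id, in ID order."""
    newer = sorted(
        (i for i in stream_ids if parse_id(i) > parse_id(last_id)),
        key=parse_id,
    )
    return newer if count is None else newer[:count]


# Region 1's local view after its own writes at t1 through t3.
region1 = {"110-1", "120-1", "130-1"}

first = xread(region1, "0-0", count=2)        # t4
region1.add("115-2")                          # t5: Region 2's entry arrives
second = xread(region1, first[-1], count=2)   # t6: resume after 120-1

seen = first + second
skipped = [i for i in xread(region1, "0-0") if i not in seen]

print(first)    # ['110-1', '120-1']
print(second)   # ['130-1']
print(skipped)  # ['115-2']
```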

Consumer groups

Active-Active databases fully support consumer groups with Redis Streams. Here is an example of creating two consumer groups concurrently:

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XGROUP CREATE x group1 0 | XGROUP CREATE x group2 0 |
| t2 | XINFO GROUPS x → [group1] | XINFO GROUPS x → [group2] |
| t3 | -- Sync -- | -- Sync -- |
| t4 | XINFO GROUPS x → [group1, group2] | XINFO GROUPS x → [group1, group2] |

Note:
Redis Community Edition uses one radix tree (rax) to hold the global pending entries list and another rax for each consumer's PEL. The global PEL is a unification of all consumer PELs, which are disjoint.

An Active-Active database stream maintains a global PEL and a per-consumer PEL for each region.

When given an ID different from the special ">" ID, XREADGROUP iterates simultaneously over all of the PELs for all consumers. It returns the next entry by comparing entry IDs from the different PELs.

Conflict resolution

The "delete wins" approach is a way to automatically resolve conflicts with consumer groups. In case of concurrent consumer group operations, a delete will "win" over other concurrent operations on the same group.

In this example, the DEL at t4 deletes both the observed group1 and the non-observed group2:

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XGROUP CREATE x group1 0 | |
| t2 | -- Sync -- | -- Sync -- |
| t3 | XINFO GROUPS x → [group1] | XINFO GROUPS x → [group1] |
| t4 | DEL x | XGROUP CREATE x group2 0 |
| t5 | -- Sync -- | -- Sync -- |
| t6 | EXISTS x → 0 | EXISTS x → 0 |

In this example, the XGROUP DESTROY at t4 affects both the observed group1 created in Region 1 and the non-observed group1 created in Region 3:

| Time | Region 1 | Region 2 | Region 3 |
|------|----------|----------|----------|
| t1 | XGROUP CREATE x group1 0 | | |
| t2 | -- Sync -- | -- Sync -- | |
| t3 | XINFO GROUPS x → [group1] | XINFO GROUPS x → [group1] | XINFO GROUPS x → [] |
| t4 | XGROUP DESTROY x group1 | | XGROUP CREATE x group1 0 |
| t5 | -- Sync -- | -- Sync -- | -- Sync -- |
| t6 | EXISTS x → 0 | EXISTS x → 0 | EXISTS x → 0 |

Group replication

Calls to XREADGROUP and XACK change the state of a consumer group or consumer. However, it's not efficient to replicate every change to a consumer or consumer group.

To maintain consumer groups in Active-Active databases with optimal performance:

  1. Group existence (CREATE/DESTROY) is replicated.
  2. Most XACK operations are replicated.
  3. Other operations, such as XGROUP SETID and XGROUP DELCONSUMER, are not replicated.

For example:

| Time | Region 1 | Region 2 |
|------|----------|----------|
| t1 | XADD messages 110 text hello | |
| t2 | XGROUP CREATE messages group1 0 | |
| t3 | XREADGROUP GROUP group1 Alice STREAMS messages > → [110-1] | |
| t4 | -- Sync -- | -- Sync -- |
| t5 | XRANGE messages - + → [110-1] | XRANGE messages - + → [110-1] |
| t6 | XINFO GROUPS messages → [group1] | XINFO GROUPS messages → [group1] |
| t7 | XINFO CONSUMERS messages group1 → [Alice] | XINFO CONSUMERS messages group1 → [] |
| t8 | XPENDING messages group1 - + 1 → [110-1] | XPENDING messages group1 - + 1 → [] |

Using XREADGROUP across regions can result in regions reading the same entries. This is due to the fact that Active-Active Streams is designed for at-least-once reads or a single consumer. As shown in the previous example, Region 2 is not aware of any consumer group activity, so redirecting the XREADGROUP traffic from Region 1 to Region 2 results in reading entries that have already been read.

Replication performance optimizations

Consumers acknowledge messages using the XACK command. Each ack effectively records the last consumed message. This can result in a lot of cross-region traffic. To reduce this traffic, we replicate XACK messages only when all of the read entries are acknowledged.

| Time | Region 1 | Region 2 | Explanation |
|------|----------|----------|-------------|
| t1 | XADD x 110-0 f1 v1 | | |
| t2 | XADD x 120-0 f1 v1 | | |
| t3 | XADD x 130-0 f1 v1 | | |
| t4 | XGROUP CREATE x group1 0 | | |
| t5 | XREADGROUP GROUP group1 Alice STREAMS x > → [110-0, 120-0, 130-0] | | |
| t6 | XACK x group1 110-0 | | |
| t7 | -- Sync -- | -- Sync -- | 110-0 and its preceding entries (none) were acknowledged. We replicate an XACK effect for 110-0. |
| t8 | XACK x group1 130-0 | | |
| t9 | -- Sync -- | -- Sync -- | 130-0 was acknowledged, but not its preceding entries (120-0). We DO NOT replicate an XACK effect for 130-0. |
| t10 | XACK x group1 120-0 | | |
| t11 | -- Sync -- | -- Sync -- | 120-0 and its preceding entries (110-0 through 130-0) were acknowledged. We replicate an XACK effect for 130-0. |

In this scenario, if we redirect the XREADGROUP traffic from Region 1 to Region 2 we do not re-read entries 110-0, 120-0 and 130-0. This means that the XREADGROUP does not return already-acknowledged entries.
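The rule in the table, replicate an acknowledgment only once every earlier read entry has also been acknowledged, can be modeled as tracking a contiguous acknowledged prefix. The sketch below is one way to reproduce the table's behavior; AckReplicator is a hypothetical class, and the database's internal bookkeeping is not described in this document and may work differently.

```python
class AckReplicator:
    """Toy model: decide which XACK effects are worth replicating."""

    def __init__(self, read_ids):
        self.pending = list(read_ids)  # entries in the order they were read
        self.acked = set()
        self.replicated_upto = 0       # length of the prefix already replicated

    def xack(self, entry_id):
        """Record a local ack; return the ID to replicate, or None."""
        self.acked.add(entry_id)
        n = self.replicated_upto
        # Extend the prefix while the next pending entry is acknowledged.
        while n < len(self.pending) and self.pending[n] in self.acked:
            n += 1
        if n == self.replicated_upto:
            return None                # a gap remains before this entry
        self.replicated_upto = n
        return self.pending[n - 1]     # last entry of the acknowledged prefix


group1 = AckReplicator(["110-0", "120-0", "130-0"])  # read at t5

results = [
    group1.xack("110-0"),  # t6: prefix is now [110-0]
    group1.xack("130-0"),  # t8: 120-0 is still pending
    group1.xack("120-0"),  # t10: prefix now reaches 130-0
]
print(results)  # ['110-0', None, '130-0']
```

A single replicated effect for 130-0 at t11 covers both 120-0 and 130-0, which is where the reduction in cross-region traffic comes from.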

Guarantees

Unlike XREAD, XREADGROUP never skips stream entries. When traffic is redirected to another region, XREADGROUP may return entries that have been read but not acknowledged. It may even return entries that have already been acknowledged.

Summary

With Active-Active streams, you can write to the same logical stream from multiple regions. As a result, the behavior of Active-Active streams differs somewhat from the behavior you get with Redis Community Edition. This is summarized below:

Stream commands

  1. When using the strict ID generation mode, XADD does not permit full stream entry IDs (that is, an ID containing both MS and SEQ).
  2. XREAD may skip entries when iterating a stream that is concurrently written to from more than one region. For reliable stream iteration, use XREADGROUP instead.
  3. XSETID fails when the new ID is less than the current ID.

Consumer group notes

The following consumer group operations are replicated:

  1. Consecutive XACK operations
  2. Consumer group creation and deletion (that is, XGROUP CREATE and XGROUP DESTROY)

All other consumer group metadata is not replicated.

A few other notes:

  1. XGROUP SETID and DELCONSUMER are not replicated.
  2. Consumers exist locally (XREADGROUP creates a consumer implicitly).
  3. Renaming a stream (using RENAME) deletes all consumer group information.