Configure data pipelines
Learn how to configure ingest pipelines for data transformation
RDI implements change data capture (CDC) with pipelines. (See the architecture overview for an introduction to pipelines.)
Overview
An RDI pipeline captures change data records from the source database and transforms them into Redis data structures. It writes each of these new structures to the target Redis database under its own key.
By default, RDI uses a standard data mapping and a standard format for the keys, but you can also provide your own custom transformation jobs for each source table, with your own data mapping and key pattern. You specify these jobs declaratively with YAML configuration files that require no coding.
The data transformation involves two separate stages. First, the data captured during CDC is automatically transformed to a JSON format. Then, this JSON data gets passed on to your custom transformation for further processing.
You can provide a job file for each source table you want to transform, but you can also add a default job for any tables that don't have their own. You must specify the full name of the source table in the job file (or the special name "*" in the default job), and you can also include filtering logic to skip data that matches a particular condition. As part of the transformation, you can specify whether you want to store the data in Redis as JSON objects, hashes, sets, streams, sorted sets, or strings.
The diagram below shows the flow of data through the pipeline:
Pipeline configuration
RDI uses a set of YAML files to configure each pipeline. The following diagram shows the folder structure of the configuration:
The main configuration for the pipeline is in the config.yaml file. This specifies the connection details for the source database (such as the host, username, and password) and also the queries that RDI will use to extract the required data. You should place your job configuration files in the jobs folder if you want to specify your own data transformations.
The following sections describe these two types of configuration file in more detail.
The config.yaml file
Below is an example of a config.yaml file. Note that the values of the form "${name}" refer to secrets that you should set with the redis-di set-secret command. In particular, you should normally use these secrets to supply the source and target usernames and passwords rather than storing them as plain text in this file (see Set secrets for more information).
sources:
  mysql:
    type: cdc
    logging:
      level: info
    connection:
      type: mysql
      host: <DB_HOST> # e.g. localhost
      port: 3306
      # User and password are injected from the secrets.
      user: ${SOURCE_DB_USERNAME}
      password: ${SOURCE_DB_PASSWORD}
    # Additional properties for the source collector:
    # List of databases to include (optional).
    # databases:
    #   - database1
    #   - database2
    # List of tables to be synced (optional).
    # tables:
      # If only one database is specified in the databases property above,
      # then tables can be defined without the database prefix.
      # <DATABASE_NAME>.<TABLE_NAME>:
        # List of columns to be synced (optional).
        # columns:
        #   - <COLUMN_NAME>
        #   - <COLUMN_NAME>
        # List of columns to be used as keys (optional).
        # keys:
        #   - <COLUMN_NAME>
    # Example: Sync specific tables.
    # tables:
      # Sync a specific table with all its columns:
      # redislabscdc.account: {}
      # Sync a specific table with selected columns:
      # redislabscdc.emp:
        # columns:
        #   - empno
        #   - fname
        #   - lname
    # Advanced collector properties (optional):
    # advanced:
      # Sink collector properties - see the full list at
      # https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_redis_stream
      # sink:
        # Optional hard limits on memory usage of RDI streams.
        # redis.memory.limit.mb: 300
        # redis.memory.threshold.percentage: 85
        # Uncomment for production so RDI Collector will wait on replica
        # when writing entries.
        # redis.wait.enabled: true
        # redis.wait.timeout.ms: 1000
        # redis.wait.retry.enabled: true
        # redis.wait.retry.delay.ms: 1000
      # Source specific properties - see the full list at
      # https://debezium.io/documentation/reference/stable/connectors/
      # source:
        # snapshot.mode: initial
        # Uncomment if you want a snapshot to include only a subset of the rows
        # in a table. This property affects snapshots only.
        # snapshot.select.statement.overrides: <DATABASE_NAME>.<TABLE_NAME>
        # The specified SELECT statement determines the subset of table rows to
        # include in the snapshot.
        # snapshot.select.statement.overrides.<DATABASE_NAME>.<TABLE_NAME>: <SELECT_STATEMENT>
        # Example: Snapshot filtering by order status.
        # To include only orders with non-pending status from customers.orders
        # table:
        # snapshot.select.statement.overrides: customer.orders
        # snapshot.select.statement.overrides.customer.orders: SELECT * FROM customers.orders WHERE status != 'pending' ORDER BY order_id DESC
      # Quarkus framework properties - see the full list at
      # https://quarkus.io/guides/all-config
      # quarkus:
        # banner.enabled: "false"
targets:
  # Redis target database connections.
  # The default connection must be named 'target' and is used when no
  # connection is specified in jobs or no jobs
  # are deployed. However multiple connections can be defined here and used
  # in the job definition output blocks:
  # (e.g. target1, my-cloud-redis-db2, etc.)
  target:
    connection:
      type: redis
      # Host of the Redis database to which RDI will
      # write the processed data.
      host: <REDIS_TARGET_DB_HOST> # e.g. localhost
      # Port for the Redis database to which RDI will
      # write the processed data.
      port: <REDIS_TARGET_DB_PORT> # e.g. 12000
      # User of the Redis database to which RDI will write the processed data.
      # Uncomment if you are not using the default user.
      # user: ${TARGET_DB_USERNAME}
      # Password for Redis target database.
      password: ${TARGET_DB_PASSWORD}
      # SSL/TLS configuration: Uncomment to enable secure connections.
      # key: ${TARGET_DB_KEY}
      # key_password: ${TARGET_DB_KEY_PASSWORD}
      # cert: ${TARGET_DB_CERT}
      # cacert: ${TARGET_DB_CACERT}
processors:
  # Interval (in seconds) on which to perform retry on failure.
  # on_failed_retry_interval: 5
  # The batch size for reading data from the source database.
  # read_batch_size: 2000
  # Time (in ms) after which data will be read from stream even if
  # read_batch_size was not reached.
  # duration: 100
  # The batch size for writing data to the target Redis database. Should be
  # less than or equal to the read_batch_size.
  # write_batch_size: 200
  # Enable deduplication mechanism (default: false).
  # dedup: <DEDUP_ENABLED>
  # Max size of the deduplication set (default: 1024).
  # dedup_max_size: <DEDUP_MAX_SIZE>
  # Error handling strategy: ignore - skip, dlq - store rejected messages
  # in a dead letter queue
  # error_handling: dlq
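The processors section is optional, and everything under it is commented out in the example above; the values shown in the comments are examples rather than recommendations. As a sketch only (the numbers below are purely illustrative), a pipeline that reads in larger batches and keeps rejected records in a dead letter queue could uncomment the settings like this:
processors:
  # Read up to 2000 change records at a time from the source stream...
  read_batch_size: 2000
  # ...but process whatever has arrived after 100 ms even if the batch isn't full.
  duration: 100
  # Write to the target in batches of 200 (must not exceed read_batch_size).
  write_batch_size: 200
  # Keep rejected records in a dead letter queue rather than skipping them.
  error_handling: dlq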
The main sections of the file configure sources and targets.
Sources
The sources section has a subsection for the source that you need to configure. The source section starts with a unique name to identify the source (in the example we have a source called mysql but you can choose any name you like). The example configuration contains the following data:
- type: The type of collector to use for the pipeline. Currently, the only type we support is cdc.
- connection: The connection details for the source database: hostname, port, schema/db name, database credentials, and TLS/mTLS secrets.
- tables: The dataset you want to collect from the source. This subsection specifies:
  - snapshot_sql: A query that selects the tables to include in the dataset (the default is to include all tables if you don't specify a query here).
  - columns: A list of the columns you are interested in (the default is to include all columns if you don't supply a list).
  - keys: A list of columns to create a composite key if your table doesn't already have a PRIMARY KEY or UNIQUE constraint.
- advanced: These optional properties configure other Debezium-specific features. The available sub-sections are:
  - sink: All advanced properties for writing to RDI (TLS, memory threshold, etc.). See the Debezium Redis stream properties page for the full set of available properties.
  - source: All advanced connector properties (for example, RAC nodes). See Database-specific connection properties below and also see the Debezium Connectors pages for more information about the properties available for each database type.
  - quarkus: All advanced properties for the Debezium server, such as the log level. See the Quarkus Configuration options docs for the full set of available properties.
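To make the tables settings more concrete, here is a sketch of a source definition that syncs a single table and builds a composite key from two of its columns. The redislabscdc.order_line table and its column names are hypothetical, so substitute your own; the layout otherwise follows the commented example above.
sources:
  mysql:
    type: cdc
    connection:
      type: mysql
      host: <DB_HOST>
      port: 3306
      user: ${SOURCE_DB_USERNAME}
      password: ${SOURCE_DB_PASSWORD}
    tables:
      # Hypothetical table with no PRIMARY KEY or UNIQUE constraint:
      # sync only the listed columns and use order_id + line_no
      # together as a composite key.
      redislabscdc.order_line:
        columns:
          - order_id
          - line_no
          - product_code
          - quantity
        keys:
          - order_id
          - line_no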
Targets
Use this section to provide the connection details for the target Redis database(s). As with the sources, you should start each target section with a unique name that you are free to choose. The default connection must be named target, as in the example above, but you can add further connections with names of your own choosing. In the connection section, you can supply the type of target database, which will generally be redis, along with the host and port of the server. You can also supply connection credentials and TLS/mTLS secrets here if you use them.
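If you want your jobs to be able to write to more than one Redis database, you can define extra named connections alongside the default target connection. The sketch below is hypothetical: the reporting-redis name and its placeholder host, port, and password secret are assumptions. A job selects a connection like this with the connection property of its output block, as described in the job file sections below.
targets:
  # Default connection: used whenever a job doesn't name one explicitly.
  target:
    connection:
      type: redis
      host: <REDIS_TARGET_DB_HOST>
      port: <REDIS_TARGET_DB_PORT>
      password: ${TARGET_DB_PASSWORD}
  # Extra named connection that a job can select with
  # "connection: reporting-redis" in its output block.
  reporting-redis:
    connection:
      type: redis
      host: <REPORTING_DB_HOST>
      port: <REPORTING_DB_PORT>
      password: ${REPORTING_DB_PASSWORD}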
Note: If you specify localhost as the address of either the source or target server during installation, then the connection will fail if the actual IP address changes for the local VM. For this reason, we recommend that you don't use localhost for the address. However, if you do encounter this problem, you can fix it using the following commands on the VM that is running RDI itself:
sudo k3s kubectl delete nodes --all
sudo service k3s restart
Job files
You can optionally supply one or more job files that specify how you want to
transform the captured data before writing it to the target.
Each job file contains a YAML configuration that controls the transformation for a particular table from the source database. You can also add a default-job.yaml file to provide a default transformation for tables that don't have a specific job file of their own.
The job files have a structure like the following example. This configures a default job that:
- Writes the data to a Redis hash
- Adds a field app_code to the hash with a value of foo
- Adds a prefix of aws and a suffix of gcp to the key
source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath
The main sections of these files are:
- source: This is a mandatory section that specifies the data items that you want to use. You can add the following properties here:
  - server_name: Logical server name (optional). This corresponds to the debezium.source.topic.prefix property specified in the Debezium Server's application.properties config file.
  - db: Database name (optional).
  - schema: Database schema (optional).
  - table: Database table name. This refers to a table name you supplied in config.yaml. The default job doesn't apply to a specific table, so use "*" in place of the table name for this job only.
  - row_format: Format of the data to be transformed. This can take the values data_only (the default) to use only the payload data, or full to use the complete change record. See the transform section below for details of the extra data you can access when you use the full option.
  - case_insensitive: This applies to the server_name, db, schema, and table properties and is set to true by default. Set it to false if you need to use case-sensitive values for these properties.
- transform: This is an optional section describing the transformation that the pipeline applies to the data before writing it to the target. The uses property specifies a transformation block that will use the parameters supplied in the with section. See the data transformation reference for more details about the supported transformation blocks, and also the JMESPath custom functions reference.
Note: If you set row_format to full under the source settings, you can access extra data from the change record in the transformation (see the sketch after these notes for an example):
- Use the expression key.key to get the generated Redis key as a string.
- Use before.<FIELD_NAME> to get the value of a field before it was updated in the source database (the field name by itself gives you the value after the update).
- output: This is a mandatory section to specify the data structure(s) that RDI will write to the target, along with the text pattern for the key(s) that will access it. Note that you can map one record to more than one key in Redis or nest a record as a field of a JSON structure (see Data denormalization for more information about nesting). You can add the following properties in the output section:
  - uses: This must have the value redis.write to specify writing to a Redis data structure. You can add more than one block of this type in the same job.
  - with:
    - connection: Connection name as defined in config.yaml (by default, the connection named target is used).
    - data_type: Target data structure when writing data to Redis. The supported types are hash, json, set, sorted_set, stream, and string.
    - key: This lets you override the default key for the data structure with custom logic:
      - expression: Expression to generate the key.
      - language: Expression language, which must be jmespath or sql.
    - expire: Positive integer value indicating the number of seconds for the key to expire. If you don't specify this property, the key will never expire.
Note: In a job file, the transform section is optional, but if you don't specify a transform, you must specify custom key logic in output.with.key. You can include both of these sections if you want both a custom transform and a custom key.
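As an illustration of the options above, the sketch below combines several of them in one job. It is hypothetical rather than a standard example: the invoice table, its total column, and the reporting-redis connection (which would have to be defined in config.yaml, as sketched earlier) are assumed names. The job uses row_format: full so it can copy the pre-update value of total into the new hash, writes the hash to the default target connection with the default key logic, and also writes a JSON copy with a custom key and a one-day expiry to the second connection.
source:
  table: invoice          # Hypothetical table name.
  row_format: full        # Needed to read the "before" image of the row.
transform:
  - uses: add_field
    with:
      fields:
        # Keep the value of "total" as it was before the update
        # (assumes the source table has a "total" column).
        - field: after.previous_total
          expression: before.total
          language: jmespath
output:
  # Write a hash to the default "target" connection using the default key logic.
  - uses: redis.write
    with:
      data_type: hash
  # Also write a JSON copy with a custom key and a one-day expiry to a
  # second connection defined in config.yaml.
  - uses: redis.write
    with:
      connection: reporting-redis
      data_type: json
      key:
        expression: concat(['invoice:json:', values(key)[0]])
        language: jmespath
      expire: 86400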
Another example below shows how you can rename the fname field to first_name in the table emp using the rename_field block. It also demonstrates how you can set the key of this record instead of relying on the default logic. (See the Transformation examples section for more examples of job files.)
source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath
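The overview above also mentions that a job can include filtering logic to skip records that match a particular condition. The sketch below shows one way to express that with a filter transformation block; the public.orders table and its status column are hypothetical, and you should check the data transformation reference for the exact filtering block and parameters supported by your RDI version.
source:
  schema: public          # Hypothetical schema name.
  table: orders           # Hypothetical table name.
transform:
  # Skip any record whose status is 'pending'; only the remaining
  # records are written to the target.
  - uses: filter
    with:
      language: jmespath
      expression: status != 'pending'
output:
  - uses: redis.write
    with:
      data_type: hash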
See the RDI configuration file reference for full details about the available source, transform, and target configuration options, and see also the data transformation reference for details of all the available transformation blocks.
Source preparation
Before using the pipeline you must first prepare your source database to use
the Debezium connector for change data capture (CDC). See the
architecture overview
for more information about CDC.
Each database type has a different set of preparation steps. You can
find the preparation guides for the databases that RDI supports in the
Prepare source databases
section.
Deploy a pipeline
When your configuration is ready, you must deploy it to start using the pipeline. See
Deploy a pipeline
to learn how to do this.
Ingest pipeline lifecycle
Once you have created the configuration for a pipeline, it goes through the
following phases:
- Deploy - When you deploy the pipeline, RDI first validates it before use.
Then, the operator creates and configures the collector and stream processor that will run the pipeline.
- Snapshot - The collector starts the pipeline by creating a snapshot of the full
dataset. This involves reading all the relevant source data, transforming it and then
writing it into the Redis target. You should expect this phase to take minutes or
hours to complete if you have a lot of data.
- CDC - Once the snapshot is complete, the collector starts listening for updates to
the source data. Whenever a change is committed to the source, the collector captures
it and adds it to the target through the pipeline. This phase continues indefinitely
unless you change the pipeline configuration.
- Update - If you update the pipeline configuration, the operator starts applying it
to the processor and the collector. Note that the changes only affect newly-captured
data unless you reset the pipeline completely. Once RDI has accepted the updates, the
pipeline returns to the CDC phase with the new configuration.
- Reset - There are circumstances where you might want to rebuild the dataset
completely. For example, you might want to apply a new transformation to all the source
data or refresh the dataset if RDI is disconnected from the
source for a long time. In situations like these, you can reset the pipeline back
to the snapshot phase. When this is complete, the pipeline continues with CDC as usual.