Data transformation pipelines
Learn how to transform data to Redis types
Write-behind's data transformation capability allows users to transform their data beyond the default translation of source types into Redis types. The transformation does not involve coding. Instead, it is described in a set of human-readable YAML files, one per source table.
The ingested format and types vary per source. Currently, the only supported source is Debezium. The first transformation, from Debezium types to native JSON with Redis types, is done automatically, without any user instructions. This JSON is then passed to the user-defined transformation pipeline.
Each job describes the transformation logic to perform on data from a single source. The source is typically a database table or collection and is specified as the full name of this table/collection. The job may include filtering logic to skip data that matches a condition. Other logical steps in the job transform the data into the desired output, which is stored in Redis as hashes or JSON.
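For instance, a minimal sketch of a job that keeps only rows matching a condition might use a filter block, as shown below (the table name emp, the condition, and the use of SQL as the expression language are illustrative assumptions):
source:
  table: emp
transform:
  - uses: filter
    with:
      expression: age > 20    # assumed condition; rows not matching it are skipped
      language: sql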

Default jobs
When transformations must be performed on all ingested records, without creating a specific job for particular tables, use a default job. The transformations associated with this job will be applied to all tables that lack their own explicitly defined jobs. The default job must have a table name of "*", and only a single instance of this kind of job is allowed.
For example, a default job can simplify tasks such as adding a prefix or postfix to Redis keys, or adding a field to every new hash or JSON document, without having to customize each source table.
Currently, default jobs are supported only for ingest pipelines.
Example
This example demonstrates the use of the add_field block to add an app_code field with the value foo to all tables that lack explicitly defined jobs. In addition, it appends an aws prefix and a gcp postfix to every generated hash key.
default.yaml
source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath
Jobs
Each job is defined in a separate YAML file. All of these files will be uploaded to Write-behind using the deploy command. For more information, see Deploy configuration below. If you are using the scaffold command, place the job files in the jobs folder.
Job YAML structure
Fields
- source:
  This section describes the table that the job operates on:
  - server_name: logical server name (optional). Corresponds to the debezium.source.topic.prefix property specified in Debezium Server's application.properties config file
  - db: database name (optional)
  - schema: database schema (optional)
  - table: database table name
  - row_format: format of the data to be transformed: data_only (default) - only the payload; full - the complete change record
  Note: Any reference to the properties server_name, db, schema, and table is treated as case insensitive by default. This can be changed by setting case_insensitive to false.
  Cassandra only: In Cassandra, a keyspace is roughly the equivalent of a schema in other databases. Write-behind uses the schema property declared in a job file to match the keyspace attribute of the incoming change record.
  MongoDB only: In MongoDB, a replica set is a cluster of shards with data and can be regarded as roughly equivalent to a schema in a relational database. A MongoDB collection is similar to a table in other databases. Write-behind uses the schema and table properties declared in a job file to match the replica set and collection attributes of the incoming change record, respectively.
- transform:
  This section includes a series of blocks that define how the data will be transformed. For more information, see supported blocks and JMESPath custom functions.
- output:
  This section defines the output targets for the processed data (a sketch of an SQL output block follows this list):
  - Cassandra:
    - uses: cassandra.write: write into a Cassandra data store
    - with:
      - connection: connection name
      - keyspace: keyspace
      - table: target table
      - keys: array of key columns
      - mapping: array of mapping columns
      - opcode_field: the name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the database
  - Redis:
    - uses: redis.write: write to a Redis data structure. Multiple blocks of this type are allowed in the same job
    - with:
      - connection: connection name as defined in config.yaml (by default, the connection named 'target' is used)
      - data_type: target data structure when writing data to Redis (hash, json, set, and stream are supported values)
      - key: allows you to override the key of the record by applying custom logic:
        - expression: expression to execute
        - language: expression language, JMESPath or SQL
      - expire: positive integer value indicating the number of seconds for the key to expire. If not set, the key will never expire
  - SQL:
    - uses: relational.write: write into a SQL-compatible data store
    - with:
      - connection: connection name
      - schema: schema
      - table: target table name
      - keys: array of key columns
      - mapping: array of mapping columns
      - opcode_field: the name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the database
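For reference, here is a minimal sketch of an SQL output block; the connection name hr_db and the schema, table, column, and opcode field names are illustrative assumptions, not part of any shipped configuration:
output:
  - uses: relational.write
    with:
      connection: hr_db          # connection name as defined in config.yaml (assumed)
      schema: hr                 # target schema (assumed)
      table: employees           # target table name (assumed)
      keys:
        - employee_id            # key column(s)
      mapping:
        - first_name             # mapped column(s)
        - last_name
      opcode_field: opcode       # payload field holding the operation (c/u/d)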
Notes
- source is required.
- Either transform, key, or both should be specified.
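For example, a job that declares only a key override, with no transform section, is valid. A minimal sketch (the table and field names are illustrative assumptions):
source:
  table: invoice
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['invoice:', id])   # assumed field name
        language: jmespath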
Using key in transformations
To access the Redis key (for example, in a write-behind job) you will need to take the following steps:
- Set row_format: full to allow access to the key that is part of the full data entry.
- Use the expression key.key to get the Redis key as a string.
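Putting both steps together, the following sketch copies the Redis key into a new field (the table and field names are illustrative assumptions):
source:
  table: emp
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.redis_key   # assumed field name
          expression: key.key      # the Redis key as a string
          language: jmespath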
Before and after values
Update events typically report before and after sections, providing access to the data state before and after the update.
To access the "before" values explicitly, you will need to:
- Set row_format: full to allow access to the key that is part of the full data entry.
- Use the before.<FIELD_NAME> pattern.
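For instance, this sketch preserves the pre-update salary in a new field by reading it from the "before" section (the table and field names are illustrative assumptions):
source:
  table: emp
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.previous_salary   # assumed field name
          expression: before.salary      # value before the update
          language: jmespath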
Example
This example shows how to rename the fname field to first_name in the table emp using the rename_field block. It also demonstrates how to set the key of this record instead of relying on the default logic.
redislabs.dbo.emp.yaml
source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath
Deploy configuration
To deploy your jobs to the remote Write-behind database, run:
redis-di deploy
Deploy configuration on Kubernetes
If the Write-behind CLI is deployed as a pod in a Kubernetes cluster, perform these steps to deploy your jobs:
- Create a ConfigMap from the YAML files in your jobs folder:
  kubectl create configmap redis-di-jobs --from-file=jobs/
- Deploy your jobs:
  kubectl exec -it pod/redis-di-cli -- redis-di deploy
Note: A delay occurs between creating/modifying the ConfigMap and its availability in the redis-di-cli pod. Wait around 30 seconds before running the redis-di deploy command.
You have two options to update the ConfigMap:
- For smaller changes, you can edit the ConfigMap directly with this command:
  kubectl edit configmap redis-di-jobs
- For bigger changes, such as adding another job file, edit the files in your local jobs folder and then run this command:
  kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
Note: You need to run kubectl exec -it pod/redis-di-cli -- redis-di deploy after updating the ConfigMap with either option.