Data transformation pipeline

Learn how to transform data to Redis types

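Write-behind's data transformation capabilities let you transform your data beyond the default translation of source types to Redis types. The transformation requires no coding. Instead, it is described in a set of human-readable YAML files, one per source table.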

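The ingested format and types differ from one source to another. Currently, the only supported source is Debezium. The first transformation, from Debezium types to native JSON with Redis types, is done automatically and needs no user instructions. This JSON is then passed on to the user-defined transformation pipeline.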

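Each job describes the transformation logic to perform on data from a single source. The source is typically a database table or collection and is specified by the full name of that table or collection. The job may include filtering logic to skip data that matches a condition. Other logical steps in the job transform the data into the desired output, which is stored in Redis as a hash or JSON.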
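As a rough illustration of this structure, a job file for a single table might look like the sketch below. The table name invoice, the added field source_system, and its value are hypothetical; add_field and redis.write are the same blocks used in the examples on this page, and filtering is left out.

```yaml
# invoice.yaml (hypothetical job file for a table named "invoice")
source:
  table: invoice                    # the single source this job applies to
transform:
  - uses: add_field                 # add a constant field to each record
    with:
      fields:
        - field: source_system      # hypothetical field name
          expression: "`billing`"   # JMESPath literal, same style as the default job example
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: json               # store the result as JSON rather than a hash
```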

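Default job

Use the default job when you need to apply a transformation to all ingested records without creating a specific job for each table. The transformation associated with this job is applied to all tables that lack their own explicitly defined job. The default job must have the table name "*", and only one instance of this type of job is allowed.

For example, the default job can streamline tasks such as adding a prefix or postfix to Redis keys, or adding fields to new hashes and JSON documents, without customizing each source table.

Currently, the default job is supported for ingest pipelines only.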

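This example shows how to add an app_code field with the value foo, using the add_field block, to all tables that lack an explicitly defined job. It also adds an aws prefix and a gcp postfix to every generated hash key.

default.yaml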

source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath

Jobs

Each job is defined in a separate YAML file. All of these files are uploaded to Write-behind using the deploy command. For more information, see Deploy configuration. If you are using the scaffold command, place the job files in the jobs folder.

Job YAML structure

Fields

  • source:

    This section describes the table that the job operates on:

    • server_name: logical server name (optional). Corresponds to the debezium.source.topic.prefix property specified in Debezium Server's application.properties config file
    • db: database name (optional)
    • schema: database schema (optional)
    • table: database table name
    • row_format: format of the data to be transformed, either data_only (default; payload only) or full (the complete change record)

Note: Any reference to the properties server_name, db, schema, and table will be treated by default as case insensitive. This can be changed by setting case_insensitive to false.

Cassandra only: In Cassandra, a keyspace is roughly the equivalent to a schema in other databases. Write-behind uses the schema property declared in a job file to match the keyspace attribute of the incoming change record.

MongoDB only: In MongoDB, a replica set is a cluster of shards with data and can be regarded as roughly equivalent to a schema in a relational database. A MongoDB collection is similar to a table in other databases. Write-behind uses the schema and table properties declared in a job file to match the replica set and collection attributes of the incoming change record, respectively.
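The source properties described above might be combined as in the following sketch. All values are hypothetical, and placing case_insensitive directly under source is an assumption that should be checked against the job reference.

```yaml
source:
  server_name: chinook      # should match debezium.source.topic.prefix
  db: sales                 # optional
  schema: public            # optional; keyspace for Cassandra, replica set for MongoDB
  table: Invoice            # table name; collection for MongoDB
  row_format: full          # pass the complete change record to the transform blocks
  case_insensitive: false   # assumed placement: match the names above exactly as written
```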

  • transform:

    This section includes a series of blocks that define how the data will be transformed. For more information, see supported blocks and JMESPath custom functions.

  • output:

    This section defines the output targets for processed data:

    • Cassandra:
      • uses: cassandra.write: write into a Cassandra data store
      • with:
        • connection: connection name
        • keyspace: keyspace
        • table: target table
        • keys: array of key columns
        • mapping: array of mapping columns
        • opcode_field: the name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the database
    • Redis:
      • uses: redis.write: write to a Redis data structure. Multiple blocks of this type are allowed in the same job
      • with:
        • connection: connection name as defined in config.yaml (by default, the connection named 'target' is used)
        • data_type: target data structure when writing data to Redis (hash, json, set and stream are supported values)
        • key: this allows you to override the key of the record by applying custom logic:
          • expression: expression to execute
          • language: expression language, JMESPath or SQL
        • expire: positive integer value indicating a number of seconds for the key to expire. If not set, the key will never expire
    • SQL:
      • uses: relational.write: write into a SQL-compatible data store
      • with:
        • connection: connection name
        • schema: schema
        • table: target table name
        • keys: array of key columns
        • mapping: array of mapping columns
        • opcode_field: the name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the database
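The output options listed above can be sketched as follows. These are fragments rather than complete jobs. The key prefixes, the connection name my-sql-target, the schema, table, and column names, and the opcode field name op_code are all hypothetical; only the property names come from the list above.

```yaml
# Sketch 1: two redis.write blocks in the same job.
output:
  - uses: redis.write
    with:
      connection: target          # the default connection name
      data_type: hash
      key:
        expression: "concat(['emp:', fname, ':', lname])"
        language: jmespath
  - uses: redis.write
    with:
      connection: target
      data_type: json
      key:
        expression: "concat(['emp-json:', fname, ':', lname])"
        language: jmespath
      expire: 3600                # key expires after 3600 seconds (one hour)
---
# Sketch 2: a relational.write block.
output:
  - uses: relational.write
    with:
      connection: my-sql-target   # hypothetical connection name
      schema: hr
      table: employees
      keys:
        - id
      mapping:
        - fname
        - lname
      opcode_field: op_code       # hypothetical payload field holding c, d, or u
```

The expressions are quoted so that the colons and brackets inside them cannot be misread as YAML syntax.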

Notes

  • source is required.
  • Either transform, key, or both should be specified.

Using key in transformations

To access the Redis key (for example, in a write-behind job), take the following steps:

  • Set row_format: full to allow access to the key that is part of the full data entry.
  • Use the expression key.key to get the Redis key as a string.

Before and after values

Update events typically report before and after sections, providing access to the data state before and after the update. To access the "before" values explicitly, you will need to:

  • Set row_format: full to allow access to the before and after sections of the full change record.
  • Use the before.<FIELD_NAME> pattern.
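A minimal sketch of these two steps, assuming the emp table and fname field from the example on this page. The target field previous_fname is hypothetical. With row_format: full, new fields are addressed under after, as in the default job example.

```yaml
source:
  table: emp
  row_format: full                      # exposes before and after to the transform blocks
transform:
  - uses: add_field
    with:
      fields:
        - field: after.previous_fname   # hypothetical new field
          expression: before.fname      # value of fname before the update
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
```

For events that carry no before section, such as inserts, the JMESPath expression before.fname evaluates to null.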

Example

This example shows how to rename the fname field to first_name in the table emp using the rename_field block. It also demonstrates how to set the key of this record instead of relying on the default logic.

redislabs.dbo.emp.yaml

source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath

Deploy configuration

To deploy your jobs to the remote Write-behind database, run:

redis-di deploy

Deploy configuration on Kubernetes

If the Write-behind CLI is deployed as a pod in a Kubernetes cluster, perform these steps to deploy your jobs:

  • Create a ConfigMap from the YAML files in your jobs folder:

    kubectl create configmap redis-di-jobs --from-file=jobs/
    
  • Deploy your jobs:

    kubectl exec -it pod/redis-di-cli -- redis-di deploy
    

Note: A delay occurs between creating/modifying the ConfigMap and its availability in the redis-di-cli pod. Wait around 30 seconds before running the redis-di deploy command.

You have two options to update the ConfigMap:

  • For smaller changes, you can edit the ConfigMap directly with this command:

    kubectl edit configmap redis-di-jobs
    
  • For bigger changes such as adding another job file, edit the files in your local jobs folder and then run this command:

    kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
    

Note: You need to run kubectl exec -it pod/redis-di-cli -- redis-di deploy after updating the ConfigMap with either option.
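For orientation, the manifest that the kubectl create configmap ... --dry-run=client -o yaml command pipes into kubectl apply has roughly the following shape. This sketch assumes the jobs folder contains only the redislabs.dbo.emp.yaml file from the example above; the real output may include extra metadata fields.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: redis-di-jobs
data:
  # One entry per file in the jobs folder: file name mapped to file content.
  redislabs.dbo.emp.yaml: |
    source:
      server_name: redislabs
      schema: dbo
      table: emp
    transform:
      - uses: rename_field
        with:
          from_field: fname
          to_field: first_name
    output:
      - uses: redis.write
        with:
          connection: target
          key:
            expression: concat(['emp:fname:',fname,':lname:',lname])
            language: jmespath
```

The same shape applies when editing the ConfigMap in place with kubectl edit: each job is one key under data.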