Kafka
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

Kafka Pipeline Connector #

The Kafka Pipeline connector can be used as the Data Sink of the pipeline, and write data to Kafka. This document describes how to set up the Kafka Pipeline connector.

What can the connector do? #

  • Data synchronization

How to create Pipeline #

The pipeline for reading data from MySQL and sink to Kafka can be defined as follows:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://localhost:62510

pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 2

Pipeline Connector Options #

Option Required Default Type Description
type required (none) String Specify what connector to use, here should be 'kafka'.
name optional (none) String The name of the sink.
partition.strategy optional (none) String Defines the strategy for sending record to kafka topic, available options are `all-to-zero`(sending all records to 0 partition) and `hash-by-key`(distributing all records by hash of primary keys), default option is `all-to-zero`.
key.format optional (none) String Defines the format identifier for encoding key data, available options are `csv` and `json`, default option is `json`.
value.format optional (none) String The format used to serialize the value part of Kafka messages. Available options are debezium-json and canal-json, default option is `debezium-json`, and do not support user-defined format now.
properties.bootstrap.servers required (none) String A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
topic optional (none) String If this parameter is configured, all events will be sent to this topic.
sink.add-tableId-to-header-enabled optional (none) Boolean If this parameter is true, a header with key of 'namespace','schemaName','tableName' will be added for each Kafka record. Default value is false.
properties.* optional (none) String Pass options of Kafka table to pipeline,See Kafka consume options.
sink.custom-header optional (none) String custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'.
sink.tableId-to-topic.mapping optional (none) String Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by `;`, separate upstream tableId and downstream Kafka topic by `:`, For example, we can set `sink.tableId-to-topic.mapping` like `mydb.mytable1:topic1;mydb.mytable2:topic2`.
debezium-json.include-schema.enabled optional false Boolean If this parameter is configured, each debezium record will contain debezium schema information. Is only supported when using debezium-json.

Usage Notes #

  • The written topic of Kafka will be namespace.schemaName.tableName string of TableId,this can be changed using route function of pipeline.

  • If the written topic of Kafka is not existed, we will create one automatically.

Output Format #

For different built-in value.format options, the output format is different:

debezium-json #

Refer to Debezium docs, debezium-json format will contains before,after,op,source elements, but ts_ms is not included in source.
An output example is:

{
  "before": null,
  "after": {
    "col1": "1",
    "col2": "1"
  },
  "op": "c",
  "source": {
    "db": "default_namespace",
    "table": "table1"
  }
}

When debezium-json.include-schema.enabled is true, the output format will be:

{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":true,
            "field":"col1"
          },
          {
            "type":"string",
            "optional":true,
            "field":"col2"
          }
        ],
        "optional":true,
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          {
            "type":"string",
            "optional":true,
            "field":"col1"
          },
          {
            "type":"string",
            "optional":true,
            "field":"col2"
          }
        ],
        "optional":true,
        "field":"after"
      }
    ],
    "optional":false
  },
  "payload":{
    "before": null,
    "after": {
      "col1": "1",
      "col2": "1"
    },
    "op": "c",
    "source": {
      "db": "default_namespace",
      "table": "table1"
    }
  }
}

canal-json #

Refer to Canal | Apache Flink, canal-json format will contains old,data,type,database,table,pkNames elements, but ts is not included.
An output example is:

{
    "old": null,
    "data": [
        {
            "col1": "1",
            "col2": "1"
        }
    ],
    "type": "INSERT",
    "database": "default_schema",
    "table": "table1",
    "pkNames": [
        "col1"
    ]
}

Data Type Mapping #

Literal type: defines the physical storage format of data (type field of the debezium schema)
Semantic type: defines the logical meaning of data (name field of the debezium schema).

CDC type JSON type Literal type Semantic type NOTE
TINYINT TINYINT INT16
SMALLINT SMALLINT INT16
INT INT INT32
BIGINT BIGINT INT64
FLOAT FLOAT FLOAT
DOUBLE DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s) BYTES org.apache.kafka.connect.data.Decimal
BOOLEAN BOOLEAN BOOLEAN
DATE DATE INT32 io.debezium.time.Date
TIMESTAMP(p) TIMESTAMP(p) INT64 p <=3 io.debezium.time.Timestamp
p >3 io.debezium.time.MicroTimestamp
TIMESTAMP_LTZ TIMESTAMP_LTZ STRING io.debezium.time.ZonedTimestamp
CHAR(n) CHAR(n) STRING
VARCHAR(n) VARCHAR(n) STRING

Back to top