Kafka Connect ExtractTopic SMT Usage Reference for Confluent Cloud
The following provides usage information for the Confluent SMT io.confluent.connect.transforms.ExtractTopic.
Description
Extract data from a message and use it as the topic name. You can either use the entire key/value (which should be a string), or use a field from a map or struct.
Use the concrete transformation type designed for the record key (io.confluent.connect.transforms.ExtractTopic$Key) or value (io.confluent.connect.transforms.ExtractTopic$Value).
You can also use the entire value of a message header (a string) as the topic name by using the concrete type io.confluent.connect.transforms.ExtractTopic$Header.
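To make the three variants concrete, the following is a minimal Python sketch of the selection logic described above. It is only a model for illustration, not the SMT's implementation: the `Record` class, the `extract_topic` function, and the `source` argument are hypothetical names, and maps and structs are modelled as plain dictionaries.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Record:
    """Hypothetical stand-in for a Connect record; not the Connect API."""
    topic: str
    key: Any = None
    value: Any = None
    headers: Dict[str, str] = field(default_factory=dict)


def extract_topic(record: Record, source: str, field_name: str = "") -> Any:
    """Return the new topic name taken from the key, the value, or a header."""
    if source == "header":
        # Models ExtractTopic$Header: the header value itself is the topic name.
        return record.headers[field_name]
    # Models ExtractTopic$Key or ExtractTopic$Value.
    data = record.key if source == "key" else record.value
    if field_name == "":
        # No field configured: the entire key or value is used and must be a string.
        if not isinstance(data, str):
            raise TypeError("the entire key or value must be a string")
        return data
    # Field configured: look it up in a map or struct (modelled here as a dict).
    return data[field_name]


record = Record(topic="orig", key="orders", value={"f2": "payments"}, headers={"h1": "audit"})
print(extract_topic(record, "key"))          # whole key as topic
print(extract_topic(record, "value", "f2"))  # field f2 of the value as topic
print(extract_topic(record, "header", "h1")) # header h1 as topic
```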
Installation
This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Cloud. You can install this transformation using the confluent connect plugin install command:
confluent connect plugin install confluentinc/connect-transforms:latest
Examples
The configuration snippets below show how to use and configure the ExtractTopic SMT.
"transforms": "KeyExample,ValueFieldExample,KeyFieldExample,FieldJsonPathExample,HeaderExample",
Use the key of the message as the topic name.
"transforms.KeyExample.type": "io.confluent.connect.transforms.ExtractTopic$Key"
Extract a required field named f2 from the value, and use it as the topic name.
"transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ValueFieldExample.field": "f2"
Extract a field named f3 from the key, and use it as the topic name. If the field is null or missing, leave the topic name as-is.
"transforms.KeyFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.KeyFieldExample.field": "f3",
"transforms.KeyFieldExample.skip.missing.or.null": "true"
Extract the value of a field named f3 in the f1 field in the key, and use it as the topic name.
Here the format of the field is defined with JSON Path (for example, $["f1"]["f3"]). If the field is null or missing, leave the topic name as-is.
"transforms.FieldJsonPathExample.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.FieldJsonPathExample.field": "$[\"f1\"][\"f3\"]",
"transforms.FieldJsonPathExample.field.format": "JSON_PATH",
"transforms.FieldJsonPathExample.skip.missing.or.null": "true"
Extract the value of a message header (as a string) with key h1 (required) and use it as the topic name.
"transforms.HeaderExample.type": "io.confluent.connect.transforms.ExtractTopic$Header",
"transforms.HeaderExample.field": "h1"
Tip
For additional examples, see ExtractTopic for managed connectors.
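The per-alias property naming used in the examples above can be sketched with a small Python helper that flattens a description of each transform into `transforms.<alias>.<property>` keys and renders the result as JSON. The helper name `transform_config` and the `variant` key are hypothetical and exist only for this illustration; the class prefix and property names are taken from the examples.

```python
import json

# Class-name prefix shared by the Key, Value, and Header variants shown above.
PREFIX = "io.confluent.connect.transforms.ExtractTopic$"


def transform_config(transforms):
    """Flatten {alias: {"variant": ..., other properties}} into Connect-style keys."""
    # The transforms property is a single comma-separated string of aliases.
    config = {"transforms": ",".join(transforms)}
    for alias, props in transforms.items():
        for name, value in props.items():
            if name == "variant":
                config[f"transforms.{alias}.type"] = PREFIX + value
            else:
                config[f"transforms.{alias}.{name}"] = value
    return config


config = transform_config({
    "ValueFieldExample": {"variant": "Value", "field": "f2"},
    "HeaderExample": {"variant": "Header", "field": "h1"},
})
print(json.dumps(config, indent=2))
```

Transformations run in the order their aliases are listed, so when several ExtractTopic aliases are chained, each one sees the topic name set by the previous one.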
Properties
| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| field | Field name to use as the topic name. If left blank, the entire key or value is used (and assumed to be a string). | string | "" | | medium |
| field.format | Field path format. Two formats are currently supported: JSON_PATH and PLAIN. If set to JSON_PATH, the transformation interprets the field with a JSON Path interpreter, which supports nested field extraction. If left blank or set to PLAIN, the transformation evaluates the field config as a non-nested field name. | string | "PLAIN" | "JSON_PATH", "PLAIN" | medium |
| skip.missing.or.null | How to handle missing fields and null fields, keys, and values. By default, this transformation throws an exception if a field defined in the field config is missing or null. If set to true, the topic name is left as-is instead. | boolean | false | | low |
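The interaction between the field, field.format, and skip.missing.or.null properties can be modelled with a short Python sketch. This is an approximation under stated assumptions, not the SMT's code: `resolve_field` is a hypothetical name, the JSON_PATH branch handles only bracket paths of the form `$["a"]["b"]` rather than full JSON Path, and `KeyError` stands in for whatever exception the transformation actually raises.

```python
import re


def resolve_field(data, field, field_format="PLAIN", skip_missing_or_null=False):
    """Return the value for `field`, or None when skipping is allowed."""
    if field_format == "JSON_PATH":
        # Simplified: extracts the names from $["a"]["b"]; not a full JSON Path parser.
        path = re.findall(r'\["([^"]+)"\]', field)
    else:
        # PLAIN: the whole string is treated as one non-nested field name.
        path = [field]
    current = data
    for name in path:
        current = current.get(name) if isinstance(current, dict) else None
        if current is None:
            if skip_missing_or_null:
                return None  # the caller leaves the topic name as-is
            raise KeyError(f"field {field!r} is missing or null")
    return current


record_value = {"f1": {"f3": "nested-topic"}, "f2": "flat-topic"}
print(resolve_field(record_value, "f2"))
print(resolve_field(record_value, '$["f1"]["f3"]', "JSON_PATH"))
print(resolve_field(record_value, "f9", skip_missing_or_null=True))
```

With the default settings, the last call would raise instead of returning None, which mirrors the documented default of failing on a missing or null field.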
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter (Kafka) SMT, predicates can conditionally filter out specific records. For details and examples, see Predicates.
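As an illustration, the following Python sketch builds a configuration that applies ExtractTopic$Value only to records whose current topic matches a pattern, then prints it as JSON. It assumes the standard Apache Kafka predicate class `org.apache.kafka.connect.transforms.predicates.TopicNameMatches` and the `predicates` and `transforms.<alias>.predicate` properties from Apache Kafka Connect are available for your connector. The alias `IsRaw`, the pattern `raw-.*`, and the helper `undeclared_predicates` are hypothetical.

```python
import json

config = {
    "transforms": "ValueFieldExample",
    "transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.ValueFieldExample.field": "f2",
    # Apply the transformation only when the predicate named IsRaw matches.
    "transforms.ValueFieldExample.predicate": "IsRaw",
    "predicates": "IsRaw",
    "predicates.IsRaw.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsRaw.pattern": "raw-.*",
}


def undeclared_predicates(cfg):
    """Return predicate aliases referenced by a transform but not declared."""
    declared = {p.strip() for p in cfg.get("predicates", "").split(",") if p.strip()}
    used = {
        value
        for key, value in cfg.items()
        if key.startswith("transforms.") and key.endswith(".predicate")
    }
    return used - declared


# A quick sanity check before submitting the configuration.
assert not undeclared_predicates(config)
print(json.dumps(config, indent=2))
```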