Kafka Connect HeaderToValue (Debezium) SMT Usage Reference for Confluent Cloud
The following provides usage information for the SMT io.debezium.transforms.HeaderToValue.
Description
The HeaderToValue SMT extracts specified header fields from event records, and then copies or moves these header fields to values in the event record. With the move operation, the SMT removes the fields from the header and adds them to the record value. With the copy operation, the original headers are retained. The SMT can process multiple headers from a single source message, and it supports dot notation to direct a header field into a specific nested position within the record value.
Example
This configuration snippet shows how to use HeaderToValue to move the event_timestamp and key headers from the event message into the record value, mapping them to the timestamp and source.id fields, respectively. The transform removes the original headers. If you want to keep the headers, use the operation copy instead of move.
"transforms": "moveHeadersToValue",
"transforms.moveHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
"transforms.moveHeadersToValue.headers": "event_timestamp,key",
"transforms.moveHeadersToValue.fields": "timestamp,source.id",
"transforms.moveHeadersToValue.operation": "move"
Before:
Record header
{ "header_x": 0, "event_timestamp": 1962352708742, "key": 90 }
Record value
{ "before": null, "after": { "id": 99, "first_name": "Maria", "last_name": "Cena", "email": "[email protected]" }, "source": { "version": "5.1.0-RC1", "connector": "mongodb-atlas", "name": "OrderProcessingService", "ts_ms": 1752069400000, "ts_us": 1752069400000876, "ts_ns": 1752069400000876543, "snapshot": true, "db": "ecommerce", "sequence": "[\"78901234\",\"78901243\"]", "schema": "public", "table": "inventory_updates", "txId": 1234, "lsn": 78901243, "xmin": null }, "op": "c", "ts_ms": 1752069400500, "ts_us": 1752069400500987, "ts_ns": 1752069400500987654 }
After:
Record header
{ "header_x": 0 }
Record value
{ "before": null, "after": { "id": 99, "first_name": "Maria", "last_name": "Cena", "email": "[email protected]" }, "source": { "version": "5.1.0-RC1", "connector": "mongodb-atlas", "name": "OrderProcessingService", "ts_ms": 1752069400000, "ts_us": 1752069400000876, "ts_ns": 1752069400000876543, "snapshot": true, "db": "ecommerce", "sequence": "[\"78901234\",\"78901243\"]", "schema": "public", "table": "inventory_updates", "txId": 1234, "lsn": 78901243, "xmin": null, "id": 90 }, "op": "c", "ts_ms": 1752069400500, "ts_us": 1752069400500987, "ts_ns": 1752069400500987654, "timestamp": 1962352708742 }
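If the headers must stay on the record, the same transform can be configured with the copy operation, as noted in the example description. The following fragment is a sketch that shows only the transform-related properties. The alias copyHeadersToValue is an illustrative name, and the rest of the connector configuration is omitted.

```json
{
  "transforms": "copyHeadersToValue",
  "transforms.copyHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
  "transforms.copyHeadersToValue.headers": "event_timestamp,key",
  "transforms.copyHeadersToValue.fields": "timestamp,source.id",
  "transforms.copyHeadersToValue.operation": "copy"
}
```

With this variant, the record value should gain the same timestamp and source.id fields as in the move example, while the record header keeps event_timestamp and key alongside header_x.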
Properties
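| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| headers | A comma-separated list of header names in the record whose values are to be copied or moved to the record value. | string | None | non-empty list | high |
| fields | A comma-separated list of field names, in the same order as the header names listed in the headers property. Use dot notation to place a field at a nested position in the record value. | list | None | non-empty list | high |
| operation | Specifies one of the following options: move (moves the header fields into the record value and removes them from the header) or copy (copies the header fields into the record value and keeps the original headers). | string | None | move, copy | high |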
Predicates
Transformations can be configured with predicates so that the transformation is applied only to records that satisfy a condition. You can use predicates in a transformation chain and, when combined with the Filter SMT (see Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud), predicates can conditionally filter out specific records. For details and examples, see Predicates.
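As a sketch of how a predicate might be attached to this SMT, the following fragment applies HeaderToValue only to records that carry the event_timestamp header. It assumes the built-in Kafka Connect predicate org.apache.kafka.connect.transforms.predicates.HasHeaderKey is available to your connector. The alias hasEventTimestamp is an illustrative name, and only the transform and predicate properties are shown.

```json
{
  "transforms": "moveHeadersToValue",
  "transforms.moveHeadersToValue.type": "io.debezium.transforms.HeaderToValue",
  "transforms.moveHeadersToValue.headers": "event_timestamp,key",
  "transforms.moveHeadersToValue.fields": "timestamp,source.id",
  "transforms.moveHeadersToValue.operation": "move",
  "transforms.moveHeadersToValue.predicate": "hasEventTimestamp",
  "predicates": "hasEventTimestamp",
  "predicates.hasEventTimestamp.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.hasEventTimestamp.name": "event_timestamp"
}
```

Records without an event_timestamp header skip the transform and pass through unchanged. The predicate checks only that one header, so this sketch does not guard against records that are missing the key header.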