This page describes how to use Google Cloud Managed Service for Apache Kafka as a source or sink in a Dataflow pipeline.
You can use either of the following approaches:
- Use a Google-provided Dataflow template that reads from Apache Kafka.
- Write your own Apache Beam pipeline that uses Managed Service for Apache Kafka as a source or sink.
Requirements
Enable the Cloud Storage, Dataflow, and Managed Service for Apache Kafka APIs in your project. See Enabling APIs or run the following Google Cloud CLI command:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
The Dataflow worker service account must have the Managed Kafka Client (roles/managedkafka.client) Identity and Access Management (IAM) role. An example grant command appears after these requirements.
The Dataflow worker VMs must have network access to the Kafka bootstrap server. For more information, see Configure Managed Service for Apache Kafka networking.
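One way to grant the Managed Kafka Client role is with the gcloud projects add-iam-policy-binding command. The following command is a sketch; PROJECT_ID and DATAFLOW_WORKER_SERVICE_ACCOUNT are placeholders for your project ID and the email address of the Dataflow worker service account:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:DATAFLOW_WORKER_SERVICE_ACCOUNT" \
    --role="roles/managedkafka.client"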
Get the bootstrap server address
To run a pipeline that connects to a Managed Service for Apache Kafka cluster, first get the cluster's bootstrap server address. You need this address when you configure the pipeline.
You can use the Google Cloud console or the Google Cloud CLI, as follows:
Console
In the Google Cloud console, go to the Clusters page.
Click the cluster name.
Click the Configurations tab.
Copy the bootstrap server address from Bootstrap URL.
gcloud
Use the managed-kafka clusters describe command.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Replace the following:
- CLUSTER_ID: the ID or name of the cluster
- LOCATION: the location of the cluster
For more information, see View a Managed Service for Apache Kafka cluster.
Use Managed Service for Apache Kafka with a Dataflow template
Google provides several Dataflow templates that read from Apache Kafka. These templates can be used with Managed Service for Apache Kafka. If a template matches your use case, consider using it rather than writing custom pipeline code.
Console
Go to the Dataflow > Jobs page.
Click Create job from template.
In the Job name field, enter a name for the job.
From the Dataflow template drop-down menu, select the template to run.
In the Kafka bootstrap server box, enter the bootstrap server address.
In the Kafka topic box, enter the name of the topic.
For Kafka authentication mode, select APPLICATION_DEFAULT_CREDENTIALS.
For Kafka message format, select the format of the Apache Kafka messages.
Enter other parameters as needed. The supported parameters are documented for each template.
Click Run job.
gcloud
Use the gcloud dataflow jobs run command.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
# Other parameters, depending on the template
# ...
Replace the following:
- JOB_NAME: a name for the job
- TEMPLATE_FILE: the location of the template file in Cloud Storage
- REGION_NAME: the region where you want to deploy your job
- PROJECT_NAME: the name of your Google Cloud project
- LOCATION: the location of the cluster
- CLUSTER_ID: the ID or name of the cluster
- TOPIC: the name of the Kafka topic
Use Managed Service for Apache Kafka with a Beam pipeline
This section describes how to use the Apache Beam SDK to create and run a Dataflow pipeline that connects to Managed Service for Apache Kafka.
For most scenarios, use the managed I/O transform as your Kafka source or sink. If you need more advanced performance tuning, consider using the KafkaIO connector.
For more information about the benefits of using managed I/O, see Dataflow managed I/O.
Requirements
Kafka Client version 3.6.0 or later.
Apache Beam SDK version 2.61.0 or later.
The machine where you start the Dataflow job must have network access to the Apache Kafka bootstrap server. For example, start the job from a Compute Engine instance that can access the VPC where the cluster is reachable.
The principal that creates the job must have the following IAM roles:
- Managed Kafka Client (roles/managedkafka.client) to access the Apache Kafka cluster.
- Service Account User (roles/iam.serviceAccountUser) to act as the Dataflow worker service account.
- Storage Admin (roles/storage.admin) to upload job files to Cloud Storage.
- Dataflow Admin (roles/dataflow.admin) to create the job.
If you start the job from a Compute Engine instance, you can grant these roles to a service account that is attached to the VM. For more information, see Create a VM that uses a user-managed service account.
You can also use Application Default Credentials (ADC) with service account impersonation when you create the job, as shown in the following example.
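One way to set up ADC with service account impersonation is to run the following gcloud command before you launch the pipeline. This command is a sketch; SERVICE_ACCOUNT_EMAIL is a placeholder for a service account that has the roles listed earlier:
gcloud auth application-default login \
    --impersonate-service-account=SERVICE_ACCOUNT_EMAIL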
Configure Managed I/O
If your pipeline uses Managed I/O for Apache Kafka, set the following configuration options to authenticate with Managed Service for Apache Kafka:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
The following examples show how to configure managed I/O for Managed Service for Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configure the KafkaIO connector
The following examples show how to configure the KafkaIO connector for Managed Service for Apache Kafka:
Java
String bootstrap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka.
pipeline.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials());
// Write to Kafka. KafkaIO.write() is applied to a PCollection<KV<Integer, String>>,
// shown here as a collection named records.
records.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
    producer_config={
        "bootstrap.servers": options.bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    },
    topic=options.topic,
    key_serializer="org.apache.kafka.common.serialization.LongSerializer",
    value_serializer="org.apache.kafka.common.serialization.StringSerializer"
)
What's next
- Learn more about Managed Service for Apache Kafka.
- Write data from Managed Service for Apache Kafka to BigQuery.
- Read from Apache Kafka to Dataflow.
- Write from Dataflow to Apache Kafka.