Configure Python Clients for OAuth/OIDC on Confluent Cloud
Prerequisites
Before you begin, ensure you have the following:
Language and tooling
Python 3.8 or later
confluent-kafka-python: 2.0.0 or later
Confluent Platform: 7.2.1 or later; 7.1.3 or later
Confluent Cloud environment
OAuth setup in Confluent Cloud - Configure OAuth authentication in your Confluent Cloud environment
Credentials and identifiers
Client ID - Your application’s identifier (like a username)
Client secret - Your application’s password for OAuth
Token endpoint URL - Where to request access tokens
Scopes - Permissions your application needs (for example,
kafka:read kafka:writefor Kafka,schema:readfor Schema Registry)Kafka cluster ID (
lkc-xxxxx) - Your Kafka cluster identifierIdentity pool ID (
pool-xxxxx) - If using identity poolsSchema Registry logical cluster ID (
lsrc-xxxxx) - For Schema Registry operations
Client library
Install the latest version with OAuth support:
pip install confluent-kafka
Configure Kafka Python clients
Python Kafka clients authenticate to Confluent Cloud clusters using the OAuth 2.0 protocol with a callback function approach. The client passes OAuth configuration parameters to your callback function, which handles the token request and returns the access token.
OAuth callback function
The Python client requires an OAuth callback function with robust error handling:
import requests
import json
from confluent_kafka import KafkaException
def oauth_token_refresh_cb(oauth_config):
"""
OAuth callback function for Python Kafka client
Args:
oauth_config: OAuth configuration dictionary
Returns:
tuple: (access_token, expiration_time_ms)
"""
try:
payload = {
'grant_type': 'client_credentials',
'client_id': oauth_config.get('sasl.oauthbearer.client.id'),
'client_secret': oauth_config.get('sasl.oauthbearer.client.secret'),
'scope': oauth_config.get('sasl.oauthbearer.scope')
}
response = requests.post(
oauth_config.get('sasl.oauthbearer.token.endpoint.url'),
data=payload
)
response.raise_for_status()
token = response.json()
# Return the token and its expiration time
return token['access_token'], int(token['expires_in'] * 1000)
except Exception as e:
# Handle exceptions and signal failure
raise KafkaException(f"OAuth token refresh failed: {e}")
Configuration example
Define a single, complete configuration dictionary for your Kafka client:
from confluent_kafka import Producer, Consumer
# Define a single, complete configuration dictionary
producer_config = {
'bootstrap.servers': 'your-bootstrap-server:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.token.endpoint.url': 'https://<your-idp.com>/oauth2/token',
'sasl.oauthbearer.client.id': '<your-client-id>',
'sasl.oauthbearer.client.secret': '<your-client-secret>',
'sasl.oauthbearer.scope': 'kafka:read kafka:write',
'sasl.oauthbearer.extensions': 'logical_cluster=<lkc-xxxxx>,identity_pool_id=<pool-yyyyy>',
'oauth_cb': oauth_token_refresh_cb,
'debug': 'security,protocol,broker' # For troubleshooting
}
# Create producer
producer = Producer(producer_config)
# For consumer, use the same OAuth configuration
consumer_config = {
'bootstrap.servers': 'your-bootstrap-server:9092',
'group.id': 'your-consumer-group',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.token.endpoint.url': 'https://<your-idp.com>/oauth2/token',
'sasl.oauthbearer.client.id': '<your-client-id>',
'sasl.oauthbearer.client.secret': '<your-client-secret>',
'sasl.oauthbearer.scope': 'kafka:read kafka:write',
'sasl.oauthbearer.extensions': 'logical_cluster=<lkc-xxxxx>,identity_pool_id=<pool-yyyyy>',
'oauth_cb': oauth_token_refresh_cb,
'debug': 'security,protocol,broker'
}
# Create consumer
consumer = Consumer(consumer_config)
Test your configuration
Test with a simple producer:
# Test producer producer = Producer(producer_config) def delivery_report(err, msg): if err is not None: print(f'Message delivery failed: {err}') else: print(f'Message delivered to {msg.topic()} [{msg.partition()}] - OAuth is working!') producer.produce('test-topic', 'test-message', callback=delivery_report) producer.flush()
Check for common errors:
“SASL authentication failed” - Check your OAuth credentials and endpoint
“Invalid token” - Verify your callback function is returning a valid token
“Connection timeout” - Check your bootstrap servers and network connectivity
Verify token refresh - The client should automatically refresh tokens when they expire
Configure Schema Registry Python clients
The SchemaRegistryClient authenticates using a built-in OAuth mechanism.
You only need to provide the correct configuration parameters, and the client
handles the token request automatically. It does not use the oauth_cb
callback.
Required parameters
The following parameters must be included in the schema.registry.config
dictionary when creating a client.
Parameter |
Description |
|---|---|
|
The endpoint for your Schema Registry instance. |
|
The API key and secret for Schema Registry, in the format |
|
Must be set to ``OAUTHBEARER``. |
|
The client ID from your identity provider. |
|
The client secret from your identity provider. |
|
The token endpoint URL of your identity provider. |
|
The required permissions scope (for example, |
|
The Schema Registry’s logical cluster ID (for example, |
|
The identity pool ID, if using identity federation. |
Configuration example
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.avro import AvroSerializer
schema_registry_conf = {
'url': 'https://<your-schema-registry-endpoint>',
'basic.auth.user.info': '<sr-api-key>:<sr-api-secret>',
# OAuth-specific configuration for Schema Registry client
'bearer.auth.credentials.source': 'OAUTHBEARER',
'bearer.auth.issuer.endpoint.url': 'https://<your-idp.com>/oauth2/token',
'bearer.auth.client.id': '<your-client-id>',
'bearer.auth.client.secret': '<your-client-secret>',
'bearer.auth.scope': 'schema:read',
'bearer.auth.logical.cluster': '<lsrc-xxxxx>',
'bearer.auth.identity.pool.id': '<pool-yyyyy>'
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_serializer = AvroSerializer(schema_registry_client,
user_schema_string,
conf={'auto.register.schemas': False})
Google OIDC integration
For Google OIDC integration with Python clients:
# Google OIDC configuration for Kafka
google_oauth_config = {
'bootstrap.servers': 'your-bootstrap-server:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.token.endpoint.url': 'https://oauth2.googleapis.com/token',
'sasl.oauthbearer.client.id': 'your-google-client-id',
'sasl.oauthbearer.client.secret': 'your-google-client-secret',
'sasl.oauthbearer.scope': 'https://www.googleapis.com/auth/cloud-platform',
'sasl.oauthbearer.extensions': 'logical_cluster=<lkc-xxxxx>,identity_pool_id=<pool-yyyyy>',
'oauth_cb': oauth_token_refresh_cb,
'debug': 'security,protocol,broker'
}
# Create producer with Google OIDC configuration
producer = Producer(google_oauth_config)
Troubleshoot Python OAuth clients
Common issues and solutions for Python OAuth clients:
Authentication failures
Verify client ID and secret are correct
Check token endpoint URL is accessible
Ensure logical cluster ID is valid
Validate identity pool ID if used
Network issues
Confirm network connectivity to OAuth provider
Check firewall rules allow OAuth traffic
Verify SSL certificate validation
Configuration issues
Ensure all required parameters are provided
Validate OAuth callback function signature
Check timeout values are reasonable
Debug logging
Enable debug logging for OAuth troubleshooting by adding the debug parameter
to your configuration:
# Add to your configuration
config = {
# ... your OAuth config
'debug': 'security,protocol,broker'
}
This provides detailed librdkafka logs for authentication issues.