Configure JavaScript Clients for OAuth/OIDC on Confluent Cloud
Prerequisites
Before you begin, ensure you have the following:
Language and tooling
Node.js 16 or later
confluent-kafka-javascript: 2.0.0 or later
@confluentinc/schemaregistry: Latest version
Confluent Platform: 7.2.1 or later; 7.1.3 or later
Confluent Cloud environment
OAuth setup in Confluent Cloud - Configure OAuth authentication.
Credentials and identifiers
Client ID - Your application’s identifier
Client secret - Your application’s password for OAuth
Token endpoint URL - Where to request access tokens
Scopes - Permissions your application needs (for example,
kafka:read kafka:writefor Kafka,schema-registryfor Schema Registry)Kafka cluster ID (
lkc-xxxxx) - Your Kafka cluster identifier.Schema Registry cluster ID (
lsrc-xxxxx) - Your Schema Registry cluster identifier.Identity pool ID (
pool-xxxxx) - If using identity pools.
Client library
Install required packages:
npm install confluent-kafka-javascript @confluentinc/schemaregistry
Configure Kafka clients
JavaScript Kafka clients use a callback function approach for OAuth token retrieval.
OAuth callback function
The JavaScript client requires an OAuth callback function that handles token requests:
async function oauthCallback(oauthConfig) {
/**
* OAuth callback function for JavaScript Kafka client
*
* @param {Object} oauthConfig - OAuth configuration object
* @returns {string} Access token
*/
const tokenEndpoint = oauthConfig['sasl.oauth.token.endpoint.uri'];
const clientId = oauthConfig['sasl.oauth.client.id'];
const clientSecret = oauthConfig['sasl.oauth.client.secret'];
const scope = oauthConfig['sasl.oauth.scope'] || '';
// Prepare token request
const data = new URLSearchParams({
grant_type: 'client_credentials',
client_id: clientId,
client_secret: clientSecret
});
if (scope) {
data.append('scope', scope);
}
// Make token request
const response = await fetch(tokenEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: data
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const tokenData = await response.json();
return tokenData.access_token;
}
Kafka producer configuration
const { Kafka } = require('confluent-kafka-javascript');
// OAuth Configuration
const oauthConfig = {
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauth.token.endpoint.uri': 'https://your-oauth-provider.com/oauth2/token',
'sasl.oauth.client.id': 'your-client-id',
'sasl.oauth.client.secret': 'your-client-secret',
'sasl.oauth.scope': 'kafka:read kafka:write',
'sasl.oauth.logical.cluster': 'lkc-xxxxx',
'sasl.oauth.identity.pool.id': 'pool-xxxxx',
'oauth_cb': oauthCallback
};
// Kafka Producer Configuration
const producerConfig = {
'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
'security.protocol': 'SASL_SSL',
...oauthConfig
};
// Create producer
const producer = new Kafka.Producer(producerConfig);
Kafka consumer configuration
// Kafka Consumer Configuration
const consumerConfig = {
'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
'group.id': 'your-consumer-group',
'security.protocol': 'SASL_SSL',
...oauthConfig
};
// Create consumer
const consumer = new Kafka.Consumer(consumerConfig);
Configure Schema Registry clients
The Schema Registry client uses a different configuration approach with bearer authentication.
Modern Schema Registry client configuration
Use the modern @confluentinc/schemaregistry package for better TypeScript support:
const { SchemaRegistryClient } = require('@confluentinc/schemaregistry');
// Schema Registry Configuration
const schemaRegistryConfig = {
baseURLs: ['https://psrc-xxxxx.us-west-2.aws.confluent.cloud'],
bearerAuthCredentials: {
credentialsSource: 'OAUTHBEARER',
issuerEndpointUrl: 'https://your-oauth-provider.com/oauth2/token',
clientId: 'your-client-id',
clientSecret: 'your-client-secret',
scope: 'schema-registry',
identityPoolId: 'pool-xxxxx',
logicalCluster: 'lsrc-xxxxx' // Your Schema Registry cluster ID
}
};
// Create Schema Registry client
const schemaRegistry = new SchemaRegistryClient(schemaRegistryConfig);
Complete working example
Here’s a complete example showing how to use both Kafka and Schema Registry clients together:
const { Kafka } = require('confluent-kafka-javascript');
const { SchemaRegistryClient } = require('@confluentinc/schemaregistry');
// --- 1. Define the Kafka OAuth Callback ---
async function oauthCallback(oauthConfig) {
const tokenEndpoint = oauthConfig['sasl.oauth.token.endpoint.uri'];
const clientId = oauthConfig['sasl.oauth.client.id'];
const clientSecret = oauthConfig['sasl.oauth.client.secret'];
const scope = oauthConfig['sasl.oauth.scope'] || '';
const data = new URLSearchParams({
grant_type: 'client_credentials',
client_id: clientId,
client_secret: clientSecret
});
if (scope) {
data.append('scope', scope);
}
const response = await fetch(tokenEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: data
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const tokenData = await response.json();
return tokenData.access_token;
}
// --- 2. Configure the Kafka Client ---
const producerConfig = {
'bootstrap.servers': 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauth.token.endpoint.uri': 'https://your-oauth-provider.com/oauth2/token',
'sasl.oauth.client.id': 'your-client-id',
'sasl.oauth.client.secret': 'your-client-secret',
'sasl.oauth.scope': 'kafka:read kafka:write',
'sasl.oauth.logical.cluster': 'lkc-xxxxx',
'sasl.oauth.identity.pool.id': 'pool-xxxxx',
'oauth_cb': oauthCallback
};
// --- 3. Configure the Schema Registry Client ---
const srConfig = {
baseURLs: ['https://psrc-xxxxx.us-west-2.aws.confluent.cloud'],
bearerAuthCredentials: {
credentialsSource: 'OAUTHBEARER',
issuerEndpointUrl: 'https://your-oauth-provider.com/oauth2/token',
clientId: 'your-client-id',
clientSecret: 'your-client-secret',
scope: 'schema-registry',
identityPoolId: 'pool-xxxxx',
logicalCluster: 'lsrc-xxxxx'
}
};
// --- 4. Instantiate and Use Clients ---
const producer = new Kafka.Producer(producerConfig);
const schemaRegistry = new SchemaRegistryClient(srConfig);
// Example: Get all subjects from Schema Registry
async function getSubjects() {
try {
const subjects = await schemaRegistry.getAllSubjects();
console.log('Subjects:', subjects);
return subjects;
} catch (error) {
console.error('Error fetching subjects:', error);
throw error;
}
}
// Example: Produce a message
async function produceMessage(topic, message) {
try {
producer.produce(topic, null, message, null, Date.now());
producer.flush(10000, () => {
console.log('Message sent successfully!');
});
} catch (error) {
console.error('Error producing message:', error);
throw error;
}
}
Troubleshoot client issues
Common issues and solutions
Authentication failures (401/403)
Problem: OAuth authentication fails with 401 or 403 errors.
Solutions: * Verify all OAuth parameters are correct (client ID, secret, endpoint URL) * Ensure the identity pool ID and cluster IDs are valid * Check that the OAuth provider is properly configured in Confluent Cloud * Verify the scope includes necessary permissions
Content-type header issues
Problem: Authentication fails when manually setting Content-Type headers.
Solution: Remove any manual Content-Type header settings. The clients handle these automatically for OAuth requests.
Cluster ID issues
Problem: Using incorrect cluster IDs.
Solution: * Use the correct Kafka cluster ID (lkc-xxxxx) for Kafka clients * Use the correct Schema Registry cluster ID (lsrc-xxxxx) for Schema Registry clients * Find these IDs in the Confluent Cloud Console
Debug configuration
Enable debug logging to troubleshoot OAuth issues:
// For Kafka client
const debugConfig = {
...producerConfig,
'debug': 'security,protocol,broker'
};
// For Schema Registry client
const srDebugConfig = {
...srConfig,
debug: true
};
Test your configuration
Test Kafka client:
// Test producer const testProducer = new Kafka.Producer(producerConfig); testProducer.produce('test-topic', null, 'test-message', null, Date.now()); testProducer.flush(10000, () => { console.log('Message sent successfully - OAuth is working!'); testProducer.close(); });
Test Schema Registry client:
// Test Schema Registry async function testSchemaRegistry() { try { const subjects = await schemaRegistry.getAllSubjects(); console.log('Schema Registry connection successful!'); console.log('Subjects:', subjects); } catch (error) { console.error('Schema Registry test failed:', error); } }
Best practices
Use environment variables for sensitive configuration like client secrets
Implement proper error handling for authentication failures
Monitor token expiration and implement retry logic
Use the correct cluster IDs for both Kafka and Schema Registry
Avoid manual header manipulation when using OAuth authentication
Test authentication in a development environment before production