Skip to content

Conversation

@ozeranskii
Copy link
Contributor

@ozeranskii ozeranskii commented Oct 15, 2025

Description

This PR adds support for per-message keys in Kafka/Confluent batch publishing operations. Previously, batch publishers didn't support keys at all. Now users can specify keys in three ways:

  1. Per-message keys using KafkaPublishMessage (new alias for KafkaResponse)
  2. Single key for entire batch via publish() parameter
  3. Default key from publisher config (factory-level configuration)

Code Examples

1. Per-message keys using KafkaPublishMessage

from faststream.kafka import KafkaBroker, KafkaPublishMessage

broker = KafkaBroker()

@broker.subscriber("input")
async def handler():
    await broker.publish_batch(
        KafkaPublishMessage("user:1", key=b"user1"),
        KafkaPublishMessage("user:2", key=b"user2"),
        "user:3",  # Uses default key (None)
        topic="output"
    )

2. Single key for entire batch

publisher = broker.publisher("topic", batch=True)

await publisher.publish(
    "message1",
    "message2",
    "message3",
    key=b"partition_key"  # All messages get same key
)

3. Default key from factory config

publisher = broker.publisher(
    "topic",
    batch=True,
    key=b"default_key"  # Applied to all messages by default
)

await publisher.publish("msg1", "msg2")  # Both get default_key

# Override default for specific messages
await publisher.publish(
    KafkaPublishMessage("msg1", key=b"override"),
    "msg2"  # Uses default_key
)

Backward Compatibility

Fully backward compatible - all existing code continues to work:

  • key parameter is optional (defaults to None)
  • Existing batch publishers without keys work unchanged

Fixes #2514

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (just lint shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running just test-coverage
  • I have ensured that static analysis tests are passing by running just static-analysis
  • I have included code examples to illustrate the modifications

@ozeranskii ozeranskii requested a review from Lancetnik as a code owner October 15, 2025 10:55
@github-actions github-actions bot added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module labels Oct 15, 2025
Copy link
Member

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but magic {"message": ..., "key": ... } doesn't look as a good API.
One reason - we may want to add more additional information to each method instead of just key (headers, publish timestamp, etc)

This reason we have a feature already - Response (or KafkaResponse)

@broker.subscriber(...)
@broker.publisher(...)
async def handler():
     return Response("body", key=b"key")

This object allows to setup any information of outgoing message you want

So, I suggest to support smth like this Response object - Message (we can discuss naming)

await broker.publish_batch(KafkaMessage("body", key=b"1"), KafkaMessage("body", key=b"2"), "just a body message")

Probably, we can reuse the KafkaMessage class we have already. Also, please take a look at KafkaPublishCommand class

@ozeranskii ozeranskii requested a review from Lancetnik October 30, 2025 00:01
@ozeranskii
Copy link
Contributor Author

Sorry, but magic {"message": ..., "key": ... } doesn't look as a good API. One reason - we may want to add more additional information to each method instead of just key (headers, publish timestamp, etc)

This reason we have a feature already - Response (or KafkaResponse)

@broker.subscriber(...)
@broker.publisher(...)
async def handler():
     return Response("body", key=b"key")

This object allows to setup any information of outgoing message you want

So, I suggest to support smth like this Response object - Message (we can discuss naming)

await broker.publish_batch(KafkaMessage("body", key=b"1"), KafkaMessage("body", key=b"2"), "just a body message")

Probably, we can reuse the KafkaMessage class we have already. Also, please take a look at KafkaPublishCommand class

I agree, I didn't like it either. I fixed it.

@ozeranskii ozeranskii force-pushed the 2514-set-the-key-for-batch-producing branch 4 times, most recently from 49cb334 to 96a7dde Compare November 2, 2025 19:36
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Nov 2, 2025
@ozeranskii ozeranskii force-pushed the 2514-set-the-key-for-batch-producing branch from 8274577 to 07762e9 Compare November 2, 2025 19:39
@ozeranskii ozeranskii force-pushed the 2514-set-the-key-for-batch-producing branch from 07762e9 to 29a49b6 Compare November 2, 2025 19:41
@Lancetnik Lancetnik added this pull request to the merge queue Nov 11, 2025
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Nov 11, 2025
@Lancetnik Lancetnik enabled auto-merge November 11, 2025 09:05
@Lancetnik Lancetnik added this pull request to the merge queue Nov 11, 2025
Merged via the queue into ag2ai:main with commit cba7f34 Nov 11, 2025
29 checks passed
@ozeranskii ozeranskii deleted the 2514-set-the-key-for-batch-producing branch December 11, 2025 11:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Set the key for batch producing

3 participants