Closed
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.1.1
Describe the bug
The `KafkaMessageListenerContainer` doesn't clear the thread state of the `AfterRollbackProcessor` after a successful processing attempt if a `transactionManager` is set.
To Reproduce
- I have a `KafkaMessageListenerContainer` configured with a `BatchMessageListener` and a `KafkaTransactionManager` (batched exactly-once processing)
- Additionally, I set a `DefaultAfterRollbackProcessor` with an `ExponentialBackOff`
- I do not set any `CommonErrorHandler` (`AbstractMessageListenerContainer#setCommonErrorHandler`)
Example snippet:
    @Bean
    MessageListenerContainer messageListenerContainer(
            KafkaTemplate<byte[], byte[]> kafkaTemplate,
            ConsumerFactory<byte[], byte[]> consumerFactory,
            KafkaTransactionManager<byte[], byte[]> transactionManager) {
        // initialize MyEventListener which is a BatchMessageListener
        var myEventListener = new MyEventListener(kafkaTemplate);

        // initialize container properties
        var props = new ContainerProperties(sourceTopics);
        props.setMessageListener(myEventListener);
        props.setTransactionManager(transactionManager); // enable transaction manager for exactly-once processing

        // initialize back-off strategy for retries
        var backOff = new ExponentialBackOff(1000, 2);
        backOff.setMaxInterval(300000L); // set back-off limit to 5min
        var afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(backOff);

        // initialize MessageListenerContainer
        var container = new KafkaMessageListenerContainer<>(consumerFactory, props);
        container.setAfterRollbackProcessor(afterRollbackProcessor);
        return container;
    }

(Un-)expected behavior
- `KafkaMessageListenerContainer` calls the `AfterRollbackProcessor` after each failed transaction (as expected)
- the `DefaultAfterRollbackProcessor` uses the configured `BackOff` after each failed transaction (as expected)
- after a successful processing attempt, `AfterRollbackProcessor#clearThreadState` is not called (that's unexpected to me)
- thus the state of the `BackOffExecution` in `DefaultAfterRollbackProcessor` (which is "some kind of `ThreadLocal` state") is never reset (that's unexpected to me)
- thus, on subsequent transaction rollbacks, `DefaultAfterRollbackProcessor` reuses the previously used `BackOffExecution` (that's unexpected to me)
- this unexpected behavior only appears if `commonErrorHandler` is set to `null` (that's the default if `transactionManager` is set)
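
To illustrate the effect described in the list above, here is a minimal plain-Java sketch that does not use Spring at all. `SimpleBackOffExecution`, `onRollback`, `clearThreadState` and `simulate` are hypothetical stand-ins written for this illustration, not the actual Spring for Apache Kafka code. The sketch assumes a per-thread back-off state with the same parameters as the snippet (initial interval 1000 ms, multiplier 2, maximum 300000 ms) and compares the delays when the state is cleared after a successful attempt with the delays when it is never cleared.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffStateDemo {

    /** Hypothetical stand-in for a back-off execution: every call returns the next, longer delay. */
    static final class SimpleBackOffExecution {
        private final double multiplier;
        private final long maxInterval;
        private long current;

        SimpleBackOffExecution(long initialInterval, double multiplier, long maxInterval) {
            this.current = initialInterval;
            this.multiplier = multiplier;
            this.maxInterval = maxInterval;
        }

        long nextBackOff() {
            long result = Math.min(current, maxInterval);
            // grow the interval for the next call, capped at maxInterval
            current = (long) Math.min(current * multiplier, (double) maxInterval);
            return result;
        }
    }

    // Assumed per-thread state, modelling the "some kind of ThreadLocal state" from the report.
    private static final ThreadLocal<SimpleBackOffExecution> STATE = new ThreadLocal<>();

    /** Called after a rolled-back transaction: reuses the thread's execution if one exists. */
    public static long onRollback() {
        SimpleBackOffExecution execution = STATE.get();
        if (execution == null) {
            execution = new SimpleBackOffExecution(1000L, 2.0, 300_000L);
            STATE.set(execution);
        }
        return execution.nextBackOff();
    }

    /** Resets the per-thread state so the next rollback starts at the initial interval. */
    public static void clearThreadState() {
        STATE.remove();
    }

    /** Three failed attempts, one successful attempt, then one more failed attempt. */
    public static List<Long> simulate(boolean clearAfterSuccess) {
        clearThreadState(); // start from a clean state
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            delays.add(onRollback());
        }
        if (clearAfterSuccess) {
            clearThreadState(); // what the report expects to happen after a success
        }
        delays.add(onRollback());
        clearThreadState();
        return delays;
    }

    public static void main(String[] args) {
        System.out.println("cleared after success: " + simulate(true));  // [1000, 2000, 4000, 1000]
        System.out.println("never cleared:         " + simulate(false)); // [1000, 2000, 4000, 8000]
    }
}
```

In this model, the first failure after a successful attempt waits 8000 ms instead of 1000 ms when the state is not cleared, and under the same assumptions the delay would eventually stay at the 5 minute maximum for every later rollback.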