-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-4230: Enhance listener observation handling with additional adapter support #4238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Flamme1004K <[email protected]>
Signed-off-by: Flamme1004K <[email protected]>
|
I did something very similar, with a few differences: #4240 |
|
@victorpasqualino good Idea! I got inspired by your code and tried refactoring it. Can you take a look? |
Signed-off-by: Flamme1004K <[email protected]>
Looks good to me. Thank you |
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, sure you run /gradlew check locally before pushing.
Thanks
| * @author Soby Chacko | ||
| * @since 3.2.7 | ||
| * @author Hyoungjune Kim | ||
| * @since 4.0.2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not correct.
The class was created really in that version, so you cannot modify it.
Plus, the version you target is still not correct since I fill like we need to back-port the fix to 3.3.x as well.
Therefore, if we do @since, we should aim for the lowest version we are going to back-port to.
Luckily, we don't need to use @since anywhere in your change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I rollbacked @since
| } | ||
|
|
||
| private MessageListener<?, ?> unwrapDelegateIfAny(MessageListener<?, ?> listener) { | ||
| if (listener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?> backoffAware) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The KafkaBackoffAwareMessageListenerAdapter is an AbstractDelegatingMessageListenerAdapter, so I think any of those should be covered here.
If we go this method, it has to be static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good! I added static to this method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good. No need to add a comment with a confirmation.
I'll see an agreement in the next commit you push.
How about an AbstractDelegatingMessageListenerAdapter support?
You didn't comment anything to contradict my suggestion and didn't implement it.
With the 👍 it give a false impression that all the request have been addressed.
Any comments, please?
And another our code style suggestion: the static methods should go in the end of class.
Not a big deal: just an info if you'd like to know more about our developing process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was such a good suggestion I ended up hitting the emoji button without thinking. Next time I'll make sure to use them appropriately.
I've been thinking about it, and instead of using AbstractDelegatingMessageListenerAdapter, how about checking if it's an instance of DelegatingMessageListener and extracting it if it is? What do you think?
…iner Signed-off-by: Flamme1004K <[email protected]>
Signed-off-by: Flamme1004K <[email protected]>
…icsTests` Signed-off-by: Flamme1004K <[email protected]>
Signed-off-by: Flamme1004K <[email protected]>
|
@artembilan Thank you, Code Review! I run |
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the update!
I won't mind any objections for my review.
| } | ||
|
|
||
| static class RetryBackOffObservationListener { | ||
| final CountDownLatch latch = new CountDownLatch(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every member in the class (including before the last } of the last inner class) has to be surrounded with blank lines.
That makes the code much easier to read.
And we do here an Open Source, so our code should be as clean as possible and as a good example for community.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm learning about open source. Thank you.
|
|
||
| private void setupObservationRegistry(ObservationRegistry observationRegistry) { | ||
| if (this.listener == null) { | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for better void digesting it would be better to make it opposite: use if (this.listener != null) { instead around the rest of the code in this method.
| } | ||
|
|
||
| private MessageListener<?, ?> unwrapDelegateIfAny(MessageListener<?, ?> listener) { | ||
| if (listener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?> backoffAware) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good. No need to add a comment with a confirmation.
I'll see an agreement in the next commit you push.
How about an AbstractDelegatingMessageListenerAdapter support?
You didn't comment anything to contradict my suggestion and didn't implement it.
With the 👍 it give a false impression that all the request have been addressed.
Any comments, please?
And another our code style suggestion: the static methods should go in the end of class.
Not a big deal: just an info if you'd like to know more about our developing process.
| // NOTHING | ||
| } | ||
| finally { | ||
| latch.countDown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need any logic here since in the end you still fall back to the await().untilAsserted().
Therefore, such a latch is just redundant barrier which really does not contribute to the async flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During local testing, intermittent failures in the test code were observed, so the source code was modified to call latch.countDown().
The reason is that the count for spring.kafka.listener.id can only be obtained after messages are consumed by the listener. If verification occurs before consumption, a null pointer exception occurs regardless of untilAsserted.
Signed-off-by: Flamme1004K <[email protected]>
Signed-off-by: Flamme1004K <[email protected]>
…delegates Signed-off-by: Flamme1004K <[email protected]>
|
@artembilan Thank you for your prompt feedback on the code review. I've addressed the feedback you provided. Please review it. |
| private void setupObservationRegistry(ObservationRegistry observationRegistry) { | ||
| if (this.listener != null) { | ||
| MessageListener<?, ?> target = unwrapDelegateIfAny(this.listener); | ||
| if (target instanceof RecordMessagingMessageListenerAdapter<?, ?> adapter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to think about this change twice.
You see, our existing logic is RecordMessagingMessageListenerAdapter.class.equals(this.listener.getClass()).
I believe even if we extract the delegate, we still have to be sure that we talk about exactly the same object.
So, if we do this instanceof, then any RecordMessagingMessageListenerAdapter extension is a valid candidate for isListenerAdapterObservationAware.
But that is really not what we want here.
We want to check exactly only for the RecordMessagingMessageListenerAdapter and nothing more.
In some other places we do have some custom RecordMessagingMessageListenerAdapter , which must not be treated as an observation aware.
Makes sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artembilan I agree. It's correct to change the code to verify it's specifically RecordMessagingMessageListenerAdapter.class.
I will modify the code to RecordMessagingMessageListenerAdapter.class.equals(this.listener.getClass()).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the code!
…st synchronization Signed-off-by: Flamme1004K <[email protected]>
…for listener adapter handling Signed-off-by: Flamme1004K <[email protected]>
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
Will merge when build is green.
Thank you!
|
thank you for contribution; looking forward for more! |
|
@artembilan |
I'm submitting a PR for issue #4230. Although the KafkaBackoffAwareMessageListenerAdapter class delegates to RecordMessagingMessageListenerAdapter, it was observed that metrics were being recorded incorrectly because it wasn't adding metrics despite RecordMessagingMessageListenerAdapter.
To resolve this, I modified the code to set
isListenerAdapterObservationAwaretotrueeven when it's a KafkaBackoffAwareMessageListenerAdapter. Additionally, when adding observations, I ensured that if the KafkaBackoffAwareMessageListenerAdapter class delegates to RecordMessagingMessageListenerAdapter, it also adds the metrics.Thank you for your attention.