[Issue 53][PulsarSpout] Adds DLQ support in PulsarSpout of the Storm adapter #60

Open
wants to merge 4 commits into
base: master

Conversation

@AnuragReddy2000 AnuragReddy2000 commented Feb 6, 2025

Fixes #53

Motivation

The current implementation of the Storm adapter does not use Pulsar's dead letter queue (DLQ) functionality. When an error occurs while processing a message in the Storm topology, it is often desirable to move the message to the dead letter queue so that it can be handled separately. Currently, however, the PulsarSpout keeps retrying for a while and then drops the message by acking it, which leads to data loss. This PR allows users of PulsarSpout to opt in to using the DLQ for message processing failures.

Modifications

  1. Added a method negativeAcknowledge to the PulsarSpoutConsumer interface and implemented it in the SpoutConsumer class.
  2. Added a boolean attribute negativeAckFailedMessages to the PulsarSpoutConfiguration class, along with its getter and setter.
  3. Added a method called negativeAck to the PulsarSpout class to negatively acknowledge messages so that they can be sent to the DLQ.
  4. Modified the fail method in the PulsarSpout class to negatively acknowledge messages when negativeAckFailedMessages is set to true.
  5. Modified the mapToValueAndEmit method to throw an exception when mapping to the value fails.
  6. Updated the version of the Pulsar client to 4.0.2. This is required for this fix, as described in the PR "[fix][client] Make DeadLetterPolicy & KeySharedPolicy serializable" (pulsar#23718).
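
For illustration, the opt-in behaviour described in items 1 to 4 can be sketched roughly as follows. This is a simplified, self-contained sketch and not the code in this PR: SketchConsumer, SketchConfig, SketchSpout, and RecordingConsumer are hypothetical stand-ins, message IDs are plain strings, the getter name and the default value of false are assumptions, and the existing retry logic is reduced to a simple list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the PulsarSpoutConsumer interface (item 1). */
interface SketchConsumer {
    void acknowledge(String messageId);
    void negativeAcknowledge(String messageId);
}

/** Hypothetical stand-in for PulsarSpoutConfiguration (item 2). */
class SketchConfig {
    // Assumption: the flag is opt-in, so it is off unless the user enables it.
    private boolean negativeAckFailedMessages = false;

    boolean isNegativeAckFailedMessages() {
        return negativeAckFailedMessages;
    }

    void setNegativeAckFailedMessages(boolean value) {
        this.negativeAckFailedMessages = value;
    }
}

/** Hypothetical stand-in for PulsarSpout, showing only the failure path (items 3 and 4). */
class SketchSpout {
    private final SketchConsumer consumer;
    private final SketchConfig config;
    // Simplified placeholder for the spout's existing retry handling.
    final List<String> retryQueue = new ArrayList<>();

    SketchSpout(SketchConsumer consumer, SketchConfig config) {
        this.consumer = consumer;
        this.config = config;
    }

    /** Hands the message back to the consumer as negatively acknowledged. */
    void negativeAck(String messageId) {
        consumer.negativeAcknowledge(messageId);
    }

    /** Called when the topology reports that processing a message failed. */
    void fail(String messageId) {
        if (config.isNegativeAckFailedMessages()) {
            negativeAck(messageId);
        } else {
            // Flag disabled: keep the pre-existing retry behaviour.
            retryQueue.add(messageId);
        }
    }
}

/** Test double that records which calls reached the consumer. */
class RecordingConsumer implements SketchConsumer {
    final List<String> acked = new ArrayList<>();
    final List<String> nacked = new ArrayList<>();

    @Override
    public void acknowledge(String messageId) {
        acked.add(messageId);
    }

    @Override
    public void negativeAcknowledge(String messageId) {
        nacked.add(messageId);
    }
}
```

In this sketch, calling fail with the flag left at its default only adds the ID to retryQueue, while calling it after setNegativeAckFailedMessages(true) records the ID in RecordingConsumer.nacked instead. Whether a negatively acknowledged message actually reaches the DLQ would still depend on the dead letter policy configured on the underlying consumer.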

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

Tested the cases where negativeAckFailedMessages is set to true and false in the PulsarSpoutTest test class.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API: yes
  • The schema: don't know
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: don't know

Documentation

  • Does this pull request introduce a new feature? - yes
  • If yes, how is the feature documented? - I couldn't find documentation for the Pulsar Storm adapter. However, I have added Javadoc comments to explain the changes wherever necessary.

The corresponding PR in my fork: AnuragReddy2000#2

@AnuragReddy2000 AnuragReddy2000 changed the title [Issue 53][PulsarSpout] Adds DLQ support in PulsarSpout of the Storm adaptor [Issue 53][PulsarSpout] Adds DLQ support in PulsarSpout of the Storm adapter Feb 15, 2025
Contributor

@rdhabalia rdhabalia left a comment

LGTM. minor comment

@@ -159,6 +159,21 @@ public void ack(Object msgId) {
}
}

public void negativeAck(Object msgId) {
Contributor

This can be renamed to msg.

Author

@AnuragReddy2000 AnuragReddy2000 Feb 19, 2025

I was just following the approach used by the ack and fail methods in the same class, where the method argument is called msgId and has the type Object. Inside the method, there is a check on whether msgId is an instance of the Message class; if so, it is cast to the Message type and assigned to a variable called msg.

Do you still want me to rename the argument to some other name?
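
For readers following the thread, the pattern being discussed can be sketched roughly as below. This is an illustrative, self-contained sketch rather than the actual PulsarSpout code: SketchMessage is a hypothetical stand-in for Pulsar's Message class, and MsgIdSketch with its negativelyAcked list exists only to make the example observable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Pulsar's Message class. */
class SketchMessage {
    final String id;

    SketchMessage(String id) {
        this.id = id;
    }
}

/** Hypothetical holder class demonstrating the Object-typed msgId pattern. */
class MsgIdSketch {
    // Records which message IDs were negatively acknowledged in this sketch.
    final List<String> negativelyAcked = new ArrayList<>();

    // The parameter is typed Object and named msgId, mirroring Storm's
    // ISpout.ack(Object msgId) and ISpout.fail(Object msgId) callbacks.
    public void negativeAck(Object msgId) {
        if (msgId instanceof SketchMessage) {
            // Only after the type check is the argument treated as a message.
            SketchMessage msg = (SketchMessage) msgId;
            negativelyAcked.add(msg.id);
        }
        // Any other argument type is ignored in this sketch.
    }
}
```

Keeping the name msgId keeps the new method consistent with the callbacks it sits beside, while msg is reserved for the value after the cast.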

Author

@rdhabalia Any comments on this?

Values values;
try{
values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
} catch (Exception e){
Contributor

This change doesn't seem related to this PR. Did we see any issue with mapper failures? If you have come across such an issue, can we create a separate PR with the details?

Author

Yes, this change is not related to this PR. Let me remove it here and create a separate one for it.

Author

This change has been removed.

Development

Successfully merging this pull request may close these issues.

DeadLetterPolicy in Apache Pulsar Storm Adapter Not implemented
2 participants