DeadLetterPolicy in Apache Pulsar Storm Adapter Not implemented #53

Open
karunamgoyal opened this issue Sep 8, 2023 · 0 comments · May be fixed by AnuragReddy2000/pulsar-adapters#1

karunamgoyal commented Sep 8, 2023

Describe the bug
Version: v2.11.0
There is no way to negatively acknowledge a message on the consumer, and a DeadLetterPolicy set through the consumer configuration is not applied.

ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
        .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic())
        .build());
PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(
        spoutConfiguration,
        ((ClientBuilderImpl) createBuilder(viestiSourceConfig))
                .getClientConfigurationData()
                .clone(),
        subscriptionConfig);

The DeadLetterPolicy set in the code above is not retained when the PulsarSpout is created.

```java
static class SpoutConsumer implements PulsarSpoutConsumer {
    private Consumer<byte[]> consumer;

    public SpoutConsumer(Consumer<byte[]> consumer) {
        this.consumer = consumer;
    }

    public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
        return this.consumer.receive(timeout, unit);
    }

    public void acknowledgeAsync(Message<?> msg) {
        this.consumer.acknowledgeAsync(msg);
    }

    public void close() throws PulsarClientException {
        this.consumer.close();
    }

    public void unsubscribe() throws PulsarClientException {
        this.consumer.unsubscribe();
    }
}
```

Also, as the wrapper above shows, there is no mechanism to negatively acknowledge a message.
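A rough sketch of what is being asked for, so the request is concrete. To keep it compilable without the Pulsar client on the classpath, `DemoConsumer`, `DemoSpoutConsumer`, and `RecordingConsumer` are hypothetical stand-ins and messages are reduced to plain string ids. In the adapter itself the new method would presumably delegate to `Consumer.negativeAcknowledge(Message<?>)` from the Pulsar Java client; how the spout's failure path would call it is not covered here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Pulsar Consumer, reduced to the two calls used here.
interface DemoConsumer {
    void acknowledgeAsync(String messageId);
    void negativeAcknowledge(String messageId);
}

// Sketch of a spout-side wrapper that also exposes negative acknowledgement.
class DemoSpoutConsumer {
    private final DemoConsumer consumer;

    DemoSpoutConsumer(DemoConsumer consumer) {
        this.consumer = consumer;
    }

    void acknowledgeAsync(String messageId) {
        consumer.acknowledgeAsync(messageId);
    }

    // The missing piece: forward a failed message as a negative ack
    // so that the broker can redeliver it.
    void negativeAcknowledge(String messageId) {
        consumer.negativeAcknowledge(messageId);
    }
}

// In-memory fake that records which ids were acked or nacked.
class RecordingConsumer implements DemoConsumer {
    final List<String> acked = new ArrayList<>();
    final List<String> nacked = new ArrayList<>();

    public void acknowledgeAsync(String messageId) {
        acked.add(messageId);
    }

    public void negativeAcknowledge(String messageId) {
        nacked.add(messageId);
    }
}
```

With something like this in place, a failed tuple could be mapped to `negativeAcknowledge`, and redelivery counting would then have a chance to route the message to the dead letter topic.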

Expected behavior

When a DeadLetterPolicy is set, it should not be dropped during serialization. Negative acknowledgements should also be supported.
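To illustrate the kind of drop described here, below is a small standalone sketch. It assumes (not confirmed in this report) that the configuration object goes through Java serialization before the consumer is built, and that the policy field is excluded from it, for example by being `transient`. `DemoConsumerConfig` and `TransientFieldDemo` are hypothetical names and not Pulsar classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a consumer configuration; NOT the Pulsar class.
class DemoConsumerConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    String subscriptionName;

    // Assumption for illustration: the policy is excluded from serialization.
    transient String deadLetterTopic;
}

class TransientFieldDemo {
    // Writes the config to bytes and reads it back, as a framework that
    // ships objects with Java serialization would.
    static DemoConsumerConfig roundTrip(DemoConsumerConfig in)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream oin =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DemoConsumerConfig) oin.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoConsumerConfig config = new DemoConsumerConfig();
        config.subscriptionName = "my-sub";
        config.deadLetterTopic = "my-dlq";

        DemoConsumerConfig copy = roundTrip(config);
        System.out.println(copy.subscriptionName); // prints "my-sub"
        System.out.println(copy.deadLetterTopic);  // prints "null"
    }
}
```

If this is what happens in the adapter, the fix would need to carry the policy across serialization in some other form; that is a guess and would need to be checked against the adapter source.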

Desktop (please complete the following information):
macOS Ventura 13.4.1
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
Apache Storm 2.2.1

Context: trying to consume from a Pulsar topic in Apache Storm.
