Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub Publish task fails when message contains orderingKey #348

Open
shrutimantri opened this issue Mar 23, 2024 · 0 comments
Open

PubSub Publish task fails when message contains orderingKey #348

shrutimantri opened this issue Mar 23, 2024 · 0 comments
Labels
area/plugin Plugin-related issue or feature request bug Something isn't working
Milestone

Comments

@shrutimantri
Copy link
Contributor

Expected Behavior

Message has an attribute orderingKey. Messages with orderingKey should be published.

Actual Behaviour

If the task has messages with orderingKey, the execution fails. Here is the error log:

2024-03-23 12:31:27.869 Cannot publish a message with an ordering key when message ordering is not enabled in the Publisher client. Please create a Publisher client with setEnableMessageOrdering(true) in the builder.
2024-03-23 12:31:27.869 java.lang.IllegalStateException: Cannot publish a message with an ordering key when message ordering is not enabled in the Publisher client. Please create a Publisher client with setEnableMessageOrdering(true) in the builder.
	at com.google.common.base.Preconditions.checkState(Preconditions.java:512)
	at com.google.cloud.pubsub.v1.Publisher.publish(Publisher.java:255)
	at io.kestra.plugin.gcp.pubsub.Publish.lambda$buildFlowable$1(Publish.java:116)
	at io.kestra.core.utils.Rethrow.lambda$throwFunction$4(Rethrow.java:90)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.MonoReduce$ReduceSubscriber.request(MonoReduce.java:222)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:53)
	at reactor.core.publisher.MonoReduce$ReduceSubscriber.onSubscribe(MonoReduce.java:98)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.block(Mono.java:1727)
	at io.kestra.plugin.gcp.pubsub.Publish.run(Publish.java:97)
	at io.kestra.plugin.gcp.pubsub.Publish.run(Publish.java:28)
	at io.kestra.core.runners.Worker$WorkerThread.run(Worker.java:710)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
		at reactor.core.publisher.Mono.block(Mono.java:1728)
		... 3 more

Looks like we should enableMessageOrdering on the publisher when the messages contain orderingKey (or we can by default set it to enabled).

Steps To Reproduce

  1. Take the example flow, and set correct values (ensure orderingKey is set for atleast one of the messages).
  2. Execute the flow.

Environment Information

  • Kestra Version: 0.15.5
  • Plugin version: 0.15.5
  • Operating System (OS / Docker / Kubernetes): Docker
  • Java Version (If not docker): N/A

Example flow

id: pubsub-publish
namespace: dev
tasks:
  - id: "publish"
    type: "io.kestra.plugin.gcp.pubsub.Publish"
    projectId: <project-id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    topic: smantri-test
    from:
    -  data: "{{ 'base64-encoded-string-1' | base64encode }}"
       attributes:
           testAttribute: KestraTest
       messageId: '1234'
       orderingKey: 'foo'
    -  data: "{{ 'base64-encoded-string-2' | base64encode }}"
    -  attributes:
           testAttribute: KestraTest
@shrutimantri shrutimantri added the bug Something isn't working label Mar 23, 2024
@anna-geller anna-geller added this to the v0.17.0 milestone Mar 25, 2024
@anna-geller anna-geller modified the milestones: v0.17.0, v0.20.0 Apr 22, 2024
@Ben8t Ben8t added the area/plugin Plugin-related issue or feature request label Oct 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/plugin Plugin-related issue or feature request bug Something isn't working
Projects
Status: Backlog
Development

No branches or pull requests

3 participants