Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream source not working as expected. #14

Closed
DylanNWatt opened this issue Nov 25, 2024 · 7 comments
Closed

JetStream source not working as expected. #14

DylanNWatt opened this issue Nov 25, 2024 · 7 comments

Comments

@DylanNWatt
Copy link

DylanNWatt commented Nov 25, 2024

Observed behavior

NatsJetStreamSourceBuilder does not appear to process messages as expected.

I'm investigating this library to process bluesky firehose messages I'm ingesting to jetstream. I want to use flink to do real time analysis of those messages. I want to use jetstream to ensure that messages are all properly captured and processed durably. I then sink the messages to an iceberg table.

Using a "NatsSourceBuilder" works (messages sink to iceberg), but not jetstream. In the following, source1 works, but not source2.

// Emits events
        NatsSource<String> source1 = new NatsSourceBuilder<String>()
            .subjects("app.bsky.feed.post")
            .connectionPropertiesFile("connection.properties")
            .payloadDeserializer(new StringPayloadDeserializer())
            .build();

// Emits no events
        NatsSource<String> source2 = new NatsJetStreamSourceBuilder<String>()
            .subjects("app.bsky.feed.post")
            .connectionPropertiesFile("connection.properties")
            .consumerName("bluesky-firehose")
            .payloadDeserializer(new StringPayloadDeserializer())
            .build();

**Note I'm able to reproduce the lack of processing in the examples recently checked into this repo, and the below "behavior" questions refer to it, as that's a more easily isolated behavior.

I'm happy to provide more information about the "real" setup if needed. My java's pretty rusty, so i wasn't able to step through and see what was breaking.

Expected behavior

When running nats subscribe example-sink while executing src/examples/java/io/synadia/flink/examples/SourceToSinkJsExample.java, I expect to see published messages, however there are none. Note that nats subscribe example-source does show the created messages.

Server and client version

Nats server: 2.10.22
Nats client: 0.1.5

Host environment

Mac 15.0.1, running nats in Kubernetes via helm.

Steps to reproduce

Clone this repo and run src/examples/java/io/synadia/flink/examples/SourceToSinkJsExample.java while having nats subscribe example-sink in another tab, and see no messages emitted.

@skyrocknroll
Copy link

@somratdnutanix

@mlornac
Copy link

mlornac commented Dec 9, 2024

I am facing the same problem and can read successfully from a NATS source but not from a Jetstream source.

I can see that the source is closed and wonders if it is treated as bounded.
This override might need to return the unbounded value like the nats source (or be removed)

return null; // TODO this varies from NatsSource, understand why

@scottf
Copy link
Collaborator

scottf commented Dec 9, 2024

@DylanNWatt @skyrocknroll This jetstream code is beta for a reason. It was provided by a 3rd party and seems to work on their very specific use case. I'm personally not satisfied with it's behavior, hence the beta tag and the v0 packaging. I'm in the process of rewriting the connector to make it consumer centric instead of subject centric.

@souravagrawal
Copy link
Contributor

I sincerely apologize for any inconvenience or challenges this issue may have caused. Rest assured, I am actively working on a fix and will submit a PR shortly to address it.
@DylanNWatt @mlornac, thank you for the detailed explanation of the issue—it will greatly assist in reproducing and resolving it effectively.

@souravagrawal
Copy link
Contributor

Hello @scottf can you please review the change once ?
@DylanNWatt @mlornac would it be possible for you to try the fixes made #15 while @scottf review the changes ?

@scottf
Copy link
Collaborator

scottf commented Dec 17, 2024

@scottf @DylanNWatt @mlornac @souravagrawal @skyrocknroll @somratdutta
PR 15 has been merged. Also PR 16 has also been merged. I noticed that the JetStream source classes were packaged wrong, that's on me. Anyway, the 2.0.0-beta2 will be available on Sonatype 2 (see readme) within the hour.

@mlornac
Copy link

mlornac commented Jan 2, 2025

would it be possible for you to try the fixes made #15 while @scottf review the changes ?

I tried the fix and confirm that I can read from a JS source now.
Thanks! 🙌

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants