[Feature Request]: Support Reading from Solace message broker #31440

Closed
1 of 16 tasks
bzablocki opened this issue May 29, 2024 · 23 comments

Comments

@bzablocki
Contributor

What would you like to happen?

I'd like to add a native Java connector to read messages from Solace message broker. See the design doc for details: https://docs.google.com/document/d/1Gvq67VrcHCnlO8f_NzMM1Y4c7wCNSdvo6qqLWg8upfw/

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@bzablocki
Contributor Author

.take-issue

@liferoad
Collaborator

@bzablocki can you also send the design doc to the Beam dev list

@kennknowles FYI

@bzablocki
Contributor Author

Good point, email sent.

@bzablocki
Contributor Author

project init: #31566

@ppawel

ppawel commented Jul 1, 2024

@bzablocki Very cool to see Solace-related development in Beam! We have been using Solace and Beam together for ~3 years now and we've been on a journey... the connector made by Solace (https://github.com/SolaceProducts/solace-apache-beam) is not maintained anymore, so we switched to JmsIO, but that has its own issues, so we have to patch JmsIO for our use cases. If you have time, you could take a look at my comments here; they might be useful for a broader view of the use cases:

#30218 (comment)

In general, the pain points with JmsIO are:

  1. How it manages JMS resources - since Beam 2.55, it closes the JMS session every time it acknowledges a message (which means every ~10 messages based on what I see in local and GCP runners).

  2. How it manages the watermark - I had to patch it so it behaves more like PubsubIO. The main problem is that if no messages are coming, it never advances the watermark by itself, so windowing behaves very strangely in streaming pipelines (i.e. windows don't close).

  3. The integration tests I have seen there assume that the JMS client does not use buffering (a.k.a. prefetch, or "window size" in Solace terms) and I think that's fundamentally wrong. The Solace client does not even allow switching off buffering, and you can see in my comment linked above how that turns out (deadlock since Beam 2.55).

Anyway, those are just some random thoughts, hopefully useful :) For now I am maintaining my custom patches and hope to one day upstream them (no time until now). Hopefully when SolaceIO is ready, we can switch to it and all our problems will be magically solved ;) #nopressure

@bzablocki
Contributor Author

Hi @ppawel!
Thanks for your feedback. As you noticed, some of the Solace-related PRs are already merged, and I expect the connector to be officially available in Beam 2.58, in a few (~5?) weeks' time.

Responding to your pain points:

  1. This will be resolved: sessions are long-lasting and are closed only when the runner decides to close the reader, which happens, for example, when autoscaling kicks in.
  2. The watermark will advance even if no new messages are received. With the current implementation it will be set to now minus 30 seconds. If needed, we can add a configuration parameter to change this behavior. More details are in the design doc.
  3. This should be handled well, I think. Is your setup similar to the integration test I created?
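
To illustrate point 2, here is a minimal sketch of a watermark that keeps advancing while the source is idle. It is not SolaceIO's actual implementation: the class `IdleAwareWatermark` and its methods are hypothetical names, and the 30-second lag is taken from the comment above. Using the latest event time as the watermark is a simplification for the sake of the example.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not a Beam or SolaceIO class): a monotonic watermark that also advances when idle. */
final class IdleAwareWatermark {
    private final Duration idleLag;            // assumed to be 30 seconds, per the comment above
    private Instant watermark = Instant.EPOCH; // simplification: start at the epoch

    IdleAwareWatermark(Duration idleLag) {
        this.idleLag = idleLag;
    }

    /** Called for each received message; the watermark never moves backwards. */
    void onMessage(Instant eventTime) {
        if (eventTime.isAfter(watermark)) {
            watermark = eventTime;
        }
    }

    /** Called when a poll returned nothing; advances the watermark to (now - idleLag) if that is later. */
    void onIdle(Instant now) {
        Instant floor = now.minus(idleLag);
        if (floor.isAfter(watermark)) {
            watermark = floor;
        }
    }

    Instant current() {
        return watermark;
    }
}
```

With a lag of 30 seconds, a reader that sees no messages for 100 seconds would report a watermark 70 seconds past the last message, so windows ending before that point can close.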

I'll post a message here when it gets published and I'd love to hear your feedback when you test it. Let's stay in touch.

Cheers,
Bartosz

@ppawel

ppawel commented Jul 13, 2024

Hey @bzablocki
Apologies for a longer silence, it's been a bit crazy in the last weeks... and in fact partly because of the issues we are having with the JmsIO connector... there are now new problems appearing as we get more traffic on the queues, like issues with not acking, retransmissions, duplicate messages.. none of this is correctly handled by JmsIO :/

So I am now looking forward to trying out SolaceIO ASAP. I think I will check what is cooking on the master branch first. Is the connector planned to be included in the next Beam release? That would be really great...

I will come back with any feedback I might have.

Pawel

@tvalentyn
Contributor

Should we close this? Would anyone be interested in trying this IO on the ongoing release candidate and voting on the Beam release thread: https://lists.apache.org/thread/1wstfv1psvm5lxkwqrd26dfhgn9ytmqc ?

@bzablocki
Contributor Author

Hi @ppawel, the connector will be released with Beam 2.58 in the coming weeks, but the version on master is ready to use. Feel free to try the connector out and share any feedback you have. I also encourage you to vote in the release thread @tvalentyn mentioned if the connector works as expected.

@tvalentyn let me merge one small PR that I just added and I'll be able to close this issue. Thanks!

@ppawel

ppawel commented Jul 24, 2024

OK, thanks for the update. I plan to start testing it in our project today.

Just a quick question regarding the write/publish support - I see the work has just started on that part, so it probably won't be released with 2.58?

@bzablocki
Contributor Author

> Just a quick question regarding the write/publish support - I see the work has just started on that part, so it probably won't be released with 2.58?

Correct. We have #31905 to track progress on the write connector, and we expect it to be released with 2.59 in a few weeks' time :)

@ppawel

ppawel commented Jul 24, 2024

I just plugged in SolaceIO into our project, and overall looks good so far, but I ran into an issue when trying to run all our tests:

com.solacesystems.jcsmp.JCSMPErrorResponseException: 503: Too Many Connections For VPN

The smfclient id reached over 500 - it seems that the pipelines in our tests are not closing the Solace connections properly when using SolaceIO (with JmsIO this works fine). We are using the following to stop the streaming pipeline in test teardown:

// result is PipelineResult, as returned from the pipeline's run() method
result.cancel();
result.waitUntilFinish();

Is there anything additional to be done with SolaceIO? I will look deeper in our code, maybe test code is leaking connections somewhere but otoh, with JmsIO it is working fine.

@ppawel

ppawel commented Jul 24, 2024

I tried capping max connections with SolaceIO to 5 but it still runs into the limit on the broker, just a bit slower...

Had a quick look at JmsIO and one thing jumped out - it implements the finalize method in the reader, where it closes the resources, see:

https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L838

Perhaps that helps with keeping the connections from blowing up as I imagine it would close them on garbage collection, although I'm not sure in which circumstances the normal close method from the reader wouldn't be called...
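
The "close on garbage collection" safety net described above can be sketched without `finalize`, which has been deprecated since Java 9. The example below swaps in `java.lang.ref.Cleaner` instead; it is not what JmsIO or SolaceIO does. `GuardedConnection` and the open-connection counter are hypothetical stand-ins for a real broker connection.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical connection wrapper: released on close(), or by the Cleaner if it becomes unreachable. */
final class GuardedConnection implements AutoCloseable {
    private static final Cleaner CLEANER = Cleaner.create();

    /** Cleanup action; it must not reference the GuardedConnection, or the object could never be collected. */
    private static final class Release implements Runnable {
        private final AtomicInteger openConnections;

        Release(AtomicInteger openConnections) {
            this.openConnections = openConnections;
        }

        @Override
        public void run() {
            // A real implementation would close the broker connection here.
            openConnections.decrementAndGet();
        }
    }

    private final Cleaner.Cleanable cleanable;

    GuardedConnection(AtomicInteger openConnections) {
        openConnections.incrementAndGet();
        this.cleanable = CLEANER.register(this, new Release(openConnections));
    }

    @Override
    public void close() {
        // clean() runs the action at most once, even if close() is called repeatedly.
        cleanable.clean();
    }
}
```

A safety net like this only limits the damage from a missed `close()`; garbage collection timing is not predictable, so an explicit close remains the primary mechanism.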

@bzablocki
Contributor Author

> I tried capping max connections with SolaceIO to 5 but it still runs into the limit on the broker, just a bit slower...

I assume you set the SolaceIO.read().withMaxNumConnections(...) to 5?

@ppawel

ppawel commented Jul 24, 2024

> > I tried capping max connections with SolaceIO to 5 but it still runs into the limit on the broker, just a bit slower...
>
> I assume you set the SolaceIO.read().withMaxNumConnections(...) to 5?

Yes, exactly.

@bzablocki
Contributor Author

Which runner are you using?

@ppawel

ppawel commented Jul 24, 2024

DirectRunner + Solace in a Docker container.

@bzablocki
Contributor Author

Thanks, I think I see the issue. It looks like the DirectRunner is mishandling connections by frequently closing and recreating them, causing an increase in the connection count. This is unexpected and I will investigate it as soon as possible.

In contrast, DataflowRunner maintains long-lasting connections and the number of connections is stable.

PS. would you mind sharing how you check the number of connections in Solace?

@ppawel

ppawel commented Jul 24, 2024

> PS. would you mind sharing how you check the number of connections in Solace?

Normally I just look at the logs from com.solacesystems.jcsmp.protocol.impl.TcpClientChannel in the test output; this class logs when a connection is opened or closed ("Client-13: Connecting to host" etc.)

In this case I also checked the Solace UI, the VPN clients dashboard view:

[image: Solace UI, VPN clients dashboard view]

@bzablocki
Contributor Author

Thanks. I think I found it. The DirectRunner will sometimes close the session, see the source. This means that you will see an increasing number of clients in the logs, because every once in a while a client is closed and a new one is created in its place.
Now, the number of active clients was indeed slowly growing and that was a real issue. It meant clients were not properly closed. I managed to find the root cause - I filed #31965 and will work on it tomorrow.

@ppawel

ppawel commented Jul 24, 2024

Ah, nice! I was in fact looking at the closing code in SolaceIO and was checking JCSMP session and noticed that flow receiver instances aren't closed but for some reason I assumed they are managed by the Solace session... which is not really true I guess, even in our test code we have some consumers started and stopped/closed and we indeed call com.solacesystems.jcsmp.Consumer#close explicitly.

Not sure why I have not caught and reported that but in the end you found it so looking forward to testing an updated version :)
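
The leak pattern discussed above (a session is closed but its flow receiver is not) can be illustrated with a small sketch. This is not the fix from #31965 and uses no JCSMP types: `TrackedResource` and `SketchReader` are hypothetical stand-ins that only show the idea of closing the receiver explicitly before the session.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a broker resource (a receiver or a session); not a JCSMP type. */
final class TrackedResource implements AutoCloseable {
    private final String name;
    private final List<String> closeLog;
    private boolean closed;

    TrackedResource(String name, List<String> closeLog) {
        this.name = name;
        this.closeLog = closeLog;
    }

    @Override
    public void close() {
        // Idempotent: record the close only once.
        if (!closed) {
            closed = true;
            closeLog.add(name);
        }
    }
}

/** Hypothetical reader that owns both a session and a flow receiver. */
final class SketchReader implements AutoCloseable {
    private final TrackedResource session;
    private final TrackedResource receiver;

    SketchReader(List<String> closeLog) {
        this.session = new TrackedResource("session", closeLog);
        this.receiver = new TrackedResource("receiver", closeLog);
    }

    @Override
    public void close() {
        // Close the receiver explicitly first instead of assuming the session manages it,
        // and still close the session if closing the receiver throws.
        try {
            receiver.close();
        } finally {
            session.close();
        }
    }
}
```

Recording the close order in a list makes the behavior easy to assert in a unit test, which is one way to catch this kind of leak before it shows up as a growing client count on the broker.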

@bzablocki
Contributor Author

#31965 is merged.
I will close this issue now, as the read connector will be released in 2.58.
If there are any new problems regarding this connector, please open a new issue and tag me.

@github-actions github-actions bot added this to the 2.59.0 Release milestone Jul 31, 2024
4 participants