Skip to content

Commit c6a7354

Browse files
ihtbzablocki
andauthored
SolaceIO write connector (#32060)
* This is a follow-up PR to #31953, and part of the issue #31905. This PR adds the actual writer functionality, and some additional testing, including integration testing. This should be final PR for the SolaceIO write connector to be complete. * Use static imports for Preconditions * Remove unused method * Logging has builtin formatting support * Use TypeDescriptors to check the type used as input * Fix parameter name * Use interface + utils class for MessageProducer * Use null instead of optional * Avoid using ByteString just to create an empty byte array. * Fix documentation, we are not using ByteString now. * Not needed anymore, we are not using ByteString * Defer transforming latency from nanos to millis. The transform into millis is done at the presentation moment, when the metric is reported to Beam. * Avoid using top level classes with a single inner class. A couple of DoFns are moved to their own files too, as the abstract class forthe UnboundedSolaceWriter was in practice a "package". This commits addresses a few comments about the structure of UnboundedSolaceWriter and some base classes of that abstract class. * Remove using a state variable, there is already a timer. This DoFn is a stateful DoFn to force a shuffling with a given input key set cardinality. * Properties must always be set. The warnings are only shown if the user decided to set the properties that are overriden by the connector. This was changed in one of the previous commits but it is actually a bug. I am reverting that change and changing this to a switch block, to make it more clear that the properties need to be set always by the connector. * Add a new custom mode so no JCSMP property is overridden. This lets the user to fully control all the properties used by the connector, instead of making sensible choices on its behalf. This also adds some logging to be more explicit about what the connector is doing. This does not add too much logging pressure, this only adds logging at the producer creation moment. * Add some more documentation about the new custom submission mode. * Fix bug introduced with the refactoring of code for this PR. I forgot to pass the submission mode when the write session is created, and I called the wrong method in the base class because it was defined as public. This makes sure that the submission mode is passed to the session when the session is created for writing messages. * Remove unnecessary Serializable annotation. * Make the PublishResult class for handling callbacks non-static to handle pipelines with multiple write transforms. * Rename maxNumOfUsedWorkers to numShards * Use RoundRobin assignment of producers to process bundles. * Output results in a GlobalWindow * Add ErrorHandler * Fix docs * Remove PublishResultHandler class that was just a wrapper around a Queue * small refactors * Revert CsvIO docs fix * Add withErrorHandler docs * fix var scope --------- Co-authored-by: Bartosz Zablocki <bzablocki@google.com>
1 parent ff5feed commit c6a7354

30 files changed

+2508
-392
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
7171
* BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
7272
* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
73+
* Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).
7374

7475
## New Features / Improvements
7576

sdks/java/io/solace/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ dependencies {
5353
testImplementation library.java.junit
5454
testImplementation project(path: ":sdks:java:io:common")
5555
testImplementation project(path: ":sdks:java:testing:test-utils")
56+
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
5657
testRuntimeOnly library.java.slf4j_jdk14
5758
testImplementation library.java.testcontainers_solace
5859
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java

Lines changed: 176 additions & 25 deletions
Large diffs are not rendered by default.

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java

Lines changed: 115 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2121

22+
import com.google.auto.value.AutoValue;
2223
import com.solacesystems.jcsmp.ConsumerFlowProperties;
2324
import com.solacesystems.jcsmp.EndpointProperties;
2425
import com.solacesystems.jcsmp.FlowReceiver;
@@ -28,9 +29,15 @@
2829
import com.solacesystems.jcsmp.JCSMPProperties;
2930
import com.solacesystems.jcsmp.JCSMPSession;
3031
import com.solacesystems.jcsmp.Queue;
32+
import com.solacesystems.jcsmp.XMLMessageProducer;
3133
import java.io.IOException;
34+
import java.util.Objects;
35+
import java.util.concurrent.Callable;
36+
import java.util.concurrent.ConcurrentLinkedQueue;
3237
import javax.annotation.Nullable;
3338
import org.apache.beam.sdk.io.solace.RetryCallableManager;
39+
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
40+
import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
3441
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
3542

3643
/**
@@ -39,34 +46,50 @@
3946
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue. The
4047
* connection is established using basic authentication.
4148
*/
42-
public class BasicAuthJcsmpSessionService extends SessionService {
43-
private final String queueName;
44-
private final String host;
45-
private final String username;
46-
private final String password;
47-
private final String vpnName;
48-
@Nullable private JCSMPSession jcsmpSession;
49-
@Nullable private MessageReceiver messageReceiver;
50-
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
49+
@AutoValue
50+
public abstract class BasicAuthJcsmpSessionService extends SessionService {
51+
52+
/** The name of the queue to receive messages from. */
53+
public abstract @Nullable String queueName();
54+
55+
/** The host name or IP address of the Solace broker. Format: Host[:Port] */
56+
public abstract String host();
57+
58+
/** The username to use for authentication. */
59+
public abstract String username();
60+
61+
/** The password to use for authentication. */
62+
public abstract String password();
63+
64+
/** The name of the VPN to connect to. */
65+
public abstract String vpnName();
66+
67+
public static Builder builder() {
68+
return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME);
69+
}
70+
71+
@AutoValue.Builder
72+
public abstract static class Builder {
73+
public abstract Builder queueName(@Nullable String queueName);
74+
75+
public abstract Builder host(String host);
5176

52-
/**
53-
* Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters.
54-
*
55-
* @param queueName The name of the queue to receive messages from.
56-
* @param host The host name or IP address of the Solace broker. Format: Host[:Port]
57-
* @param username The username to use for authentication.
58-
* @param password The password to use for authentication.
59-
* @param vpnName The name of the VPN to connect to.
60-
*/
61-
public BasicAuthJcsmpSessionService(
62-
String queueName, String host, String username, String password, String vpnName) {
63-
this.queueName = queueName;
64-
this.host = host;
65-
this.username = username;
66-
this.password = password;
67-
this.vpnName = vpnName;
77+
public abstract Builder username(String username);
78+
79+
public abstract Builder password(String password);
80+
81+
public abstract Builder vpnName(String vpnName);
82+
83+
public abstract BasicAuthJcsmpSessionService build();
6884
}
6985

86+
@Nullable private transient JCSMPSession jcsmpSession;
87+
@Nullable private transient MessageReceiver messageReceiver;
88+
@Nullable private transient MessageProducer messageProducer;
89+
private final java.util.Queue<PublishResult> publishedResultsQueue =
90+
new ConcurrentLinkedQueue<>();
91+
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
92+
7093
@Override
7194
public void connect() {
7295
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
@@ -79,6 +102,9 @@ public void close() {
79102
if (messageReceiver != null) {
80103
messageReceiver.close();
81104
}
105+
if (messageProducer != null) {
106+
messageProducer.close();
107+
}
82108
if (!isClosed()) {
83109
checkStateNotNull(jcsmpSession).closeSession();
84110
}
@@ -88,24 +114,64 @@ public void close() {
88114
}
89115

90116
@Override
91-
public MessageReceiver createReceiver() {
92-
this.messageReceiver =
93-
retryCallableManager.retryCallable(
94-
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
117+
public MessageReceiver getReceiver() {
118+
if (this.messageReceiver == null) {
119+
this.messageReceiver =
120+
retryCallableManager.retryCallable(
121+
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
122+
}
95123
return this.messageReceiver;
96124
}
97125

126+
@Override
127+
public MessageProducer getInitializedProducer(SubmissionMode submissionMode) {
128+
if (this.messageProducer == null || this.messageProducer.isClosed()) {
129+
Callable<MessageProducer> create = () -> createXMLMessageProducer(submissionMode);
130+
this.messageProducer =
131+
retryCallableManager.retryCallable(create, ImmutableSet.of(JCSMPException.class));
132+
}
133+
return checkStateNotNull(this.messageProducer);
134+
}
135+
136+
@Override
137+
public java.util.Queue<PublishResult> getPublishedResultsQueue() {
138+
return publishedResultsQueue;
139+
}
140+
98141
@Override
99142
public boolean isClosed() {
100143
return jcsmpSession == null || jcsmpSession.isClosed();
101144
}
102145

146+
private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
147+
throws JCSMPException, IOException {
148+
149+
if (isClosed()) {
150+
connectWriteSession(submissionMode);
151+
}
152+
153+
@SuppressWarnings("nullness")
154+
Callable<XMLMessageProducer> initProducer =
155+
() ->
156+
Objects.requireNonNull(jcsmpSession)
157+
.getMessageProducer(new PublishResultHandler(publishedResultsQueue));
158+
159+
XMLMessageProducer producer =
160+
retryCallableManager.retryCallable(initProducer, ImmutableSet.of(JCSMPException.class));
161+
if (producer == null) {
162+
throw new IOException("SolaceIO.Write: Could not create producer, producer object is null");
163+
}
164+
return new SolaceMessageProducer(producer);
165+
}
166+
103167
private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
104168
if (isClosed()) {
105169
connectSession();
106170
}
107171

108-
Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);
172+
Queue queue =
173+
JCSMPFactory.onlyInstance()
174+
.createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set."));
109175

110176
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
111177
flowProperties.setEndpoint(queue);
@@ -118,7 +184,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException
118184
createFlowReceiver(jcsmpSession, flowProperties, endpointProperties));
119185
}
120186
throw new IOException(
121-
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null.");
187+
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is"
188+
+ " null.");
122189
}
123190

124191
// The `@SuppressWarning` is needed here, because the checkerframework reports an error for the
@@ -141,20 +208,33 @@ private int connectSession() throws JCSMPException {
141208
return 0;
142209
}
143210

211+
private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
212+
if (jcsmpSession == null) {
213+
jcsmpSession = createWriteSessionObject(mode);
214+
}
215+
jcsmpSession.connect();
216+
return 0;
217+
}
218+
144219
private JCSMPSession createSessionObject() throws InvalidPropertiesException {
145220
JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
146221
return JCSMPFactory.onlyInstance().createSession(properties);
147222
}
148223

224+
private JCSMPSession createWriteSessionObject(SubmissionMode mode)
225+
throws InvalidPropertiesException {
226+
return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
227+
}
228+
149229
@Override
150230
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
151-
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName);
231+
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());
152232

153233
baseProps.setProperty(
154234
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
155-
baseProps.setProperty(JCSMPProperties.USERNAME, username);
156-
baseProps.setProperty(JCSMPProperties.PASSWORD, password);
157-
baseProps.setProperty(JCSMPProperties.HOST, host);
235+
baseProps.setProperty(JCSMPProperties.USERNAME, username());
236+
baseProps.setProperty(JCSMPProperties.PASSWORD, password());
237+
baseProps.setProperty(JCSMPProperties.HOST, host());
158238
return baseProps;
159239
}
160240
}

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.beam.sdk.io.solace.broker;
1919

2020
import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
21-
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2221

2322
import com.google.auto.value.AutoValue;
2423

@@ -31,12 +30,16 @@
3130
*/
3231
@AutoValue
3332
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
33+
/** The host name or IP address of the Solace broker. Format: Host[:Port] */
3434
public abstract String host();
3535

36+
/** The username to use for authentication. */
3637
public abstract String username();
3738

39+
/** The password to use for authentication. */
3840
public abstract String password();
3941

42+
/** The name of the VPN to connect to. */
4043
public abstract String vpnName();
4144

4245
public static Builder builder() {
@@ -54,6 +57,7 @@ public abstract static class Builder {
5457

5558
/** Set Solace username. */
5659
public abstract Builder username(String username);
60+
5761
/** Set Solace password. */
5862
public abstract Builder password(String password);
5963

@@ -65,11 +69,15 @@ public abstract static class Builder {
6569

6670
@Override
6771
public SessionService create() {
68-
return new BasicAuthJcsmpSessionService(
69-
checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
70-
host(),
71-
username(),
72-
password(),
73-
vpnName());
72+
BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder();
73+
if (queue != null) {
74+
builder = builder.queueName(queue.getName());
75+
}
76+
return builder
77+
.host(host())
78+
.username(username())
79+
.password(password())
80+
.vpnName(vpnName())
81+
.build();
7482
}
7583
}

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public abstract static class Builder {
117117

118118
@Override
119119
public SessionService create() {
120-
String password = null;
120+
String password;
121121
try {
122122
password = retrieveSecret();
123123
} catch (IOException e) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.solace.broker;
19+
20+
import com.solacesystems.jcsmp.DeliveryMode;
21+
import com.solacesystems.jcsmp.Destination;
22+
import java.util.List;
23+
import org.apache.beam.sdk.annotations.Internal;
24+
import org.apache.beam.sdk.io.solace.data.Solace;
25+
import org.apache.beam.sdk.transforms.SerializableFunction;
26+
27+
/**
28+
* Base class for publishing messages to a Solace broker.
29+
*
30+
* <p>Implementations of this interface are responsible for managing the connection to the broker
31+
* and for publishing messages to the broker.
32+
*/
33+
@Internal
34+
public interface MessageProducer {
35+
36+
/** Publishes a message to the broker. */
37+
void publishSingleMessage(
38+
Solace.Record msg,
39+
Destination topicOrQueue,
40+
boolean useCorrelationKeyLatency,
41+
DeliveryMode deliveryMode);
42+
43+
/**
44+
* Publishes a batch of messages to the broker.
45+
*
46+
* <p>The size of the batch cannot exceed 50 messages, this is a limitation of the Solace API.
47+
*
48+
* <p>It returns the number of messages written.
49+
*/
50+
int publishBatch(
51+
List<Solace.Record> records,
52+
boolean useCorrelationKeyLatency,
53+
SerializableFunction<Solace.Record, Destination> destinationFn,
54+
DeliveryMode deliveryMode);
55+
56+
/** Returns {@literal true} if the message producer is closed, {@literal false} otherwise. */
57+
boolean isClosed();
58+
59+
/** Closes the message producer. */
60+
void close();
61+
}

0 commit comments

Comments
 (0)