Skip to content

Commit 976e89e

Browse files
committed
Make the PublishResult class for handling callbacks non-static to handle pipelines with multiple write transforms.
1 parent e545ec9 commit 976e89e

File tree

10 files changed

+87
-28
lines changed

10 files changed

+87
-28
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import javax.annotation.Nullable;
3737
import org.apache.beam.sdk.io.solace.RetryCallableManager;
3838
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
39+
import org.apache.beam.sdk.io.solace.write.PublishResultsReceiver;
3940
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
4041

4142
/**
@@ -46,6 +47,7 @@
4647
*/
4748
@AutoValue
4849
public abstract class BasicAuthJcsmpSessionService extends SessionService {
50+
4951
/** The name of the queue to receive messages from. */
5052
public abstract @Nullable String queueName();
5153

@@ -83,6 +85,7 @@ public abstract static class Builder {
8385
@Nullable private transient JCSMPSession jcsmpSession;
8486
@Nullable private transient MessageReceiver messageReceiver;
8587
@Nullable private transient MessageProducer messageProducer;
88+
private final PublishResultsReceiver publishResultsReceiver = new PublishResultsReceiver();
8689
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
8790

8891
@Override
@@ -119,7 +122,7 @@ public MessageReceiver getReceiver() {
119122
}
120123

121124
@Override
122-
public MessageProducer getProducer(SubmissionMode submissionMode) {
125+
public MessageProducer getInitializeProducer(SubmissionMode submissionMode) {
123126
if (this.messageProducer == null || this.messageProducer.isClosed()) {
124127
Callable<MessageProducer> create = () -> createXMLMessageProducer(submissionMode);
125128
this.messageProducer =
@@ -128,20 +131,28 @@ public MessageProducer getProducer(SubmissionMode submissionMode) {
128131
return checkStateNotNull(this.messageProducer);
129132
}
130133

134+
@Override
135+
public PublishResultsReceiver getPublishResultsReceiver() {
136+
return publishResultsReceiver;
137+
}
138+
131139
@Override
132140
public boolean isClosed() {
133141
return jcsmpSession == null || jcsmpSession.isClosed();
134142
}
135143

136144
private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
137145
throws JCSMPException, IOException {
146+
138147
if (isClosed()) {
139148
connectWriteSession(submissionMode);
140149
}
141150

142151
@SuppressWarnings("nullness")
143152
Callable<XMLMessageProducer> initProducer =
144-
() -> Objects.requireNonNull(jcsmpSession).getMessageProducer(new PublishResultHandler());
153+
() ->
154+
Objects.requireNonNull(jcsmpSession)
155+
.getMessageProducer(new PublishResultHandler(publishResultsReceiver));
145156

146157
XMLMessageProducer producer =
147158
retryCallableManager.retryCallable(initProducer, ImmutableSet.of(JCSMPException.class));

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.apache.beam.sdk.io.solace.data.Solace;
2323
import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
2424
import org.apache.beam.sdk.io.solace.write.PublishResultsReceiver;
25+
import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter;
26+
import org.apache.beam.sdk.metrics.Counter;
27+
import org.apache.beam.sdk.metrics.Metrics;
2528
import org.checkerframework.checker.nullness.qual.Nullable;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
@@ -38,6 +41,13 @@
3841
public final class PublishResultHandler implements JCSMPStreamingPublishCorrelatingEventHandler {
3942

4043
private static final Logger LOG = LoggerFactory.getLogger(PublishResultHandler.class);
44+
private final PublishResultsReceiver publishResultsReceiver;
45+
private final Counter batchesRejectedByBroker =
46+
Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected");
47+
48+
public PublishResultHandler(PublishResultsReceiver publishResultsReceiver) {
49+
this.publishResultsReceiver = publishResultsReceiver;
50+
}
4151

4252
@Override
4353
public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
@@ -64,6 +74,7 @@ private void processKey(Object key, boolean isPublished, @Nullable JCSMPExceptio
6474

6575
resultBuilder = resultBuilder.setMessageId(messageId).setPublished(isPublished);
6676
if (!isPublished) {
77+
batchesRejectedByBroker.inc();
6778
if (cause != null) {
6879
resultBuilder = resultBuilder.setError(cause.getMessage());
6980
} else {
@@ -78,7 +89,7 @@ private void processKey(Object key, boolean isPublished, @Nullable JCSMPExceptio
7889
PublishResult publishResult = resultBuilder.build();
7990
// Static reference, it receives all callbacks from all publications
8091
// from all threads
81-
PublishResultsReceiver.addResult(publishResult);
92+
publishResultsReceiver.addResult(publishResult);
8293
}
8394

8495
private static long calculateLatency(Solace.CorrelationKey key) {

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.Serializable;
2222
import org.apache.beam.sdk.io.solace.SolaceIO;
2323
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
24+
import org.apache.beam.sdk.io.solace.write.PublishResultsReceiver;
2425
import org.checkerframework.checker.nullness.qual.Nullable;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
@@ -137,7 +138,15 @@ public abstract class SessionService implements Serializable {
137138
* this method is used, the producer is created from the session instance, otherwise it returns
138139
* the producer created initially.
139140
*/
140-
public abstract MessageProducer getProducer(SubmissionMode mode);
141+
public abstract MessageProducer getInitializeProducer(SubmissionMode mode);
142+
143+
/**
144+
* Returns the {@link PublishResultsReceiver} instance associated with this session.
145+
*
146+
* <p>The {@link PublishResultsReceiver} handles asynchronous callbacks from Solace, providing
147+
* results for message publications.
148+
*/
149+
public abstract PublishResultsReceiver getPublishResultsReceiver();
141150

142151
/**
143152
* Override this method and provide your specific properties, including all those related to

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,13 @@
3030
*/
3131
@Internal
3232
public final class PublishResultsReceiver {
33-
private static final ConcurrentLinkedQueue<PublishResult> resultsQueue =
34-
new ConcurrentLinkedQueue<>();
33+
private final ConcurrentLinkedQueue<PublishResult> resultsQueue = new ConcurrentLinkedQueue<>();
3534

36-
public static @Nullable PublishResult pollResults() {
35+
public @Nullable PublishResult pollResults() {
3736
return resultsQueue.poll();
3837
}
3938

40-
public static boolean addResult(PublishResult result) {
39+
public boolean addResult(PublishResult result) {
4140
return resultsQueue.add(result);
4241
}
4342
}

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2222

2323
import com.google.auto.value.AutoValue;
24+
import java.util.UUID;
2425
import java.util.concurrent.ConcurrentHashMap;
2526
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
2627
import org.apache.beam.sdk.io.solace.broker.SessionService;
@@ -39,15 +40,17 @@
3940
* than one, each {@link SessionServiceFactory} instance keeps their own pool of producers.
4041
*/
4142
final class SolaceWriteSessionsHandler {
43+
4244
private static final ConcurrentHashMap<SessionConfigurationIndex, SessionService> sessionsMap =
4345
new ConcurrentHashMap<>(DEFAULT_WRITER_CLIENTS_PER_WORKER);
4446

45-
public static SessionService getSessionService(
46-
int producerIndex, SessionServiceFactory sessionServiceFactory) {
47+
public static SessionService getSessionServiceWithProducer(
48+
int producerIndex, SessionServiceFactory sessionServiceFactory, UUID writerTransformUuid) {
4749
SessionConfigurationIndex key =
4850
SessionConfigurationIndex.builder()
4951
.producerIndex(producerIndex)
5052
.sessionServiceFactory(sessionServiceFactory)
53+
.writerTransformUuid(writerTransformUuid)
5154
.build();
5255
return sessionsMap.computeIfAbsent(
5356
key, SolaceWriteSessionsHandler::createSessionAndStartProducer);
@@ -61,17 +64,19 @@ private static SessionService createSessionAndStartProducer(SessionConfiguration
6164
checkStateNotNull(
6265
mode,
6366
"SolaceIO.Write: Submission mode is not set. You need to set it to create write sessions.");
64-
sessionService.getProducer(mode);
67+
sessionService.getInitializeProducer(mode);
6568
return sessionService;
6669
}
6770

6871
/** Disconnect all the sessions from Solace, and clear the corresponding state. */
69-
public static void disconnectFromSolace(SessionServiceFactory factory, int producersCardinality) {
72+
public static void disconnectFromSolace(
73+
SessionServiceFactory factory, int producersCardinality, UUID writerTransformUuid) {
7074
for (int i = 0; i < producersCardinality; i++) {
7175
SessionConfigurationIndex key =
7276
SessionConfigurationIndex.builder()
7377
.producerIndex(i)
7478
.sessionServiceFactory(factory)
79+
.writerTransformUuid(writerTransformUuid)
7580
.build();
7681

7782
SessionService sessionService = sessionsMap.remove(key);
@@ -87,6 +92,8 @@ abstract static class SessionConfigurationIndex {
8792

8893
abstract SessionServiceFactory sessionServiceFactory();
8994

95+
abstract UUID writerTransformUuid();
96+
9097
static Builder builder() {
9198
return new AutoValue_SolaceWriteSessionsHandler_SessionConfigurationIndex.Builder();
9299
}
@@ -97,6 +104,8 @@ abstract static class Builder {
97104

98105
abstract Builder sessionServiceFactory(SessionServiceFactory sessionServiceFactory);
99106

107+
abstract Builder writerTransformUuid(UUID writerTransformUuid);
108+
100109
abstract SessionConfigurationIndex build();
101110
}
102111
}

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public final class UnboundedBatchedSolaceWriter extends UnboundedSolaceWriter {
6969
Metrics.counter(UnboundedBatchedSolaceWriter.class, "msgs_sent_to_broker");
7070

7171
private final Counter batchesRejectedByBroker =
72-
Metrics.counter(UnboundedBatchedSolaceWriter.class, "batches_rejected");
72+
Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected");
7373

7474
// State variables are never explicitly "used"
7575
@SuppressWarnings("UnusedVariable")
@@ -142,8 +142,8 @@ public void flushBundle(OnTimerContext context) {
142142
private void publishBatch(List<Solace.Record> records) {
143143
try {
144144
int entriesPublished =
145-
solaceSessionService()
146-
.getProducer(getSubmissionMode())
145+
solaceSessionServiceWithProducer()
146+
.getInitializeProducer(getSubmissionMode())
147147
.publishBatch(
148148
records, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode());
149149
sentToBroker.inc(entriesPublished);
@@ -159,7 +159,7 @@ private void publishBatch(List<Solace.Record> records) {
159159
e.getMessage()))
160160
.setLatencyNanos(System.nanoTime())
161161
.build();
162-
PublishResultsReceiver.addResult(errorPublish);
162+
solaceSessionServiceWithProducer().getPublishResultsReceiver().addResult(errorPublish);
163163
}
164164
}
165165
}

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
2828
import java.util.ArrayList;
2929
import java.util.List;
30+
import java.util.UUID;
3031
import java.util.concurrent.TimeUnit;
3132
import org.apache.beam.sdk.annotations.Internal;
3233
import org.apache.beam.sdk.io.solace.SolaceIO;
@@ -80,6 +81,8 @@ public abstract class UnboundedSolaceWriter
8081
private @Nullable Instant bundleTimestamp;
8182
private @Nullable BoundedWindow bundleWindow;
8283

84+
final UUID writerTransformUuid = UUID.randomUUID();
85+
8386
public UnboundedSolaceWriter(
8487
SerializableFunction<Solace.Record, Destination> destinationFn,
8588
SessionServiceFactory sessionServiceFactory,
@@ -101,7 +104,8 @@ public UnboundedSolaceWriter(
101104

102105
@Teardown
103106
public void teardown() {
104-
SolaceWriteSessionsHandler.disconnectFromSolace(sessionServiceFactory, producersMapCardinality);
107+
SolaceWriteSessionsHandler.disconnectFromSolace(
108+
sessionServiceFactory, producersMapCardinality, writerTransformUuid);
105109
}
106110

107111
public void updateProducerIndex() {
@@ -115,9 +119,9 @@ public void startBundle() {
115119
batchToEmit.clear();
116120
}
117121

118-
public SessionService solaceSessionService() {
119-
return SolaceWriteSessionsHandler.getSessionService(
120-
currentBundleProducerIndex, sessionServiceFactory);
122+
public SessionService solaceSessionServiceWithProducer() {
123+
return SolaceWriteSessionsHandler.getSessionServiceWithProducer(
124+
currentBundleProducerIndex, sessionServiceFactory, writerTransformUuid);
121125
}
122126

123127
public void publishResults(BeamContextWrapper context) {
@@ -131,7 +135,9 @@ public void publishResults(BeamContextWrapper context) {
131135
long minFailed = Long.MAX_VALUE;
132136
long maxFailed = 0;
133137

134-
Solace.PublishResult result = PublishResultsReceiver.pollResults();
138+
PublishResultsReceiver publishResultsReceiver =
139+
solaceSessionServiceWithProducer().getPublishResultsReceiver();
140+
Solace.PublishResult result = publishResultsReceiver.pollResults();
135141

136142
if (result != null) {
137143
if (getCurrentBundleTimestamp() == null) {
@@ -174,7 +180,7 @@ public void publishResults(BeamContextWrapper context) {
174180
FAILED_PUBLISH_TAG, result, getCurrentBundleTimestamp(), getCurrentBundleWindow());
175181
}
176182

177-
result = PublishResultsReceiver.pollResults();
183+
result = publishResultsReceiver.pollResults();
178184
}
179185

180186
if (shouldPublishLatencyMetrics()) {

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ public void processElement(
110110

111111
// The publish method will retry, let's send a failure message if all the retries fail
112112
try {
113-
solaceSessionService()
114-
.getProducer(getSubmissionMode())
113+
solaceSessionServiceWithProducer()
114+
.getInitializeProducer(getSubmissionMode())
115115
.publishSingleMessage(
116116
record,
117117
getDestinationFn().apply(record),
@@ -130,7 +130,7 @@ public void processElement(
130130
e.getMessage()))
131131
.setLatencyNanos(System.nanoTime())
132132
.build();
133-
PublishResultsReceiver.addResult(errorPublish);
133+
solaceSessionServiceWithProducer().getPublishResultsReceiver().addResult(errorPublish);
134134
}
135135
}
136136

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.io.solace.broker.MessageProducer;
2424
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
2525
import org.apache.beam.sdk.io.solace.broker.SessionService;
26+
import org.apache.beam.sdk.io.solace.write.PublishResultsReceiver;
2627

2728
@AutoValue
2829
public abstract class MockEmptySessionService extends SessionService {
@@ -49,7 +50,12 @@ public MessageReceiver getReceiver() {
4950
}
5051

5152
@Override
52-
public MessageProducer getProducer(SubmissionMode mode) {
53+
public MessageProducer getInitializeProducer(SubmissionMode mode) {
54+
throw new UnsupportedOperationException(exceptionMessage);
55+
}
56+
57+
@Override
58+
public PublishResultsReceiver getPublishResultsReceiver() {
5359
throw new UnsupportedOperationException(exceptionMessage);
5460
}
5561

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.io.solace.broker.PublishResultHandler;
3232
import org.apache.beam.sdk.io.solace.broker.SessionService;
3333
import org.apache.beam.sdk.io.solace.data.Solace;
34+
import org.apache.beam.sdk.io.solace.write.PublishResultsReceiver;
3435
import org.apache.beam.sdk.transforms.SerializableFunction;
3536
import org.checkerframework.checker.nullness.qual.Nullable;
3637

@@ -45,6 +46,8 @@ public abstract class MockSessionService extends SessionService {
4546

4647
public abstract @Nullable SubmissionMode mode();
4748

49+
private final PublishResultsReceiver publishResultsReceiver = new PublishResultsReceiver();
50+
4851
public static Builder builder() {
4952
return new AutoValue_MockSessionService.Builder().minMessagesReceived(0);
5053
}
@@ -81,13 +84,18 @@ public MessageReceiver getReceiver() {
8184
}
8285

8386
@Override
84-
public MessageProducer getProducer(SubmissionMode mode) {
87+
public MessageProducer getInitializeProducer(SubmissionMode mode) {
8588
if (messageProducer == null) {
86-
messageProducer = new MockProducer(new PublishResultHandler());
89+
messageProducer = new MockProducer(new PublishResultHandler(publishResultsReceiver));
8790
}
8891
return messageProducer;
8992
}
9093

94+
@Override
95+
public PublishResultsReceiver getPublishResultsReceiver() {
96+
return publishResultsReceiver;
97+
}
98+
9199
@Override
92100
public void connect() {}
93101

0 commit comments

Comments
 (0)