diff --git a/CHANGES.md b/CHANGES.md index 261fafc024f3..8ee50f1112bc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,6 +70,7 @@ * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527)) * [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879)) +* Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)). ## New Features / Improvements diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 741db51a5772..ef0d49891f08 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -53,6 +53,7 @@ dependencies { testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testRuntimeOnly library.java.slf4j_jdk14 testImplementation library.java.testcontainers_solace testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index dcfdcc4fabb9..a55d8a0a4217 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.solace; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -38,16 +39,29 @@ import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper; import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; +import org.apache.beam.sdk.io.solace.write.AddShardKeyDoFn; import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import org.apache.beam.sdk.io.solace.write.UnboundedBatchedSolaceWriter; +import org.apache.beam.sdk.io.solace.write.UnboundedStreamingSolaceWriter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; @@ -147,7 +161,7 @@ * function. * *
{@code
- * @DefaultSchema(JavaBeanSchema.class)
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
* public static class SimpleRecord {
* public String payload;
* public String messageId;
@@ -238,7 +252,7 @@
* default VPN name by setting the required JCSMP property in the session factory (in this case,
* with {@link BasicAuthJcsmpSessionServiceFactory#vpnName()}), the number of clients per worker
* with {@link Write#withNumberOfClientsPerWorker(int)} and the number of parallel write clients
- * using {@link Write#withMaxNumOfUsedWorkers(int)}.
+ * using {@link Write#withNumShards(int)}.
*
* Writing to dynamic destinations
*
@@ -345,13 +359,17 @@
*
* The streaming connector publishes each message individually, without holding up or batching
* before the message is sent to Solace. This will ensure the lowest possible latency, but it will
- * offer a much lower throughput. The streaming connector does not use state & timers.
+ * offer a much lower throughput. The streaming connector does not use state and timers.
*
- *
Both connectors uses state & timers to control the level of parallelism. If you are using
+ *
Both connectors uses state and timers to control the level of parallelism. If you are using
* Cloud Dataflow, it is recommended that you enable Streaming Engine to use this
* connector.
*
+ *
For full control over all the properties, use {@link SubmissionMode#CUSTOM}. The connector
+ * will not override any property that you set, and you will have full control over all the JCSMP
+ * properties.
+ *
*
Authentication
*
* When writing to Solace, the user must use {@link
@@ -396,7 +414,7 @@ public class SolaceIO {
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD =
Duration.standardSeconds(30);
- public static final int DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS = 20;
+ public static final int DEFAULT_WRITER_NUM_SHARDS = 20;
public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4;
public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false;
public static final SubmissionMode DEFAULT_WRITER_SUBMISSION_MODE =
@@ -445,6 +463,7 @@ public static Read read() {
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
}
+
/**
* Create a {@link Read} transform, to read from Solace. Specify a {@link SerializableFunction} to
* map incoming {@link BytesXMLMessage} records, to the object of your choice. You also need to
@@ -805,7 +824,9 @@ private Queue initializeQueueForTopicIfNeeded(
public enum SubmissionMode {
HIGHER_THROUGHPUT,
- LOWER_LATENCY
+ LOWER_LATENCY,
+ CUSTOM, // Don't override any property set by the user
+ TESTING // Send acks 1 by 1, this will be very slow, never use this in an actual pipeline!
}
public enum WriterType {
@@ -816,8 +837,9 @@ public enum WriterType {
@AutoValue
public abstract static class Write extends PTransform, SolaceOutput> {
- public static final TupleTag FAILED_PUBLISH_TAG =
- new TupleTag() {};
+ private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+ public static final TupleTag FAILED_PUBLISH_TAG = new TupleTag() {};
public static final TupleTag SUCCESSFUL_PUBLISH_TAG =
new TupleTag() {};
@@ -863,8 +885,8 @@ public Write to(Solace.Queue queue) {
* cluster, and the need for performance when writing to Solace (more workers will achieve
* higher throughput).
*/
- public Write withMaxNumOfUsedWorkers(int maxNumOfUsedWorkers) {
- return toBuilder().setMaxNumOfUsedWorkers(maxNumOfUsedWorkers).build();
+ public Write withNumShards(int numShards) {
+ return toBuilder().setNumShards(numShards).build();
}
/**
@@ -877,8 +899,8 @@ public Write withMaxNumOfUsedWorkers(int maxNumOfUsedWorkers) {
* the number of clients created per VM. The clients will be re-used across different threads in
* the same worker.
*
- * Set this number in combination with {@link #withMaxNumOfUsedWorkers}, to ensure that the
- * limit for number of clients in your Solace cluster is not exceeded.
+ *
Set this number in combination with {@link #withNumShards}, to ensure that the limit for
+ * number of clients in your Solace cluster is not exceeded.
*
*
Normally, using a higher number of clients with fewer workers will achieve better
* throughput at a lower cost, since the workers are better utilized. A good rule of thumb to
@@ -921,15 +943,19 @@ public Write publishLatencyMetrics() {
* For full details, please check https://docs.solace.com/API/API-Developer-Guide/Java-API-Best-Practices.htm.
*
- *
The Solace JCSMP client libraries can dispatch messages using two different modes:
+ *
The Solace JCSMP client libraries can dispatch messages using three different modes:
*
*
One of the modes dispatches messages directly from the same thread that is doing the rest
* of I/O work. This mode favors lower latency but lower throughput. Set this to LOWER_LATENCY
* to use that mode (MESSAGE_CALLBACK_ON_REACTOR set to True).
*
- *
The other mode uses a parallel thread to accumulate and dispatch messages. This mode
- * favors higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use
- * that mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to False).
+ *
Another mode uses a parallel thread to accumulate and dispatch messages. This mode favors
+ * higher throughput but also has higher latency. Set this to HIGHER_THROUGHPUT to use that
+ * mode. This is the default mode (MESSAGE_CALLBACK_ON_REACTOR set to False).
+ *
+ *
If you prefer to have full control over all the JCSMP properties, set this to CUSTOM, and
+ * override the classes {@link SessionServiceFactory} and {@link SessionService} to have full
+ * control on how to create the JCSMP sessions and producers used by the connector.
*
*
This is optional, the default value is HIGHER_THROUGHPUT.
*/
@@ -945,10 +971,12 @@ public Write withSubmissionMode(SubmissionMode submissionMode) {
* In streaming mode, the publishing latency will be lower, but the throughput will also be
* lower.
*
- *
With the batched mode, messages are accumulated until a batch size of 50 is reached, or 5
- * seconds have elapsed since the first message in the batch was received. The 50 messages are
- * sent to Solace in a single batch. This writer offers higher throughput but higher publishing
- * latency, as messages can be held up for up to 5 seconds until they are published.
+ *
With the batched mode, messages are accumulated until a batch size of 50 is reached, or
+ * {@link UnboundedBatchedSolaceWriter#ACKS_FLUSHING_INTERVAL_SECS} seconds have elapsed since
+ * the first message in the batch was received. The 50 messages are sent to Solace in a single
+ * batch. This writer offers higher throughput but higher publishing latency, as messages can be
+ * held up for up to {@link UnboundedBatchedSolaceWriter#ACKS_FLUSHING_INTERVAL_SECS}5seconds
+ * until they are published.
*
*
Notice that this is the message publishing latency, not the end-to-end latency. For very
* large scale pipelines, you will probably prefer to use the HIGHER_THROUGHPUT mode, as with
@@ -971,7 +999,20 @@ public Write withSessionServiceFactory(SessionServiceFactory factory) {
return toBuilder().setSessionServiceFactory(factory).build();
}
- abstract int getMaxNumOfUsedWorkers();
+ /**
+ * An optional error handler for handling records that failed to publish to Solace.
+ *
+ * If provided, this error handler will be invoked for each record that could not be
+ * successfully published. The error handler can implement custom logic for dealing with failed
+ * records, such as writing them to a dead-letter queue or logging them.
+ *
+ *
If no error handler is provided, failed records will be ignored.
+ */
+ public Write withErrorHandler(ErrorHandler errorHandler) {
+ return toBuilder().setErrorHandler(errorHandler).build();
+ }
+
+ abstract int getNumShards();
abstract int getNumberOfClientsPerWorker();
@@ -989,10 +1030,12 @@ public Write withSessionServiceFactory(SessionServiceFactory factory) {
abstract @Nullable SessionServiceFactory getSessionServiceFactory();
+ abstract @Nullable ErrorHandler getErrorHandler();
+
static Builder builder() {
return new AutoValue_SolaceIO_Write.Builder()
.setDeliveryMode(DEFAULT_WRITER_DELIVERY_MODE)
- .setMaxNumOfUsedWorkers(DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS)
+ .setNumShards(DEFAULT_WRITER_NUM_SHARDS)
.setNumberOfClientsPerWorker(DEFAULT_WRITER_CLIENTS_PER_WORKER)
.setPublishLatencyMetrics(DEFAULT_WRITER_PUBLISH_LATENCY_METRICS)
.setDispatchMode(DEFAULT_WRITER_SUBMISSION_MODE)
@@ -1003,7 +1046,7 @@ static Builder builder() {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setMaxNumOfUsedWorkers(int maxNumOfUsedWorkers);
+ abstract Builder setNumShards(int numShards);
abstract Builder setNumberOfClientsPerWorker(int numberOfClientsPerWorker);
@@ -1021,13 +1064,121 @@ abstract static class Builder {
abstract Builder setSessionServiceFactory(SessionServiceFactory factory);
+ abstract Builder setErrorHandler(ErrorHandler errorHandler);
+
abstract Write build();
}
@Override
public SolaceOutput expand(PCollection input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ boolean usingSolaceRecord =
+ TypeDescriptor.of(Solace.Record.class)
+ .isSupertypeOf(checkNotNull(input.getTypeDescriptor()));
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection records =
+ usingSolaceRecord
+ ? (PCollection) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ PCollection withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection> withShardKeys =
+ withGlobalWindow.apply("Add shard key", ParDo.of(new AddShardKeyDoFn(getNumShards())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
+
+ SolaceOutput output;
+ if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+ if (getErrorHandler() != null) {
+ checkNotNull(getErrorHandler()).addErrorCollection(solaceOutput.get(FAILED_PUBLISH_TAG));
+ }
+ output = SolaceOutput.in(input.getPipeline(), solaceOutput.get(SUCCESSFUL_PUBLISH_TAG));
+ } else {
+ LOG.info(
+ "Solace.Write: omitting writer output because delivery mode is {}", getDeliveryMode());
+ output = SolaceOutput.in(input.getPipeline(), null);
+ }
+
+ return output;
+ }
+
+ private ParDo.MultiOutput, Solace.PublishResult> getWriterTransform(
+ SerializableFunction destinationFn) {
+
+ ParDo.SingleOutput, Solace.PublishResult> writer =
+ ParDo.of(
+ getWriterType() == WriterType.STREAMING
+ ? new UnboundedStreamingSolaceWriter(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics())
+ : new UnboundedBatchedSolaceWriter(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics()));
+
+ return writer.withOutputTags(SUCCESSFUL_PUBLISH_TAG, TupleTagList.of(FAILED_PUBLISH_TAG));
+ }
+
+ /**
+ * Called before running the Pipeline to verify this transform is fully and correctly specified.
+ */
+ private void validateWriteTransform(boolean usingSolaceRecords) {
+ if (!usingSolaceRecords) {
+ checkNotNull(
+ getFormatFunction(),
+ "SolaceIO.Write: If you are not using Solace.Record as the input type, you"
+ + " must set a format function using withFormatFunction().");
+ }
+
+ checkArgument(
+ getNumShards() > 0, "SolaceIO.Write: The number of used workers must be positive.");
+ checkArgument(
+ getNumberOfClientsPerWorker() > 0,
+ "SolaceIO.Write: The number of clients per worker must be positive.");
+ checkArgument(
+ getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() == DeliveryMode.PERSISTENT,
+ String.format(
+ "SolaceIO.Write: Delivery mode must be either DIRECT or PERSISTENT. %s"
+ + " not supported",
+ getDeliveryMode()));
+ if (getPublishLatencyMetrics()) {
+ checkArgument(
+ getDeliveryMode() == DeliveryMode.PERSISTENT,
+ "SolaceIO.Write: Publish latency metrics can only be enabled for PERSISTENT"
+ + " delivery mode.");
+ }
+ checkNotNull(
+ getSessionServiceFactory(),
+ "SolaceIO: You need to pass a session service factory. For basic"
+ + " authentication, you can use BasicAuthJcsmpSessionServiceFactory.");
}
}
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
index 2137d574b09a..b2196dbf1067 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
@@ -19,6 +19,7 @@
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowReceiver;
@@ -28,9 +29,15 @@
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
+import com.solacesystems.jcsmp.XMLMessageProducer;
import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
/**
@@ -39,34 +46,50 @@
* This class provides a way to connect to a Solace broker and receive messages from a queue. The
* connection is established using basic authentication.
*/
-public class BasicAuthJcsmpSessionService extends SessionService {
- private final String queueName;
- private final String host;
- private final String username;
- private final String password;
- private final String vpnName;
- @Nullable private JCSMPSession jcsmpSession;
- @Nullable private MessageReceiver messageReceiver;
- private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
+@AutoValue
+public abstract class BasicAuthJcsmpSessionService extends SessionService {
+
+ /** The name of the queue to receive messages from. */
+ public abstract @Nullable String queueName();
+
+ /** The host name or IP address of the Solace broker. Format: Host[:Port] */
+ public abstract String host();
+
+ /** The username to use for authentication. */
+ public abstract String username();
+
+ /** The password to use for authentication. */
+ public abstract String password();
+
+ /** The name of the VPN to connect to. */
+ public abstract String vpnName();
+
+ public static Builder builder() {
+ return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME);
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder queueName(@Nullable String queueName);
+
+ public abstract Builder host(String host);
- /**
- * Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters.
- *
- * @param queueName The name of the queue to receive messages from.
- * @param host The host name or IP address of the Solace broker. Format: Host[:Port]
- * @param username The username to use for authentication.
- * @param password The password to use for authentication.
- * @param vpnName The name of the VPN to connect to.
- */
- public BasicAuthJcsmpSessionService(
- String queueName, String host, String username, String password, String vpnName) {
- this.queueName = queueName;
- this.host = host;
- this.username = username;
- this.password = password;
- this.vpnName = vpnName;
+ public abstract Builder username(String username);
+
+ public abstract Builder password(String password);
+
+ public abstract Builder vpnName(String vpnName);
+
+ public abstract BasicAuthJcsmpSessionService build();
}
+ @Nullable private transient JCSMPSession jcsmpSession;
+ @Nullable private transient MessageReceiver messageReceiver;
+ @Nullable private transient MessageProducer messageProducer;
+ private final java.util.Queue publishedResultsQueue =
+ new ConcurrentLinkedQueue<>();
+ private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
+
@Override
public void connect() {
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
@@ -79,6 +102,9 @@ public void close() {
if (messageReceiver != null) {
messageReceiver.close();
}
+ if (messageProducer != null) {
+ messageProducer.close();
+ }
if (!isClosed()) {
checkStateNotNull(jcsmpSession).closeSession();
}
@@ -88,24 +114,64 @@ public void close() {
}
@Override
- public MessageReceiver createReceiver() {
- this.messageReceiver =
- retryCallableManager.retryCallable(
- this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
+ public MessageReceiver getReceiver() {
+ if (this.messageReceiver == null) {
+ this.messageReceiver =
+ retryCallableManager.retryCallable(
+ this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
+ }
return this.messageReceiver;
}
+ @Override
+ public MessageProducer getInitializedProducer(SubmissionMode submissionMode) {
+ if (this.messageProducer == null || this.messageProducer.isClosed()) {
+ Callable create = () -> createXMLMessageProducer(submissionMode);
+ this.messageProducer =
+ retryCallableManager.retryCallable(create, ImmutableSet.of(JCSMPException.class));
+ }
+ return checkStateNotNull(this.messageProducer);
+ }
+
+ @Override
+ public java.util.Queue getPublishedResultsQueue() {
+ return publishedResultsQueue;
+ }
+
@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}
+ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
+ throws JCSMPException, IOException {
+
+ if (isClosed()) {
+ connectWriteSession(submissionMode);
+ }
+
+ @SuppressWarnings("nullness")
+ Callable initProducer =
+ () ->
+ Objects.requireNonNull(jcsmpSession)
+ .getMessageProducer(new PublishResultHandler(publishedResultsQueue));
+
+ XMLMessageProducer producer =
+ retryCallableManager.retryCallable(initProducer, ImmutableSet.of(JCSMPException.class));
+ if (producer == null) {
+ throw new IOException("SolaceIO.Write: Could not create producer, producer object is null");
+ }
+ return new SolaceMessageProducer(producer);
+ }
+
private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}
- Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);
+ Queue queue =
+ JCSMPFactory.onlyInstance()
+ .createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set."));
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
@@ -118,7 +184,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException
createFlowReceiver(jcsmpSession, flowProperties, endpointProperties));
}
throw new IOException(
- "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null.");
+ "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is"
+ + " null.");
}
// The `@SuppressWarning` is needed here, because the checkerframework reports an error for the
@@ -141,20 +208,33 @@ private int connectSession() throws JCSMPException {
return 0;
}
+ private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
+ if (jcsmpSession == null) {
+ jcsmpSession = createWriteSessionObject(mode);
+ }
+ jcsmpSession.connect();
+ return 0;
+ }
+
private JCSMPSession createSessionObject() throws InvalidPropertiesException {
JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
return JCSMPFactory.onlyInstance().createSession(properties);
}
+ private JCSMPSession createWriteSessionObject(SubmissionMode mode)
+ throws InvalidPropertiesException {
+ return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
+ }
+
@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
- baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName);
+ baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());
baseProps.setProperty(
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
- baseProps.setProperty(JCSMPProperties.USERNAME, username);
- baseProps.setProperty(JCSMPProperties.PASSWORD, password);
- baseProps.setProperty(JCSMPProperties.HOST, host);
+ baseProps.setProperty(JCSMPProperties.USERNAME, username());
+ baseProps.setProperty(JCSMPProperties.PASSWORD, password());
+ baseProps.setProperty(JCSMPProperties.HOST, host());
return baseProps;
}
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
index 2084e61b7e38..199dcccee854 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.solace.broker;
import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import com.google.auto.value.AutoValue;
@@ -31,12 +30,16 @@
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
+ /** The host name or IP address of the Solace broker. Format: Host[:Port] */
public abstract String host();
+ /** The username to use for authentication. */
public abstract String username();
+ /** The password to use for authentication. */
public abstract String password();
+ /** The name of the VPN to connect to. */
public abstract String vpnName();
public static Builder builder() {
@@ -54,6 +57,7 @@ public abstract static class Builder {
/** Set Solace username. */
public abstract Builder username(String username);
+
/** Set Solace password. */
public abstract Builder password(String password);
@@ -65,11 +69,15 @@ public abstract static class Builder {
@Override
public SessionService create() {
- return new BasicAuthJcsmpSessionService(
- checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
- host(),
- username(),
- password(),
- vpnName());
+ BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder();
+ if (queue != null) {
+ builder = builder.queueName(queue.getName());
+ }
+ return builder
+ .host(host())
+ .username(username())
+ .password(password())
+ .vpnName(vpnName())
+ .build();
}
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
index dd87e1d75fa5..7f691b46be31 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java
@@ -117,7 +117,7 @@ public abstract static class Builder {
@Override
public SessionService create() {
- String password = null;
+ String password;
try {
password = retrieveSecret();
} catch (IOException e) {
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java
new file mode 100644
index 000000000000..8aa254b92cb1
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/**
+ * Base class for publishing messages to a Solace broker.
+ *
+ * Implementations of this interface are responsible for managing the connection to the broker
+ * and for publishing messages to the broker.
+ */
+@Internal
+public interface MessageProducer {
+
+ /** Publishes a message to the broker. */
+ void publishSingleMessage(
+ Solace.Record msg,
+ Destination topicOrQueue,
+ boolean useCorrelationKeyLatency,
+ DeliveryMode deliveryMode);
+
+ /**
+ * Publishes a batch of messages to the broker.
+ *
+ *
The size of the batch cannot exceed 50 messages, this is a limitation of the Solace API.
+ *
+ *
It returns the number of messages written.
+ */
+ int publishBatch(
+ List records,
+ boolean useCorrelationKeyLatency,
+ SerializableFunction destinationFn,
+ DeliveryMode deliveryMode);
+
+ /** Returns {@literal true} if the message producer is closed, {@literal false} otherwise. */
+ boolean isClosed();
+
+ /** Closes the message producer. */
+ void close();
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java
new file mode 100644
index 000000000000..dd4610910ff4
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducerUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+@Internal
+public class MessageProducerUtils {
+ // This is the batch limit supported by the send multiple JCSMP API method.
+ static final int SOLACE_BATCH_LIMIT = 50;
+
+ /**
+ * Create a {@link BytesXMLMessage} to be published in Solace.
+ *
+ * @param record The record to be published.
+ * @param useCorrelationKeyLatency Whether to use a complex key for tracking latency.
+ * @param deliveryMode The {@link DeliveryMode} used to publish the message.
+ * @return A {@link BytesXMLMessage} that can be sent to Solace "as is".
+ */
+ public static BytesXMLMessage createBytesXMLMessage(
+ Solace.Record record, boolean useCorrelationKeyLatency, DeliveryMode deliveryMode) {
+ JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance();
+ BytesXMLMessage msg = jcsmpFactory.createBytesXMLMessage();
+ byte[] payload = record.getPayload();
+ msg.writeBytes(payload);
+
+ Long senderTimestamp = record.getSenderTimestamp();
+ if (senderTimestamp == null) {
+ senderTimestamp = System.currentTimeMillis();
+ }
+ msg.setSenderTimestamp(senderTimestamp);
+ msg.setDeliveryMode(deliveryMode);
+ if (useCorrelationKeyLatency) {
+ Solace.CorrelationKey key =
+ Solace.CorrelationKey.builder()
+ .setMessageId(record.getMessageId())
+ .setPublishMonotonicNanos(System.nanoTime())
+ .build();
+ msg.setCorrelationKey(key);
+ } else {
+ // Use only a string as correlation key
+ msg.setCorrelationKey(record.getMessageId());
+ }
+ msg.setApplicationMessageId(record.getMessageId());
+ return msg;
+ }
+
+ /**
+ * Create a {@link JCSMPSendMultipleEntry} array to be published in Solace. This can be used with
+ * `sendMultiple` to send all the messages in a single API call.
+ *
+ * The size of the list cannot be larger than 50 messages. This is a hard limit enforced by the
+ * Solace API.
+ *
+ * @param records A {@link List} of records to be published
+ * @param useCorrelationKeyLatency Whether to use a complex key for tracking latency.
+ * @param destinationFn A function that maps every record to its destination.
+ * @param deliveryMode The {@link DeliveryMode} used to publish the message.
+ * @return A {@link JCSMPSendMultipleEntry} array that can be sent to Solace "as is".
+ */
+ public static JCSMPSendMultipleEntry[] createJCSMPSendMultipleEntry(
+ List records,
+ boolean useCorrelationKeyLatency,
+ SerializableFunction destinationFn,
+ DeliveryMode deliveryMode) {
+ if (records.size() > SOLACE_BATCH_LIMIT) {
+ throw new RuntimeException(
+ String.format(
+ "SolaceIO.Write: Trying to create a batch of %d, but Solace supports a"
+ + " maximum of %d. The batch will likely be rejected by Solace.",
+ records.size(), SOLACE_BATCH_LIMIT));
+ }
+
+ JCSMPSendMultipleEntry[] entries = new JCSMPSendMultipleEntry[records.size()];
+ for (int i = 0; i < records.size(); i++) {
+ Solace.Record record = records.get(i);
+ JCSMPSendMultipleEntry entry =
+ JCSMPFactory.onlyInstance()
+ .createSendMultipleEntry(
+ createBytesXMLMessage(record, useCorrelationKeyLatency, deliveryMode),
+ destinationFn.apply(record));
+ entries[i] = entry;
+ }
+
+ return entries;
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java
new file mode 100644
index 000000000000..1153bfcb7a1c
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
+import java.util.Queue;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
+import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is required to handle callbacks from Solace, to find out if messages were actually
+ * published or there were any kind of error.
+ *
+ * This class is also used to calculate the latency of the publication. The correlation key
+ * contains the original timestamp of when the message was sent from the pipeline to Solace. The
+ * comparison of that value with the clock now, using a monotonic clock, is understood as the
+ * latency of the publication
+ */
+public final class PublishResultHandler implements JCSMPStreamingPublishCorrelatingEventHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PublishResultHandler.class);
+ private final Queue publishResultsQueue;
+ private final Counter batchesRejectedByBroker =
+ Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected");
+
+ public PublishResultHandler(Queue publishResultsQueue) {
+ this.publishResultsQueue = publishResultsQueue;
+ }
+
+ @Override
+ public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
+ processKey(key, false, cause);
+ }
+
+ @Override
+ public void responseReceivedEx(Object key) {
+ processKey(key, true, null);
+ }
+
+ private void processKey(Object key, boolean isPublished, @Nullable JCSMPException cause) {
+ PublishResult.Builder resultBuilder = PublishResult.builder();
+ String messageId;
+ if (key == null) {
+ messageId = "";
+ } else if (key instanceof Solace.CorrelationKey) {
+ messageId = ((Solace.CorrelationKey) key).getMessageId();
+ long latencyNanos = calculateLatency((Solace.CorrelationKey) key);
+ resultBuilder = resultBuilder.setLatencyNanos(latencyNanos);
+ } else {
+ messageId = key.toString();
+ }
+
+ resultBuilder = resultBuilder.setMessageId(messageId).setPublished(isPublished);
+ if (!isPublished) {
+ batchesRejectedByBroker.inc();
+ if (cause != null) {
+ resultBuilder = resultBuilder.setError(cause.getMessage());
+ } else {
+ resultBuilder = resultBuilder.setError("NULL - Not set by Solace");
+ }
+ } else if (cause != null) {
+ LOG.warn(
+ "Message with id {} is published but exception is populated. Ignoring exception",
+ messageId);
+ }
+
+ PublishResult publishResult = resultBuilder.build();
+ // Static reference, it receives all callbacks from all publications
+ // from all threads
+ publishResultsQueue.add(publishResult);
+ }
+
+ private static long calculateLatency(Solace.CorrelationKey key) {
+ long currentMillis = System.nanoTime();
+ long publishMillis = key.getPublishMonotonicNanos();
+ return currentMillis - publishMillis;
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
index aed700a71ded..84a876a9d0bc 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
@@ -19,7 +19,11 @@
import com.solacesystems.jcsmp.JCSMPProperties;
import java.io.Serializable;
+import java.util.Queue;
import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,21 +73,23 @@
* For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link
* BasicAuthJcsmpSessionServiceFactory}.
*
- *
For other situations, you need to extend this class. For instance:
+ *
For other situations, you need to extend this class and implement the `equals` method, so two
+ * instances of your class can be compared by value. We recommend using AutoValue for that. For
+ * instance:
*
*
{@code
+ * {@literal }@AutoValue
* public class MySessionService extends SessionService {
- * private final String authToken;
+ * abstract String authToken();
*
- * public MySessionService(String token) {
- * this.oauthToken = token;
- * ...
+ * public static MySessionService create(String authToken) {
+ * return new AutoValue_MySessionService(authToken);
* }
*
* {@literal }@Override
* public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
* baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
- * baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken);
+ * baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken());
* return props;
* }
*
@@ -101,6 +107,7 @@ public abstract class SessionService implements Serializable {
public static final String DEFAULT_VPN_NAME = "default";
+ private static final int TESTING_PUB_ACK_WINDOW = 1;
private static final int STREAMING_PUB_ACK_WINDOW = 50;
private static final int BATCHED_PUB_ACK_WINDOW = 255;
@@ -121,10 +128,25 @@ public abstract class SessionService implements Serializable {
public abstract boolean isClosed();
/**
- * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
- * created from the session instance.
+ * Returns a MessageReceiver object for receiving messages from Solace. If it is the first time
+ * this method is used, the receiver is created from the session instance, otherwise it returns
+ * the receiver created initially.
*/
- public abstract MessageReceiver createReceiver();
+ public abstract MessageReceiver getReceiver();
+
+ /**
+ * Returns a MessageProducer object for publishing messages to Solace. If it is the first time
+ * this method is used, the producer is created from the session instance, otherwise it returns
+ * the producer created initially.
+ */
+ public abstract MessageProducer getInitializedProducer(SubmissionMode mode);
+
+ /**
+ * Returns the {@link Queue} instance associated with this session, with the
+ * asynchronously received callbacks from Solace for message publications. The queue
+ * implementation has to be thread-safe for production use-cases.
+ */
+ public abstract Queue getPublishedResultsQueue();
/**
* Override this method and provide your specific properties, including all those related to
@@ -147,6 +169,20 @@ public abstract class SessionService implements Serializable {
*/
public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties);
+ /**
+ * You need to override this method to be able to compare these objects by value. We recommend
+ * using AutoValue for that.
+ */
+ @Override
+ public abstract boolean equals(@Nullable Object other);
+
+ /**
+ * You need to override this method to be able to compare these objects by value. We recommend
+ * using AutoValue for that.
+ */
+ @Override
+ public abstract int hashCode();
+
/**
* This method will be called by the write connector when a new session is started.
*
@@ -186,50 +222,80 @@ private static JCSMPProperties overrideConnectorProperties(
// received from Solace. A value of 1 will have the lowest latency, but a very low
// throughput and a monumental backpressure.
- // This controls how the messages are sent to Solace
- if (mode == SolaceIO.SubmissionMode.HIGHER_THROUGHPUT) {
- // Create a parallel thread and a queue to send the messages
+ // Retrieve current values of the properties
+ Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
+ Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
- Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
- if (msgCbProp != null && msgCbProp) {
- LOG.warn(
- "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false since"
- + " HIGHER_THROUGHPUT mode was selected");
- }
+ switch (mode) {
+ case HIGHER_THROUGHPUT:
+ // Check if it was set by user, show override warning
+ if (msgCbProp != null && msgCbProp) {
+ LOG.warn(
+ "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false since"
+ + " HIGHER_THROUGHPUT mode was selected");
+ }
+ if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
+ LOG.warn(
+ String.format(
+ "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+ + " HIGHER_THROUGHPUT mode was selected",
+ BATCHED_PUB_ACK_WINDOW));
+ }
- props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
+ // Override the properties
+ // Use a dedicated thread for callbacks, increase the ack window size
+ props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
+ props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW);
+ LOG.info(
+ "SolaceIO.Write: Using HIGHER_THROUGHPUT mode, MESSAGE_CALLBACK_ON_REACTOR is FALSE,"
+ + " PUB_ACK_WINDOW_SIZE is {}",
+ BATCHED_PUB_ACK_WINDOW);
+ break;
+ case LOWER_LATENCY:
+ // Check if it was set by user, show override warning
+ if (msgCbProp != null && !msgCbProp) {
+ LOG.warn(
+ "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true since"
+ + " LOWER_LATENCY mode was selected");
+ }
- Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
- if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
- LOG.warn(
- String.format(
- "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
- + " HIGHER_THROUGHPUT mode was selected",
- BATCHED_PUB_ACK_WINDOW));
- }
- props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW);
- } else {
- // Send from the same thread where the produced is being called. This offers the lowest
- // latency, but a low throughput too.
- Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
- if (msgCbProp != null && !msgCbProp) {
- LOG.warn(
- "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true since"
- + " LOWER_LATENCY mode was selected");
- }
+ if ((ackWindowSize != null && ackWindowSize != STREAMING_PUB_ACK_WINDOW)) {
+ LOG.warn(
+ String.format(
+ "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
+ + " LOWER_LATENCY mode was selected",
+ STREAMING_PUB_ACK_WINDOW));
+ }
- props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+ // Override the properties
+ // Send from the same thread where the produced is being called. This offers the lowest
+ // latency, but a low throughput too.
+ props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+ props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW);
+ LOG.info(
+ "SolaceIO.Write: Using LOWER_LATENCY mode, MESSAGE_CALLBACK_ON_REACTOR is TRUE,"
+ + " PUB_ACK_WINDOW_SIZE is {}",
+ STREAMING_PUB_ACK_WINDOW);
- Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
- if ((ackWindowSize != null && ackWindowSize != STREAMING_PUB_ACK_WINDOW)) {
+ break;
+ case CUSTOM:
+ LOG.info(
+ " SolaceIO.Write: Using the custom JCSMP properties set by the user. No property has"
+ + " been overridden by the connector.");
+ break;
+ case TESTING:
LOG.warn(
- String.format(
- "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
- + " LOWER_LATENCY mode was selected",
- STREAMING_PUB_ACK_WINDOW));
- }
-
- props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW);
+ "SolaceIO.Write: Overriding JCSMP properties for testing. **IF THIS IS AN"
+ + " ACTUAL PIPELINE, CHANGE THE SUBMISSION MODE TO HIGHER_THROUGHPUT "
+ + "OR LOWER_LATENCY.**");
+ // Minimize multi-threading for testing
+ props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+ props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, TESTING_PUB_ACK_WINDOW);
+ break;
+ default:
+ LOG.error(
+ "SolaceIO.Write: no submission mode is selected. Set the submission mode to"
+ + " HIGHER_THROUGHPUT or LOWER_LATENCY;");
}
return props;
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 027de2cff134..bd1f3c23694d 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -19,11 +19,40 @@
import com.solacesystems.jcsmp.Queue;
import java.io.Serializable;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
- * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
- * queue property and mandates the implementation of a create() method in concrete subclasses.
+ * This abstract class serves as a blueprint for creating `SessionServiceFactory` objects. It
+ * introduces a queue property and mandates the implementation of a create() method in concrete
+ * subclasses.
+ *
+ * For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}.
+ *
+ *
For other situations, you need to extend this class. Classes extending from this abstract
+ * class must implement the `equals` method so two instances can be compared by value, and not by
+ * reference. We recommend using AutoValue for that.
+ *
+ *
{@code
+ * {@literal @}AutoValue
+ * public abstract class MyFactory implements SessionServiceClientFactory {
+ *
+ * abstract String value1();
+ *
+ * abstract String value2();
+ *
+ * public static MyFactory create(String value1, String value2) {
+ * return new AutoValue_MyFactory.Builder(value1, value2);
+ * }
+ *
+ * ...
+ *
+ * {@literal @}Override
+ * public SessionService create() {
+ * ...
+ * }
+ * }
+ * }
*/
public abstract class SessionServiceFactory implements Serializable {
/**
@@ -34,12 +63,32 @@ public abstract class SessionServiceFactory implements Serializable {
*/
@Nullable Queue queue;
+ /**
+ * The write submission mode. This is set when the writers are created. This property is used only
+ * by the write connector.
+ */
+ @Nullable SubmissionMode submissionMode;
+
/**
* This is the core method that subclasses must implement. It defines how to construct and return
* a SessionService object.
*/
public abstract SessionService create();
+ /**
+ * You need to override this method to be able to compare these objects by value. We recommend
+ * using AutoValue for that.
+ */
+ @Override
+ public abstract boolean equals(@Nullable Object other);
+
+ /**
+ * You need to override this method to be able to compare these objects by value. We recommend
+ * using AutoValue for that.
+ */
+ @Override
+ public abstract int hashCode();
+
/**
* This method is called in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
@@ -48,4 +97,15 @@ public abstract class SessionServiceFactory implements Serializable {
public void setQueue(Queue queue) {
this.queue = queue;
}
+
+ /**
+ * Called by the write connector to set the submission mode used to create the message producers.
+ */
+ public void setSubmissionMode(SubmissionMode submissionMode) {
+ this.submissionMode = submissionMode;
+ }
+
+ public @Nullable SubmissionMode getSubmissionMode() {
+ return submissionMode;
+ }
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java
new file mode 100644
index 000000000000..b3806b5afae9
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.io.solace.broker.MessageProducerUtils.createBytesXMLMessage;
+import static org.apache.beam.sdk.io.solace.broker.MessageProducerUtils.createJCSMPSendMultipleEntry;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
+import com.solacesystems.jcsmp.XMLMessageProducer;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+@Internal
+public class SolaceMessageProducer implements MessageProducer {
+
+ private final XMLMessageProducer producer;
+ private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
+
+ public SolaceMessageProducer(XMLMessageProducer producer) {
+ this.producer = producer;
+ }
+
+ @Override
+ public void publishSingleMessage(
+ Solace.Record record,
+ Destination topicOrQueue,
+ boolean useCorrelationKeyLatency,
+ DeliveryMode deliveryMode) {
+ BytesXMLMessage msg = createBytesXMLMessage(record, useCorrelationKeyLatency, deliveryMode);
+ Callable publish =
+ () -> {
+ producer.send(msg, topicOrQueue);
+ return 0;
+ };
+
+ retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class));
+ }
+
+ @Override
+ public int publishBatch(
+ List records,
+ boolean useCorrelationKeyLatency,
+ SerializableFunction destinationFn,
+ DeliveryMode deliveryMode) {
+ JCSMPSendMultipleEntry[] batch =
+ createJCSMPSendMultipleEntry(
+ records, useCorrelationKeyLatency, destinationFn, deliveryMode);
+ Callable publish = () -> producer.sendMultiple(batch, 0, batch.length, 0);
+ return retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class));
+ }
+
+ @Override
+ public boolean isClosed() {
+ return producer == null || producer.isClosed();
+ }
+
+ @Override
+ public void close() {
+ if (!isClosed()) {
+ this.producer.close();
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
index 00b94b5b9ea9..21274237f46a 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
@@ -21,7 +21,6 @@
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
@@ -52,6 +51,7 @@ public String getName() {
return name;
}
}
+
/** Represents a Solace topic. */
public static class Topic {
private final String name;
@@ -68,6 +68,7 @@ public String getName() {
return name;
}
}
+
/** Represents a Solace destination type. */
public enum DestinationType {
TOPIC,
@@ -93,17 +94,17 @@ public abstract static class Destination {
*/
public abstract DestinationType getType();
- static Builder builder() {
+ public static Builder builder() {
return new AutoValue_Solace_Destination.Builder();
}
@AutoValue.Builder
- abstract static class Builder {
- abstract Builder setName(String name);
+ public abstract static class Builder {
+ public abstract Builder setName(String name);
- abstract Builder setType(DestinationType type);
+ public abstract Builder setType(DestinationType type);
- abstract Destination build();
+ public abstract Destination build();
}
}
@@ -120,17 +121,19 @@ public abstract static class Record {
* @return The message ID, or null if not available.
*/
@SchemaFieldNumber("0")
- public abstract @Nullable String getMessageId();
+ public abstract String getMessageId();
/**
- * Gets the payload of the message as a ByteString.
+ * Gets the payload of the message as a byte array.
*
* Mapped from {@link BytesXMLMessage#getBytes()}
*
* @return The message payload.
*/
+ @SuppressWarnings("mutable")
@SchemaFieldNumber("1")
- public abstract ByteBuffer getPayload();
+ public abstract byte[] getPayload();
+
/**
* Gets the destination (topic or queue) to which the message was sent.
*
@@ -192,7 +195,7 @@ public abstract static class Record {
* @return The timestamp.
*/
@SchemaFieldNumber("7")
- public abstract long getReceiveTimestamp();
+ public abstract @Nullable Long getReceiveTimestamp();
/**
* Gets the timestamp (in milliseconds since the Unix epoch) when the message was sent by the
@@ -241,55 +244,62 @@ public abstract static class Record {
public abstract @Nullable String getReplicationGroupMessageId();
/**
- * Gets the attachment data of the message as a ByteString, if any. This might represent files
+ * Gets the attachment data of the message as a byte array, if any. This might represent files
* or other binary content associated with the message.
*
*
Mapped from {@link BytesXMLMessage#getAttachmentByteBuffer()}
*
- * @return The attachment data, or an empty ByteString if no attachment is present.
+ * @return The attachment data, or an empty byte array if no attachment is present.
*/
+ @SuppressWarnings("mutable")
@SchemaFieldNumber("12")
- public abstract ByteBuffer getAttachmentBytes();
+ public abstract byte[] getAttachmentBytes();
- static Builder builder() {
- return new AutoValue_Solace_Record.Builder();
+ public static Builder builder() {
+ return new AutoValue_Solace_Record.Builder()
+ .setExpiration(0L)
+ .setPriority(-1)
+ .setRedelivered(false)
+ .setTimeToLive(0)
+ .setAttachmentBytes(new byte[0]);
}
@AutoValue.Builder
- abstract static class Builder {
- abstract Builder setMessageId(@Nullable String messageId);
+ public abstract static class Builder {
+ public abstract Builder setMessageId(String messageId);
- abstract Builder setPayload(ByteBuffer payload);
+ public abstract Builder setPayload(byte[] payload);
- abstract Builder setDestination(@Nullable Destination destination);
+ public abstract Builder setDestination(@Nullable Destination destination);
- abstract Builder setExpiration(long expiration);
+ public abstract Builder setExpiration(long expiration);
- abstract Builder setPriority(int priority);
+ public abstract Builder setPriority(int priority);
- abstract Builder setRedelivered(boolean redelivered);
+ public abstract Builder setRedelivered(boolean redelivered);
- abstract Builder setReplyTo(@Nullable Destination replyTo);
+ public abstract Builder setReplyTo(@Nullable Destination replyTo);
- abstract Builder setReceiveTimestamp(long receiveTimestamp);
+ public abstract Builder setReceiveTimestamp(@Nullable Long receiveTimestamp);
- abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp);
+ public abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp);
- abstract Builder setSequenceNumber(@Nullable Long sequenceNumber);
+ public abstract Builder setSequenceNumber(@Nullable Long sequenceNumber);
- abstract Builder setTimeToLive(long timeToLive);
+ public abstract Builder setTimeToLive(long timeToLive);
- abstract Builder setReplicationGroupMessageId(@Nullable String replicationGroupMessageId);
+ public abstract Builder setReplicationGroupMessageId(
+ @Nullable String replicationGroupMessageId);
- abstract Builder setAttachmentBytes(ByteBuffer attachmentBytes);
+ public abstract Builder setAttachmentBytes(byte[] attachmentBytes);
- abstract Record build();
+ public abstract Record build();
}
}
/**
* The result of writing a message to Solace. This will be returned by the {@link
- * com.google.cloud.dataflow.dce.io.solace.SolaceIO.Write} connector.
+ * org.apache.beam.sdk.io.solace.SolaceIO.Write} connector.
*
*
This class provides a builder to create instances, but you will probably not need it. The
* write connector will create and return instances of {@link Solace.PublishResult}.
@@ -311,12 +321,12 @@ public abstract static class PublishResult {
public abstract Boolean getPublished();
/**
- * The publishing latency in milliseconds. This is the difference between the time the message
+ * The publishing latency in nanoseconds. This is the difference between the time the message
* was created, and the time the message was published. It is only available if the {@link
- * CorrelationKey} class is used as correlation key of the messages.
+ * CorrelationKey} class is used as correlation key of the messages, and null otherwise.
*/
@SchemaFieldNumber("2")
- public abstract @Nullable Long getLatencyMilliseconds();
+ public abstract @Nullable Long getLatencyNanos();
/** The error details if the message could not be published. */
@SchemaFieldNumber("3")
@@ -332,7 +342,7 @@ public abstract static class Builder {
public abstract Builder setPublished(Boolean published);
- public abstract Builder setLatencyMilliseconds(Long latencyMs);
+ public abstract Builder setLatencyNanos(Long latencyNanos);
public abstract Builder setError(String error);
@@ -354,7 +364,7 @@ public abstract static class CorrelationKey {
public abstract String getMessageId();
@SchemaFieldNumber("1")
- public abstract long getPublishMonotonicMillis();
+ public abstract long getPublishMonotonicNanos();
public static Builder builder() {
return new AutoValue_Solace_CorrelationKey.Builder();
@@ -364,7 +374,7 @@ public static Builder builder() {
public abstract static class Builder {
public abstract Builder setMessageId(String messageId);
- public abstract Builder setPublishMonotonicMillis(long millis);
+ public abstract Builder setPublishMonotonicNanos(long nanos);
public abstract CorrelationKey build();
}
@@ -414,7 +424,7 @@ public static class SolaceRecordMapper {
Destination destination = getDestination(msg.getCorrelationId(), msg.getDestination());
return Record.builder()
.setMessageId(msg.getApplicationMessageId())
- .setPayload(ByteBuffer.wrap(payloadBytesStream.toByteArray()))
+ .setPayload(payloadBytesStream.toByteArray())
.setDestination(destination)
.setExpiration(msg.getExpiration())
.setPriority(msg.getPriority())
@@ -428,7 +438,7 @@ public static class SolaceRecordMapper {
msg.getReplicationGroupMessageId() != null
? msg.getReplicationGroupMessageId().toString()
: null)
- .setAttachmentBytes(ByteBuffer.wrap(attachmentBytesStream.toByteArray()))
+ .setAttachmentBytes(attachmentBytesStream.toByteArray())
.build();
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index c18a9d110b2a..a421970370da 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -29,7 +29,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
-import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -49,7 +48,6 @@ class UnboundedSolaceReader extends UnboundedReader {
private final SempClient sempClient;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
- private @Nullable MessageReceiver messageReceiver;
private @Nullable SessionService sessionService;
AtomicBoolean active = new AtomicBoolean(true);
@@ -72,7 +70,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) {
@Override
public boolean start() {
populateSession();
- populateMessageConsumer();
+ checkNotNull(sessionService).getReceiver().start();
return advance();
}
@@ -85,22 +83,11 @@ public void populateSession() {
}
}
- private void populateMessageConsumer() {
- if (messageReceiver == null) {
- messageReceiver = checkNotNull(sessionService).createReceiver();
- messageReceiver.start();
- }
- MessageReceiver receiver = checkNotNull(messageReceiver);
- if (receiver.isClosed()) {
- receiver.start();
- }
- }
-
@Override
public boolean advance() {
BytesXMLMessage receivedXmlMessage;
try {
- receivedXmlMessage = checkNotNull(messageReceiver).receive();
+ receivedXmlMessage = checkNotNull(sessionService).getReceiver().receive();
} catch (IOException e) {
LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e);
return false;
@@ -125,7 +112,7 @@ public void close() {
@Override
public Instant getWatermark() {
// should be only used by a test receiver
- if (checkNotNull(messageReceiver).isEOF()) {
+ if (checkNotNull(sessionService).getReceiver().isEOF()) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
return watermarkPolicy.getWatermark();
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java
new file mode 100644
index 000000000000..12d8a8507d8a
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/AddShardKeyDoFn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * This class a pseudo-key with a given cardinality. The downstream steps will use state {@literal
+ * &} timers to distribute the data and control for the number of parallel workers used for writing.
+ */
+@Internal
+public class AddShardKeyDoFn extends DoFn> {
+ private final int shardCount;
+ private int shardKey;
+
+ public AddShardKeyDoFn(int shardCount) {
+ this.shardCount = shardCount;
+ shardKey = -1;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element Solace.Record record, OutputReceiver> c) {
+ shardKey = (shardKey + 1) % shardCount;
+ c.output(KV.of(shardKey, record));
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java
new file mode 100644
index 000000000000..4be5b0a014b3
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/RecordToPublishResultDoFn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * This class just transforms to PublishResult to be able to capture the windowing with the right
+ * strategy. The output is not used for anything else.
+ */
+@Internal
+public class RecordToPublishResultDoFn extends DoFn {
+ @ProcessElement
+ public void processElement(
+ @Element Solace.Record record, OutputReceiver receiver) {
+ Solace.PublishResult result =
+ Solace.PublishResult.builder()
+ .setPublished(true)
+ .setMessageId(record.getMessageId())
+ .setLatencyNanos(0L)
+ .build();
+ receiver.output(result);
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java
index 6c37f879ae7f..d9c37326f83f 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceOutput.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -31,50 +32,33 @@
import org.checkerframework.checker.nullness.qual.Nullable;
/**
- * The {@link SolaceIO.Write} transform's output return this type, containing both the successful
- * publishes ({@link #getSuccessfulPublish()}) and the failed publishes ({@link
- * #getFailedPublish()}).
+ * The {@link SolaceIO.Write} transform's output return this type, containing the successful
+ * publishes ({@link #getSuccessfulPublish()}). To access failed records, configure the connector
+ * with {@link SolaceIO.Write#withErrorHandler(ErrorHandler)}.
*
* The streaming writer with DIRECT messages does not return anything, and the output {@link
- * PCollection}s will be equal to null.
+ * PCollection} will be equal to null.
*/
public final class SolaceOutput implements POutput {
private final Pipeline pipeline;
- private final TupleTag failedPublishTag;
private final TupleTag successfulPublishTag;
- private final @Nullable PCollection failedPublish;
private final @Nullable PCollection successfulPublish;
- public @Nullable PCollection getFailedPublish() {
- return failedPublish;
- }
-
public @Nullable PCollection getSuccessfulPublish() {
return successfulPublish;
}
public static SolaceOutput in(
- Pipeline pipeline,
- @Nullable PCollection failedPublish,
- @Nullable PCollection successfulPublish) {
- return new SolaceOutput(
- pipeline,
- SolaceIO.Write.FAILED_PUBLISH_TAG,
- SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG,
- failedPublish,
- successfulPublish);
+ Pipeline pipeline, @Nullable PCollection successfulPublish) {
+ return new SolaceOutput(pipeline, SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG, successfulPublish);
}
private SolaceOutput(
Pipeline pipeline,
- TupleTag failedPublishTag,
TupleTag successfulPublishTag,
- @Nullable PCollection failedPublish,
@Nullable PCollection successfulPublish) {
this.pipeline = pipeline;
- this.failedPublishTag = failedPublishTag;
this.successfulPublishTag = successfulPublishTag;
- this.failedPublish = failedPublish;
this.successfulPublish = successfulPublish;
}
@@ -87,10 +71,6 @@ public Pipeline getPipeline() {
public Map, PValue> expand() {
ImmutableMap.Builder, PValue> builder = ImmutableMap., PValue>builder();
- if (failedPublish != null) {
- builder.put(failedPublishTag, failedPublish);
- }
-
if (successfulPublish != null) {
builder.put(successfulPublishTag, successfulPublish);
}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java
new file mode 100644
index 000000000000..109010231d17
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import static org.apache.beam.sdk.io.solace.SolaceIO.DEFAULT_WRITER_CLIENTS_PER_WORKER;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+
+/**
+ * All the writer threads belonging to the same factory share the same instance of this class, to
+ * control for the number of clients that are connected to Solace, and minimize problems with quotas
+ * and limits.
+ *
+ * This class maintains a map of all the session open in a worker, and control the size of that
+ * map, to avoid creating more sessions than Solace could handle.
+ *
+ *
This class is thread-safe and creates a pool of producers per SessionServiceFactory. If there
+ * is only a Write transform in the pipeline, this is effectively a singleton. If there are more
+ * than one, each {@link SessionServiceFactory} instance keeps their own pool of producers.
+ */
+final class SolaceWriteSessionsHandler {
+
+ private static final ConcurrentHashMap sessionsMap =
+ new ConcurrentHashMap<>(DEFAULT_WRITER_CLIENTS_PER_WORKER);
+
+ public static SessionService getSessionServiceWithProducer(
+ int producerIndex, SessionServiceFactory sessionServiceFactory, UUID writerTransformUuid) {
+ SessionConfigurationIndex key =
+ SessionConfigurationIndex.builder()
+ .producerIndex(producerIndex)
+ .sessionServiceFactory(sessionServiceFactory)
+ .writerTransformUuid(writerTransformUuid)
+ .build();
+ return sessionsMap.computeIfAbsent(
+ key, SolaceWriteSessionsHandler::createSessionAndStartProducer);
+ }
+
+ private static SessionService createSessionAndStartProducer(SessionConfigurationIndex key) {
+ SessionServiceFactory factory = key.sessionServiceFactory();
+ SessionService sessionService = factory.create();
+ // Start the producer now that the initialization is locked for other threads
+ SubmissionMode mode = factory.getSubmissionMode();
+ checkStateNotNull(
+ mode,
+ "SolaceIO.Write: Submission mode is not set. You need to set it to create write sessions.");
+ sessionService.getInitializedProducer(mode);
+ return sessionService;
+ }
+
+ /** Disconnect all the sessions from Solace, and clear the corresponding state. */
+ public static void disconnectFromSolace(
+ SessionServiceFactory factory, int producersCardinality, UUID writerTransformUuid) {
+ for (int i = 0; i < producersCardinality; i++) {
+ SessionConfigurationIndex key =
+ SessionConfigurationIndex.builder()
+ .producerIndex(i)
+ .sessionServiceFactory(factory)
+ .writerTransformUuid(writerTransformUuid)
+ .build();
+
+ SessionService sessionService = sessionsMap.remove(key);
+ if (sessionService != null) {
+ sessionService.close();
+ }
+ }
+ }
+
+ @AutoValue
+ abstract static class SessionConfigurationIndex {
+ abstract int producerIndex();
+
+ abstract SessionServiceFactory sessionServiceFactory();
+
+ abstract UUID writerTransformUuid();
+
+ static Builder builder() {
+ return new AutoValue_SolaceWriteSessionsHandler_SessionConfigurationIndex.Builder();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder producerIndex(int producerIndex);
+
+ abstract Builder sessionServiceFactory(SessionServiceFactory sessionServiceFactory);
+
+ abstract Builder writerTransformUuid(UUID writerTransformUuid);
+
+ abstract SessionConfigurationIndex build();
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java
new file mode 100644
index 000000000000..dd4f81eeb082
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.Record;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This DoFn is the responsible for writing to Solace in batch mode (holding up any messages), and
+ * emit the corresponding output (success or fail; only for persistent messages), so the
+ * SolaceIO.Write connector can be composed with other subsequent transforms in the pipeline.
+ *
+ * The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be
+ * reused across different threads (if the number of threads is higher than the number of sessions,
+ * which is probably the most common case).
+ *
+ *
The producer uses the JCSMP send multiple mode to publish a batch of messages together with a
+ * single API call. The acks from this publication are also processed in batch, and returned as the
+ * output of the DoFn.
+ *
+ *
The batch size is 50, and this is currently the maximum value supported by Solace.
+ *
+ *
There are no acks if the delivery mode is set to DIRECT.
+ *
+ *
This writer DoFn offers higher throughput than {@link UnboundedStreamingSolaceWriter} but also
+ * higher latency.
+ */
+@Internal
+public final class UnboundedBatchedSolaceWriter extends UnboundedSolaceWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedBatchedSolaceWriter.class);
+
+ private static final int ACKS_FLUSHING_INTERVAL_SECS = 10;
+
+ private final Counter sentToBroker =
+ Metrics.counter(UnboundedBatchedSolaceWriter.class, "msgs_sent_to_broker");
+
+ private final Counter batchesRejectedByBroker =
+ Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected");
+
+ // State variables are never explicitly "used"
+ @SuppressWarnings("UnusedVariable")
+ @TimerId("bundle_flusher")
+ private final TimerSpec bundleFlusherTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ public UnboundedBatchedSolaceWriter(
+ SerializableFunction destinationFn,
+ SessionServiceFactory sessionServiceFactory,
+ DeliveryMode deliveryMode,
+ SubmissionMode submissionMode,
+ int producersMapCardinality,
+ boolean publishLatencyMetrics) {
+ super(
+ destinationFn,
+ sessionServiceFactory,
+ deliveryMode,
+ submissionMode,
+ producersMapCardinality,
+ publishLatencyMetrics);
+ }
+
+ // The state variable is here just to force a shuffling with a certain cardinality
+ @ProcessElement
+ public void processElement(
+ @Element KV element,
+ @TimerId("bundle_flusher") Timer bundleFlusherTimer,
+ @Timestamp Instant timestamp) {
+
+ setCurrentBundleTimestamp(timestamp);
+
+ Solace.Record record = element.getValue();
+
+ if (record == null) {
+ LOG.error(
+ "SolaceIO.Write: Found null record with key {}. Ignoring record.", element.getKey());
+ } else {
+ addToCurrentBundle(record);
+ // Extend timer for bundle flushing
+ bundleFlusherTimer
+ .offset(Duration.standardSeconds(ACKS_FLUSHING_INTERVAL_SECS))
+ .setRelative();
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context) throws IOException {
+ // Take messages in groups of 50 (if there are enough messages)
+ List currentBundle = getCurrentBundle();
+ for (int i = 0; i < currentBundle.size(); i += SOLACE_BATCH_LIMIT) {
+ int toIndex = Math.min(i + SOLACE_BATCH_LIMIT, currentBundle.size());
+ List batch = currentBundle.subList(i, toIndex);
+ if (batch.isEmpty()) {
+ continue;
+ }
+ publishBatch(batch);
+ }
+ getCurrentBundle().clear();
+
+ publishResults(BeamContextWrapper.of(context));
+ }
+
+ @OnTimer("bundle_flusher")
+ public void flushBundle(OnTimerContext context) throws IOException {
+ publishResults(BeamContextWrapper.of(context));
+ }
+
+ private void publishBatch(List records) {
+ try {
+ int entriesPublished =
+ solaceSessionServiceWithProducer()
+ .getInitializedProducer(getSubmissionMode())
+ .publishBatch(
+ records, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode());
+ sentToBroker.inc(entriesPublished);
+ } catch (Exception e) {
+ batchesRejectedByBroker.inc();
+ Solace.PublishResult errorPublish =
+ Solace.PublishResult.builder()
+ .setPublished(false)
+ .setMessageId(String.format("BATCH_OF_%d_ENTRIES", records.size()))
+ .setError(
+ String.format(
+ "Batch could not be published after several" + " retries. Error: %s",
+ e.getMessage()))
+ .setLatencyNanos(System.nanoTime())
+ .build();
+ solaceSessionServiceWithProducer().getPublishedResultsQueue().add(errorPublish);
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java
new file mode 100644
index 000000000000..1c98113c2416
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import static org.apache.beam.sdk.io.solace.SolaceIO.Write.FAILED_PUBLISH_TAG;
+import static org.apache.beam.sdk.io.solace.SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
+import org.apache.beam.sdk.io.solace.data.Solace.Record;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This DoFn encapsulates common code used both for the {@link UnboundedBatchedSolaceWriter} and
+ * {@link UnboundedStreamingSolaceWriter}.
+ */
+@Internal
+public abstract class UnboundedSolaceWriter
+ extends DoFn, Solace.PublishResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceWriter.class);
+
+ // This is the batch limit supported by the send multiple JCSMP API method.
+ static final int SOLACE_BATCH_LIMIT = 50;
+ private final Distribution latencyPublish =
+ Metrics.distribution(SolaceIO.Write.class, "latency_publish_ms");
+
+ private final Distribution latencyErrors =
+ Metrics.distribution(SolaceIO.Write.class, "latency_failed_ms");
+
+ private final SerializableFunction destinationFn;
+
+ private final SessionServiceFactory sessionServiceFactory;
+ private final DeliveryMode deliveryMode;
+ private final SubmissionMode submissionMode;
+ private final int producersMapCardinality;
+ private final boolean publishLatencyMetrics;
+ private static final AtomicInteger bundleProducerIndexCounter = new AtomicInteger();
+ private int currentBundleProducerIndex = 0;
+
+ private final List batchToEmit;
+
+ private @Nullable Instant bundleTimestamp;
+
+ final UUID writerTransformUuid = UUID.randomUUID();
+
+ public UnboundedSolaceWriter(
+ SerializableFunction destinationFn,
+ SessionServiceFactory sessionServiceFactory,
+ DeliveryMode deliveryMode,
+ SubmissionMode submissionMode,
+ int producersMapCardinality,
+ boolean publishLatencyMetrics) {
+ this.destinationFn = destinationFn;
+ this.sessionServiceFactory = sessionServiceFactory;
+ // Make sure that we set the submission mode now that we know which mode has been set by the
+ // user.
+ this.sessionServiceFactory.setSubmissionMode(submissionMode);
+ this.deliveryMode = deliveryMode;
+ this.submissionMode = submissionMode;
+ this.producersMapCardinality = producersMapCardinality;
+ this.publishLatencyMetrics = publishLatencyMetrics;
+ this.batchToEmit = new ArrayList<>();
+ }
+
+ @Teardown
+ public void teardown() {
+ SolaceWriteSessionsHandler.disconnectFromSolace(
+ sessionServiceFactory, producersMapCardinality, writerTransformUuid);
+ }
+
+ public void updateProducerIndex() {
+ currentBundleProducerIndex =
+ bundleProducerIndexCounter.getAndIncrement() % producersMapCardinality;
+ }
+
+ @StartBundle
+ public void startBundle() {
+ // Pick a producer at random for this bundle, reuse for the whole bundle
+ updateProducerIndex();
+ batchToEmit.clear();
+ }
+
+ public SessionService solaceSessionServiceWithProducer() {
+ return SolaceWriteSessionsHandler.getSessionServiceWithProducer(
+ currentBundleProducerIndex, sessionServiceFactory, writerTransformUuid);
+ }
+
+ public void publishResults(BeamContextWrapper context) {
+ long sumPublish = 0;
+ long countPublish = 0;
+ long minPublish = Long.MAX_VALUE;
+ long maxPublish = 0;
+
+ long sumFailed = 0;
+ long countFailed = 0;
+ long minFailed = Long.MAX_VALUE;
+ long maxFailed = 0;
+
+ Queue publishResultsQueue =
+ solaceSessionServiceWithProducer().getPublishedResultsQueue();
+ Solace.PublishResult result = publishResultsQueue.poll();
+
+ if (result != null) {
+ if (getCurrentBundleTimestamp() == null) {
+ setCurrentBundleTimestamp(Instant.now());
+ }
+ }
+
+ while (result != null) {
+ Long latency = result.getLatencyNanos();
+
+ if (latency == null && shouldPublishLatencyMetrics()) {
+ LOG.error(
+ "SolaceIO.Write: Latency is null but user asked for latency metrics."
+ + " This may be a bug.");
+ }
+
+ if (latency != null) {
+ if (result.getPublished()) {
+ sumPublish += latency;
+ countPublish++;
+ minPublish = Math.min(minPublish, latency);
+ maxPublish = Math.max(maxPublish, latency);
+ } else {
+ sumFailed += latency;
+ countFailed++;
+ minFailed = Math.min(minFailed, latency);
+ maxFailed = Math.max(maxFailed, latency);
+ }
+ }
+ if (result.getPublished()) {
+ context.output(
+ SUCCESSFUL_PUBLISH_TAG, result, getCurrentBundleTimestamp(), GlobalWindow.INSTANCE);
+ } else {
+ try {
+ BadRecord b =
+ BadRecord.fromExceptionInformation(
+ result,
+ null,
+ null,
+ Optional.ofNullable(result.getError()).orElse("SolaceIO.Write: unknown error."));
+ context.output(FAILED_PUBLISH_TAG, b, getCurrentBundleTimestamp(), GlobalWindow.INSTANCE);
+ } catch (IOException e) {
+ // ignore, the exception is thrown when the exception argument in the
+ // `BadRecord.fromExceptionInformation` is not null.
+ }
+ }
+
+ result = publishResultsQueue.poll();
+ }
+
+ if (shouldPublishLatencyMetrics()) {
+ // Report all latency value in milliseconds
+ if (countPublish > 0) {
+ getPublishLatencyMetric()
+ .update(
+ TimeUnit.NANOSECONDS.toMillis(sumPublish),
+ countPublish,
+ TimeUnit.NANOSECONDS.toMillis(minPublish),
+ TimeUnit.NANOSECONDS.toMillis(maxPublish));
+ }
+
+ if (countFailed > 0) {
+ getFailedLatencyMetric()
+ .update(
+ TimeUnit.NANOSECONDS.toMillis(sumFailed),
+ countFailed,
+ TimeUnit.NANOSECONDS.toMillis(minFailed),
+ TimeUnit.NANOSECONDS.toMillis(maxFailed));
+ }
+ }
+ }
+
+ public BytesXMLMessage createSingleMessage(
+ Solace.Record record, boolean useCorrelationKeyLatency) {
+ JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance();
+ BytesXMLMessage msg = jcsmpFactory.createBytesXMLMessage();
+ byte[] payload = record.getPayload();
+ msg.writeBytes(payload);
+
+ Long senderTimestamp = record.getSenderTimestamp();
+ if (senderTimestamp == null) {
+ LOG.error(
+ "SolaceIO.Write: Record with id {} has no sender timestamp. Using current"
+ + " worker clock as timestamp.",
+ record.getMessageId());
+ senderTimestamp = System.currentTimeMillis();
+ }
+ msg.setSenderTimestamp(senderTimestamp);
+ msg.setDeliveryMode(getDeliveryMode());
+ if (useCorrelationKeyLatency) {
+ Solace.CorrelationKey key =
+ Solace.CorrelationKey.builder()
+ .setMessageId(record.getMessageId())
+ .setPublishMonotonicNanos(System.nanoTime())
+ .build();
+ msg.setCorrelationKey(key);
+ } else {
+ // Use only a string as correlation key
+ msg.setCorrelationKey(record.getMessageId());
+ }
+ msg.setApplicationMessageId(record.getMessageId());
+ return msg;
+ }
+
+ public JCSMPSendMultipleEntry[] createMessagesArray(
+ Iterable records, boolean useCorrelationKeyLatency) {
+ // Solace batch publishing only supports 50 elements max, so it is safe to convert to
+ // list here
+ ArrayList recordsList = Lists.newArrayList(records);
+ if (recordsList.size() > SOLACE_BATCH_LIMIT) {
+ LOG.error(
+ "SolaceIO.Write: Trying to create a batch of {}, but Solace supports a"
+ + " maximum of {}. The batch will likely be rejected by Solace.",
+ recordsList.size(),
+ SOLACE_BATCH_LIMIT);
+ }
+
+ JCSMPSendMultipleEntry[] entries = new JCSMPSendMultipleEntry[recordsList.size()];
+ for (int i = 0; i < recordsList.size(); i++) {
+ Solace.Record record = recordsList.get(i);
+ JCSMPSendMultipleEntry entry =
+ JCSMPFactory.onlyInstance()
+ .createSendMultipleEntry(
+ createSingleMessage(record, useCorrelationKeyLatency),
+ getDestinationFn().apply(record));
+ entries[i] = entry;
+ }
+
+ return entries;
+ }
+
+ public int getProducersMapCardinality() {
+ return producersMapCardinality;
+ }
+
+ public Distribution getPublishLatencyMetric() {
+ return latencyPublish;
+ }
+
+ public Distribution getFailedLatencyMetric() {
+ return latencyErrors;
+ }
+
+ public boolean shouldPublishLatencyMetrics() {
+ return publishLatencyMetrics;
+ }
+
+ public SerializableFunction getDestinationFn() {
+ return destinationFn;
+ }
+
+ public DeliveryMode getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ public SubmissionMode getSubmissionMode() {
+ return submissionMode;
+ }
+
+ public void addToCurrentBundle(Solace.Record record) {
+ batchToEmit.add(record);
+ }
+
+ public List getCurrentBundle() {
+ return batchToEmit;
+ }
+
+ public @Nullable Instant getCurrentBundleTimestamp() {
+ return bundleTimestamp;
+ }
+
+ public void setCurrentBundleTimestamp(Instant bundleTimestamp) {
+ if (this.bundleTimestamp == null || bundleTimestamp.isBefore(this.bundleTimestamp)) {
+ this.bundleTimestamp = bundleTimestamp;
+ }
+ }
+
+ /**
+ * Since we need to publish from on timer methods and finish bundle methods, we need a consistent
+ * way to handle both WindowedContext and FinishBundleContext.
+ */
+ static class BeamContextWrapper {
+ private @Nullable WindowedContext windowedContext;
+ private @Nullable FinishBundleContext finishBundleContext;
+
+ private BeamContextWrapper() {}
+
+ public static BeamContextWrapper of(WindowedContext windowedContext) {
+ BeamContextWrapper beamContextWrapper = new BeamContextWrapper();
+ beamContextWrapper.windowedContext = windowedContext;
+ return beamContextWrapper;
+ }
+
+ public static BeamContextWrapper of(FinishBundleContext finishBundleContext) {
+ BeamContextWrapper beamContextWrapper = new BeamContextWrapper();
+ beamContextWrapper.finishBundleContext = finishBundleContext;
+ return beamContextWrapper;
+ }
+
+ public void output(
+ TupleTag tag,
+ T output,
+ @Nullable Instant timestamp, // Not required for windowed context
+ @Nullable BoundedWindow window) { // Not required for windowed context
+ if (windowedContext != null) {
+ windowedContext.output(tag, output);
+ } else if (finishBundleContext != null) {
+ if (timestamp == null) {
+ throw new IllegalStateException(
+ "SolaceIO.Write.UnboundedSolaceWriter.Context: Timestamp is required for a"
+ + " FinishBundleContext.");
+ }
+ if (window == null) {
+ throw new IllegalStateException(
+ "SolaceIO.Write.UnboundedSolaceWriter.Context: BoundedWindow is required for a"
+ + " FinishBundleContext.");
+ }
+ finishBundleContext.output(tag, output, timestamp, window);
+ } else {
+ throw new IllegalStateException(
+ "SolaceIO.Write.UnboundedSolaceWriter.Context: No context provided");
+ }
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java
new file mode 100644
index 000000000000..6d6d0b27e2bb
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.SolaceIO;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This DoFn is the responsible for writing to Solace in streaming mode (one message at a time, not
+ * holding up any message), and emit the corresponding output (success or fail; only for persistent
+ * messages), so the SolaceIO.Write connector can be composed with other subsequent transforms in
+ * the pipeline.
+ *
+ * The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be
+ * reused across different threads (if the number of threads is higher than the number of sessions,
+ * which is probably the most common case).
+ *
+ *
The producer uses the JCSMP streaming mode to publish a single message at a time, processing
+ * the acks from this publication, and returning them as output of the DoFn.
+ *
+ *
There are no acks if the delivery mode is set to DIRECT.
+ *
+ *
This writer DoFn offers lower latency and lower throughput than {@link
+ * UnboundedBatchedSolaceWriter}.
+ */
+@Internal
+public final class UnboundedStreamingSolaceWriter extends UnboundedSolaceWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedStreamingSolaceWriter.class);
+
+ private final Counter sentToBroker =
+ Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_sent_to_broker");
+
+ private final Counter rejectedByBroker =
+ Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_rejected_by_broker");
+
+ // We use a state variable to force a shuffling and ensure the cardinality of the processing
+ @SuppressWarnings("UnusedVariable")
+ @StateId("current_key")
+ private final StateSpec> currentKeySpec = StateSpecs.value();
+
+ public UnboundedStreamingSolaceWriter(
+ SerializableFunction destinationFn,
+ SessionServiceFactory sessionServiceFactory,
+ DeliveryMode deliveryMode,
+ SolaceIO.SubmissionMode submissionMode,
+ int producersMapCardinality,
+ boolean publishLatencyMetrics) {
+ super(
+ destinationFn,
+ sessionServiceFactory,
+ deliveryMode,
+ submissionMode,
+ producersMapCardinality,
+ publishLatencyMetrics);
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV element,
+ @Timestamp Instant timestamp,
+ @AlwaysFetched @StateId("current_key") ValueState currentKeyState) {
+
+ setCurrentBundleTimestamp(timestamp);
+
+ Integer currentKey = currentKeyState.read();
+ Integer elementKey = element.getKey();
+ Solace.Record record = element.getValue();
+
+ if (currentKey == null || !currentKey.equals(elementKey)) {
+ currentKeyState.write(elementKey);
+ }
+
+ if (record == null) {
+ LOG.error("SolaceIO.Write: Found null record with key {}. Ignoring record.", elementKey);
+ return;
+ }
+
+ // The publish method will retry, let's send a failure message if all the retries fail
+ try {
+ solaceSessionServiceWithProducer()
+ .getInitializedProducer(getSubmissionMode())
+ .publishSingleMessage(
+ record,
+ getDestinationFn().apply(record),
+ shouldPublishLatencyMetrics(),
+ getDeliveryMode());
+ sentToBroker.inc();
+ } catch (Exception e) {
+ rejectedByBroker.inc();
+ Solace.PublishResult errorPublish =
+ Solace.PublishResult.builder()
+ .setPublished(false)
+ .setMessageId(record.getMessageId())
+ .setError(
+ String.format(
+ "Message could not be published after several" + " retries. Error: %s",
+ e.getMessage()))
+ .setLatencyNanos(System.nanoTime())
+ .build();
+ solaceSessionServiceWithProducer().getPublishedResultsQueue().add(errorPublish);
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context) {
+ publishResults(BeamContextWrapper.of(context));
+ }
+}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index ec0ae7194686..38b4953a5984 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -17,14 +17,24 @@
*/
package org.apache.beam.sdk.io.solace;
+import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.JCSMPProperties;
+import java.util.Queue;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.broker.MessageProducer;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
-public class MockEmptySessionService extends SessionService {
+@AutoValue
+public abstract class MockEmptySessionService extends SessionService {
String exceptionMessage = "This is an empty client, use a MockSessionService instead.";
+ public static MockEmptySessionService create() {
+ return new AutoValue_MockEmptySessionService();
+ }
+
@Override
public void close() {
throw new UnsupportedOperationException(exceptionMessage);
@@ -36,7 +46,17 @@ public boolean isClosed() {
}
@Override
- public MessageReceiver createReceiver() {
+ public MessageReceiver getReceiver() {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
+
+ @Override
+ public MessageProducer getInitializedProducer(SubmissionMode mode) {
+ throw new UnsupportedOperationException(exceptionMessage);
+ }
+
+ @Override
+ public Queue getPublishedResultsQueue() {
throw new UnsupportedOperationException(exceptionMessage);
}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java
new file mode 100644
index 000000000000..271310359577
--- /dev/null
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockProducer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import java.time.Instant;
+import java.util.List;
+import org.apache.beam.sdk.io.solace.broker.MessageProducer;
+import org.apache.beam.sdk.io.solace.broker.PublishResultHandler;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.Record;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+public abstract class MockProducer implements MessageProducer {
+ final PublishResultHandler handler;
+
+ public MockProducer(PublishResultHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public int publishBatch(
+ List records,
+ boolean useCorrelationKeyLatency,
+ SerializableFunction destinationFn,
+ DeliveryMode deliveryMode) {
+ for (Record record : records) {
+ this.publishSingleMessage(
+ record, destinationFn.apply(record), useCorrelationKeyLatency, deliveryMode);
+ }
+ return records.size();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ public void close() {}
+
+ public static class MockSuccessProducer extends MockProducer {
+ public MockSuccessProducer(PublishResultHandler handler) {
+ super(handler);
+ }
+
+ @Override
+ public void publishSingleMessage(
+ Record msg,
+ Destination topicOrQueue,
+ boolean useCorrelationKeyLatency,
+ DeliveryMode deliveryMode) {
+ if (useCorrelationKeyLatency) {
+ handler.responseReceivedEx(
+ Solace.PublishResult.builder()
+ .setPublished(true)
+ .setMessageId(msg.getMessageId())
+ .build());
+ } else {
+ handler.responseReceivedEx(msg.getMessageId());
+ }
+ }
+ }
+
+ public static class MockFailedProducer extends MockProducer {
+ public MockFailedProducer(PublishResultHandler handler) {
+ super(handler);
+ }
+
+ @Override
+ public void publishSingleMessage(
+ Record msg,
+ Destination topicOrQueue,
+ boolean useCorrelationKeyLatency,
+ DeliveryMode deliveryMode) {
+ if (useCorrelationKeyLatency) {
+ handler.handleErrorEx(
+ Solace.PublishResult.builder()
+ .setPublished(false)
+ .setMessageId(msg.getMessageId())
+ .setError("Some error")
+ .build(),
+ new JCSMPException("Some JCSMPException"),
+ Instant.now().toEpochMilli());
+ } else {
+ handler.handleErrorEx(
+ msg.getMessageId(),
+ new JCSMPException("Some JCSMPException"),
+ Instant.now().toEpochMilli());
+ }
+ }
+ }
+}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
index a4d6a42ef302..bd52dee7ea86 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -17,38 +17,63 @@
*/
package org.apache.beam.sdk.io.solace;
+import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPProperties;
import java.io.IOException;
-import java.io.Serializable;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.solace.MockProducer.MockSuccessProducer;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.broker.MessageProducer;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
+import org.apache.beam.sdk.io.solace.broker.PublishResultHandler;
import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.checkerframework.checker.nullness.qual.Nullable;
-public class MockSessionService extends SessionService {
+@AutoValue
+public abstract class MockSessionService extends SessionService {
+ public static int ackWindowSizeForTesting = 87;
+ public static boolean callbackOnReactor = true;
- private final SerializableFunction getRecordFn;
- private MessageReceiver messageReceiver = null;
- private final int minMessagesReceived;
- private final @Nullable SubmissionMode mode;
-
- public MockSessionService(
- SerializableFunction getRecordFn,
- int minMessagesReceived,
- @Nullable SubmissionMode mode) {
- this.getRecordFn = getRecordFn;
- this.minMessagesReceived = minMessagesReceived;
- this.mode = mode;
+ public abstract @Nullable SerializableFunction recordFn();
+
+ public abstract int minMessagesReceived();
+
+ public abstract @Nullable SubmissionMode mode();
+
+ public abstract Function mockProducerFn();
+
+ private final Queue publishedResultsReceiver = new ConcurrentLinkedQueue<>();
+
+ public static Builder builder() {
+ return new AutoValue_MockSessionService.Builder()
+ .minMessagesReceived(0)
+ .mockProducerFn(MockSuccessProducer::new);
}
- public MockSessionService(
- SerializableFunction getRecordFn, int minMessagesReceived) {
- this(getRecordFn, minMessagesReceived, null);
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder recordFn(
+ @Nullable SerializableFunction recordFn);
+
+ public abstract Builder minMessagesReceived(int minMessagesReceived);
+
+ public abstract Builder mode(@Nullable SubmissionMode mode);
+
+ public abstract Builder mockProducerFn(
+ Function mockProducerFn);
+
+ public abstract MockSessionService build();
}
+ private MessageReceiver messageReceiver = null;
+ private MockProducer messageProducer = null;
+
@Override
public void close() {}
@@ -58,17 +83,41 @@ public boolean isClosed() {
}
@Override
- public MessageReceiver createReceiver() {
+ public MessageReceiver getReceiver() {
if (messageReceiver == null) {
- messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived);
+ messageReceiver = new MockReceiver(recordFn(), minMessagesReceived());
}
return messageReceiver;
}
+ @Override
+ public MessageProducer getInitializedProducer(SubmissionMode mode) {
+ if (messageProducer == null) {
+ messageProducer = mockProducerFn().apply(new PublishResultHandler(publishedResultsReceiver));
+ }
+ return messageProducer;
+ }
+
+ @Override
+ public Queue getPublishedResultsQueue() {
+ return publishedResultsReceiver;
+ }
+
@Override
public void connect() {}
- public static class MockReceiver implements MessageReceiver, Serializable {
+ @Override
+ public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
+ // Let's override some properties that will be overriden by the connector
+ // Opposite of the mode, to test that is overriden
+ baseProperties.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, callbackOnReactor);
+
+ baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, ackWindowSizeForTesting);
+
+ return baseProperties;
+ }
+
+ public static class MockReceiver implements MessageReceiver {
private final AtomicInteger counter = new AtomicInteger();
private final SerializableFunction getRecordFn;
private final int minMessagesReceived;
@@ -100,16 +149,4 @@ public boolean isEOF() {
return counter.get() >= minMessagesReceived;
}
}
-
- @Override
- public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
- // Let's override some properties that will be overriden by the connector
- // Opposite of the mode, to test that is overriden
- baseProperties.setProperty(
- JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode == SubmissionMode.HIGHER_THROUGHPUT);
-
- baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87);
-
- return baseProperties;
- }
}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
index 603a30ad2c90..9c17ca604201 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java
@@ -17,22 +17,78 @@
*/
package org.apache.beam.sdk.io.solace;
+import com.google.auto.value.AutoValue;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import org.apache.beam.sdk.io.solace.MockProducer.MockFailedProducer;
+import org.apache.beam.sdk.io.solace.MockProducer.MockSuccessProducer;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.Nullable;
-public class MockSessionServiceFactory extends SessionServiceFactory {
- SessionService sessionService;
+@AutoValue
+public abstract class MockSessionServiceFactory extends SessionServiceFactory {
+ public abstract @Nullable SubmissionMode mode();
- public MockSessionServiceFactory(SessionService clientService) {
- this.sessionService = clientService;
+ public abstract @Nullable SerializableFunction recordFn();
+
+ public abstract int minMessagesReceived();
+
+ public abstract SessionServiceType sessionServiceType();
+
+ public static Builder builder() {
+ return new AutoValue_MockSessionServiceFactory.Builder()
+ .minMessagesReceived(0)
+ .sessionServiceType(SessionServiceType.WITH_SUCCEEDING_PRODUCER);
}
public static SessionServiceFactory getDefaultMock() {
- return new MockSessionServiceFactory(new MockEmptySessionService());
+ return MockSessionServiceFactory.builder().build();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder mode(@Nullable SubmissionMode mode);
+
+ public abstract Builder recordFn(
+ @Nullable SerializableFunction recordFn);
+
+ public abstract Builder minMessagesReceived(int minMessagesReceived);
+
+ public abstract Builder sessionServiceType(SessionServiceType sessionServiceType);
+
+ public abstract MockSessionServiceFactory build();
}
@Override
public SessionService create() {
- return sessionService;
+ switch (sessionServiceType()) {
+ case EMPTY:
+ return MockEmptySessionService.create();
+ case WITH_SUCCEEDING_PRODUCER:
+ return MockSessionService.builder()
+ .recordFn(recordFn())
+ .minMessagesReceived(minMessagesReceived())
+ .mode(mode())
+ .mockProducerFn(MockSuccessProducer::new)
+ .build();
+ case WITH_FAILING_PRODUCER:
+ return MockSessionService.builder()
+ .recordFn(recordFn())
+ .minMessagesReceived(minMessagesReceived())
+ .mode(mode())
+ .mockProducerFn(MockFailedProducer::new)
+ .build();
+ default:
+ throw new RuntimeException(
+ String.format("Unknown sessionServiceType: %s", sessionServiceType().name()));
+ }
+ }
+
+ public enum SessionServiceType {
+ EMPTY,
+ WITH_SUCCEEDING_PRODUCER,
+ WITH_FAILING_PRODUCER
}
}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
similarity index 72%
rename from sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
rename to sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index cc1fa1d667aa..c718c55e1b48 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -31,10 +31,12 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.solace.MockSessionServiceFactory.SessionServiceType;
import org.apache.beam.sdk.io.solace.SolaceIO.Read;
import org.apache.beam.sdk.io.solace.SolaceIO.Read.Configuration;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
@@ -49,6 +51,7 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -61,7 +64,7 @@
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-public class SolaceIOTest {
+public class SolaceIOReadTest {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@@ -69,7 +72,6 @@ private Read getDefaultRead() {
return SolaceIO.read()
.from(Solace.Queue.fromName("queue"))
.withSempClientFactory(MockSempClientFactory.getDefaultMock())
- .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock())
.withMaxNumConnections(1);
}
@@ -77,7 +79,6 @@ private Read getDefaultReadForTopic() {
return SolaceIO.read()
.from(Solace.Topic.fromName("topic"))
.withSempClientFactory(MockSempClientFactory.getDefaultMock())
- .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock())
.withMaxNumConnections(1);
}
@@ -102,20 +103,18 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi
@Test
public void testReadMessages() {
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages =
- ImmutableList.of(
- SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
- SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
- SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"));
- return getOrNull(index, messages);
- },
- 3);
+ SerializableFunction recordFn =
+ index -> {
+ List messages =
+ ImmutableList.of(
+ SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"));
+ return getOrNull(index, messages);
+ };
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().minMessagesReceived(3).recordFn(recordFn).build();
// Expected data
List expected = new ArrayList<>();
@@ -137,20 +136,18 @@ public void testReadMessages() {
@Test
public void testReadMessagesWithDeduplication() {
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages =
- ImmutableList.of(
- SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
- SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
- SolaceDataUtils.getBytesXmlMessage("payload_test2", "451"));
- return getOrNull(index, messages);
- },
- 3);
+ SerializableFunction recordFn =
+ index -> {
+ List messages =
+ ImmutableList.of(
+ SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test2", "451"));
+ return getOrNull(index, messages);
+ };
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build();
// Expected data
List expected = new ArrayList<>();
@@ -172,19 +169,18 @@ public void testReadMessagesWithDeduplication() {
@Test
public void testReadMessagesWithoutDeduplication() {
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages =
- ImmutableList.of(
- SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
- SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
- SolaceDataUtils.getBytesXmlMessage("payload_test2", "451"));
- return getOrNull(index, messages);
- },
- 3);
+ SerializableFunction recordFn =
+ index -> {
+ List messages =
+ ImmutableList.of(
+ SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test2", "451"));
+ return getOrNull(index, messages);
+ };
+
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build();
// Expected data
List expected = new ArrayList<>();
@@ -206,32 +202,38 @@ public void testReadMessagesWithoutDeduplication() {
@Test
public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() {
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages =
- ImmutableList.of(
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test0", null, null, new ReplicationGroupMessageIdImpl(2L, 1L)),
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test1", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)),
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test2", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)));
- return getOrNull(index, messages);
- },
- 3);
+
+ String id0 = UUID.randomUUID().toString();
+ String id1 = UUID.randomUUID().toString();
+ String id2 = UUID.randomUUID().toString();
+
+ SerializableFunction recordFn =
+ index -> {
+ List messages =
+ ImmutableList.of(
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test0", id0, null, new ReplicationGroupMessageIdImpl(2L, 1L)),
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test1", id1, null, new ReplicationGroupMessageIdImpl(2L, 2L)),
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test2", id2, null, new ReplicationGroupMessageIdImpl(2L, 2L)));
+ return getOrNull(index, messages);
+ };
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build();
// Expected data
List expected = new ArrayList<>();
expected.add(
SolaceDataUtils.getSolaceRecord(
- "payload_test0", null, new ReplicationGroupMessageIdImpl(2L, 1L)));
+ "payload_test0", id0, new ReplicationGroupMessageIdImpl(2L, 1L)));
+ expected.add(
+ SolaceDataUtils.getSolaceRecord(
+ "payload_test1", id1, new ReplicationGroupMessageIdImpl(2L, 2L)));
expected.add(
SolaceDataUtils.getSolaceRecord(
- "payload_test1", null, new ReplicationGroupMessageIdImpl(2L, 2L)));
+ "payload_test2", id2, new ReplicationGroupMessageIdImpl(2L, 2L)));
// Run the pipeline
PCollection events =
@@ -248,19 +250,18 @@ public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() {
@Test
public void testReadWithCoderAndParseFnAndTimestampFn() {
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages =
- ImmutableList.of(
- SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
- SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
- SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"));
- return getOrNull(index, messages);
- },
- 3);
+ SerializableFunction recordFn =
+ index -> {
+ List messages =
+ ImmutableList.of(
+ SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"));
+ return getOrNull(index, messages);
+ };
+
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build();
// Expected data
List expected = new ArrayList<>();
@@ -304,7 +305,10 @@ public void testSplitsForExclusiveQueue() throws Exception {
SolaceIO.read()
.from(Solace.Queue.fromName("queue"))
.withSempClientFactory(new MockSempClientFactory(mockSempClient))
- .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock());
+ .withSessionServiceFactory(
+ MockSessionServiceFactory.builder()
+ .sessionServiceType(SessionServiceType.EMPTY)
+ .build());
int desiredNumSplits = 5;
@@ -316,7 +320,10 @@ public void testSplitsForExclusiveQueue() throws Exception {
@Test
public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Exception {
- Read spec = getDefaultRead().withMaxNumConnections(3);
+ Read spec =
+ getDefaultRead()
+ .withMaxNumConnections(3)
+ .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock());
int desiredNumSplits = 5;
@@ -328,7 +335,10 @@ public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Excepti
@Test
public void testSplitsForNonExclusiveQueueWithMaxNumConnectionsRespectDesired() throws Exception {
- Read spec = getDefaultRead().withMaxNumConnections(10);
+ Read spec =
+ getDefaultRead()
+ .withMaxNumConnections(10)
+ .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock());
int desiredNumSplits = 5;
UnboundedSolaceSource initialSource = getSource(spec, pipeline);
@@ -346,7 +356,9 @@ public void testCreateQueueForTopic() {
.build();
Read spec =
- getDefaultReadForTopic().withSempClientFactory(new MockSempClientFactory(mockSempClient));
+ getDefaultReadForTopic()
+ .withSempClientFactory(new MockSempClientFactory(mockSempClient))
+ .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock());
spec.expand(PBegin.in(TestPipeline.create()));
// check if createQueueForTopic was executed
assertEquals(1, createQueueForTopicFnCounter.get());
@@ -358,22 +370,22 @@ public void testCheckpointMark() throws Exception {
AtomicInteger countAckMessages = new AtomicInteger(0);
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- messages.add(
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
- }
- countConsumedMessages.incrementAndGet();
- return getOrNull(index, messages);
- },
- 10);
+
+ SerializableFunction recordFn =
+ index -> {
+ List messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ messages.add(
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+ }
+ countConsumedMessages.incrementAndGet();
+ return getOrNull(index, messages);
+ };
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
+
Read spec = getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory);
UnboundedSolaceSource initialSource = getSource(spec, pipeline);
@@ -407,21 +419,20 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
AtomicInteger countAckMessages = new AtomicInteger(0);
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- messages.add(
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
- }
- countConsumedMessages.incrementAndGet();
- return getOrNull(index, messages);
- },
- 10);
+ SerializableFunction recordFn =
+ index -> {
+ List messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ messages.add(
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+ }
+ countConsumedMessages.incrementAndGet();
+ return getOrNull(index, messages);
+ };
+
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
Read spec =
getDefaultRead()
@@ -467,22 +478,21 @@ public void testCheckpointMarkSafety() throws Exception {
AtomicInteger countAckMessages = new AtomicInteger(0);
// Broker that creates input data
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages = new ArrayList<>();
- for (int i = 0; i < messagesToProcess; i++) {
- messages.add(
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
- }
- countConsumedMessages.incrementAndGet();
- return getOrNull(index, messages);
- },
- 10);
+ SerializableFunction recordFn =
+ index -> {
+ List messages = new ArrayList<>();
+ for (int i = 0; i < messagesToProcess; i++) {
+ messages.add(
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
+ }
+ countConsumedMessages.incrementAndGet();
+ return getOrNull(index, messages);
+ };
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
+
Read spec =
getDefaultRead()
.withSessionServiceFactory(fakeSessionServiceFactory)
@@ -558,20 +568,18 @@ public void testDestinationTopicQueueCreation() {
@Test
public void testTopicEncoding() {
- MockSessionService mockClientService =
- new MockSessionService(
- index -> {
- List messages =
- ImmutableList.of(
- SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
- SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
- SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"));
- return getOrNull(index, messages);
- },
- 3);
+ SerializableFunction recordFn =
+ index -> {
+ List messages =
+ ImmutableList.of(
+ SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"),
+ SolaceDataUtils.getBytesXmlMessage("payload_test2", "452"));
+ return getOrNull(index, messages);
+ };
SessionServiceFactory fakeSessionServiceFactory =
- new MockSessionServiceFactory(mockClientService);
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build();
// Run
PCollection events =
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java
new file mode 100644
index 000000000000..e92657c3c3d2
--- /dev/null
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import com.solacesystems.jcsmp.DeliveryMode;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.io.solace.MockSessionServiceFactory.SessionServiceType;
+import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
+import org.apache.beam.sdk.io.solace.SolaceIO.WriterType;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.Record;
+import org.apache.beam.sdk.io.solace.data.SolaceDataUtils;
+import org.apache.beam.sdk.io.solace.write.SolaceOutput;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SolaceIOWriteTest {
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ private final List keys = ImmutableList.of("450", "451", "452");
+ private final List payloads = ImmutableList.of("payload0", "payload1", "payload2");
+
+ private PCollection getRecords(Pipeline p) {
+ TestStream.Builder> kvBuilder =
+ TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class)))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ assert keys.size() == payloads.size();
+
+ for (int k = 0; k < keys.size(); k++) {
+ kvBuilder =
+ kvBuilder
+ .addElements(KV.of(keys.get(k), payloads.get(k)))
+ .advanceProcessingTime(Duration.standardSeconds(60));
+ }
+
+ TestStream> testStream = kvBuilder.advanceWatermarkToInfinity();
+ PCollection> kvs = p.apply("Test stream", testStream);
+
+ return kvs.apply(
+ "To Record",
+ MapElements.into(TypeDescriptor.of(Record.class))
+ .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey())));
+ }
+
+ private SolaceOutput getWriteTransform(
+ SubmissionMode mode,
+ WriterType writerType,
+ Pipeline p,
+ ErrorHandler errorHandler) {
+ SessionServiceFactory fakeSessionServiceFactory =
+ MockSessionServiceFactory.builder().mode(mode).build();
+
+ PCollection records = getRecords(p);
+ return records.apply(
+ "Write to Solace",
+ SolaceIO.write()
+ .to(Solace.Queue.fromName("queue"))
+ .withSubmissionMode(mode)
+ .withWriterType(writerType)
+ .withDeliveryMode(DeliveryMode.PERSISTENT)
+ .withSessionServiceFactory(fakeSessionServiceFactory)
+ .withErrorHandler(errorHandler));
+ }
+
+ private static PCollection getIdsPCollection(SolaceOutput output) {
+ return output
+ .getSuccessfulPublish()
+ .apply(
+ "Get message ids", MapElements.into(strings()).via(Solace.PublishResult::getMessageId));
+ }
+
+ @Test
+ public void testWriteLatencyStreaming() throws Exception {
+ SubmissionMode mode = SubmissionMode.LOWER_LATENCY;
+ WriterType writerType = WriterType.STREAMING;
+
+ ErrorHandler> errorHandler =
+ pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform());
+ SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler);
+ PCollection ids = getIdsPCollection(output);
+
+ PAssert.that(ids).containsInAnyOrder(keys);
+ errorHandler.close();
+ PAssert.that(errorHandler.getOutput()).empty();
+
+ pipeline.run();
+ }
+
+ @Test
+ public void testWriteThroughputStreaming() throws Exception {
+ SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT;
+ WriterType writerType = WriterType.STREAMING;
+ ErrorHandler> errorHandler =
+ pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform());
+ SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler);
+ PCollection ids = getIdsPCollection(output);
+
+ PAssert.that(ids).containsInAnyOrder(keys);
+ errorHandler.close();
+ PAssert.that(errorHandler.getOutput()).empty();
+
+ pipeline.run();
+ }
+
+ @Test
+ public void testWriteLatencyBatched() throws Exception {
+ SubmissionMode mode = SubmissionMode.LOWER_LATENCY;
+ WriterType writerType = WriterType.BATCHED;
+ ErrorHandler> errorHandler =
+ pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform());
+ SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler);
+ PCollection ids = getIdsPCollection(output);
+
+ PAssert.that(ids).containsInAnyOrder(keys);
+ errorHandler.close();
+ PAssert.that(errorHandler.getOutput()).empty();
+ pipeline.run();
+ }
+
+ @Test
+ public void testWriteThroughputBatched() throws Exception {
+ SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT;
+ WriterType writerType = WriterType.BATCHED;
+ ErrorHandler> errorHandler =
+ pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform());
+ SolaceOutput output = getWriteTransform(mode, writerType, pipeline, errorHandler);
+ PCollection ids = getIdsPCollection(output);
+
+ PAssert.that(ids).containsInAnyOrder(keys);
+ errorHandler.close();
+ PAssert.that(errorHandler.getOutput()).empty();
+ pipeline.run();
+ }
+
+ @Test
+ public void testWriteWithFailedRecords() throws Exception {
+ SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT;
+ WriterType writerType = WriterType.BATCHED;
+ ErrorHandler> errorHandler =
+ pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform());
+
+ SessionServiceFactory fakeSessionServiceFactory =
+ MockSessionServiceFactory.builder()
+ .mode(mode)
+ .sessionServiceType(SessionServiceType.WITH_FAILING_PRODUCER)
+ .build();
+
+ PCollection records = getRecords(pipeline);
+ SolaceOutput output =
+ records.apply(
+ "Write to Solace",
+ SolaceIO.write()
+ .to(Solace.Queue.fromName("queue"))
+ .withSubmissionMode(mode)
+ .withWriterType(writerType)
+ .withDeliveryMode(DeliveryMode.PERSISTENT)
+ .withSessionServiceFactory(fakeSessionServiceFactory)
+ .withErrorHandler(errorHandler));
+
+ PCollection ids = getIdsPCollection(output);
+
+ PAssert.that(ids).empty();
+ errorHandler.close();
+ PAssert.thatSingleton(Objects.requireNonNull(errorHandler.getOutput()))
+ .isEqualTo((long) payloads.size());
+ pipeline.run();
+ }
+}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
index 0c6f88a7c9d5..357734f18aad 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java
@@ -31,9 +31,8 @@ public class OverrideWriterPropertiesTest {
@Test
public void testOverrideForHigherThroughput() {
SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.HIGHER_THROUGHPUT;
- MockSessionService service = new MockSessionService(null, 0, mode);
+ MockSessionService service = MockSessionService.builder().mode(mode).build();
- // Test HIGHER_THROUGHPUT mode
JCSMPProperties props = service.initializeWriteSessionProperties(mode);
assertEquals(false, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
assertEquals(
@@ -44,13 +43,26 @@ public void testOverrideForHigherThroughput() {
@Test
public void testOverrideForLowerLatency() {
SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY;
- MockSessionService service = new MockSessionService(null, 0, mode);
+ MockSessionService service = MockSessionService.builder().mode(mode).build();
- // Test HIGHER_THROUGHPUT mode
JCSMPProperties props = service.initializeWriteSessionProperties(mode);
assertEquals(true, props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
assertEquals(
Long.valueOf(50),
Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
}
+
+ @Test
+ public void testDontOverrideForCustom() {
+ SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.CUSTOM;
+ MockSessionService service = MockSessionService.builder().mode(mode).build();
+
+ JCSMPProperties props = service.initializeWriteSessionProperties(mode);
+ assertEquals(
+ MockSessionService.callbackOnReactor,
+ props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR));
+ assertEquals(
+ Long.valueOf(MockSessionService.ackWindowSizeForTesting),
+ Long.valueOf(props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE)));
+ }
}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
index 5134bd131d73..9e04c4cfd276 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
@@ -100,7 +100,7 @@ public static Solace.Record getSolaceRecord(
: DEFAULT_REPLICATION_GROUP_ID.toString();
return Solace.Record.builder()
- .setPayload(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
+ .setPayload(payload.getBytes(StandardCharsets.UTF_8))
.setMessageId(messageId)
.setDestination(
Solace.Destination.builder()
@@ -116,7 +116,7 @@ public static Solace.Record getSolaceRecord(
.setTimeToLive(1000L)
.setSenderTimestamp(null)
.setReplicationGroupMessageId(replicationGroupMessageIdString)
- .setAttachmentBytes(ByteBuffer.wrap(new byte[0]))
+ .setAttachmentBytes(new byte[0])
.build();
}
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java
index 1a2a056efd45..ee5d206533dc 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java
@@ -17,49 +17,71 @@
*/
package org.apache.beam.sdk.io.solace.it;
+import static org.apache.beam.sdk.io.solace.it.SolaceContainerManager.TOPIC_NAME;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.junit.Assert.assertEquals;
+import com.solacesystems.jcsmp.DeliveryMode;
import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory;
import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.data.Solace.Queue;
+import org.apache.beam.sdk.io.solace.data.SolaceDataUtils;
+import org.apache.beam.sdk.io.solace.write.SolaceOutput;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.runners.MethodSorters;
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class SolaceIOIT {
private static final String NAMESPACE = SolaceIOIT.class.getName();
private static final String READ_COUNT = "read_count";
+ private static final String WRITE_COUNT = "write_count";
private static SolaceContainerManager solaceContainerManager;
- private static final TestPipelineOptions readPipelineOptions;
+ private static final String queueName = "test_queue";
+ private static final TestPipelineOptions pipelineOptions;
+ private static final long PUBLISH_MESSAGE_COUNT = 20;
static {
- readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class);
- readPipelineOptions.setBlockOnRun(false);
- readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
- readPipelineOptions.as(StreamingOptions.class).setStreaming(false);
+ pipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class);
+ pipelineOptions.as(StreamingOptions.class).setStreaming(true);
+ // For the read connector tests, we need to make sure that p.run() does not block
+ pipelineOptions.setBlockOnRun(false);
+ pipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
}
- @Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions);
+ @Rule public final TestPipeline pipeline = TestPipeline.fromOptions(pipelineOptions);
@BeforeClass
public static void setup() throws IOException {
solaceContainerManager = new SolaceContainerManager();
solaceContainerManager.start();
+ solaceContainerManager.createQueueWithSubscriptionTopic(queueName);
}
@AfterClass
@@ -69,20 +91,17 @@ public static void afterClass() {
}
}
+ // The order of the following tests matter. The first test publishes some messages in a Solace
+ // queue, and those messages are read by the second test. If another writer tests is run before
+ // the read test, that will alter the count for the read test and will make it fail.
@Test
- public void testRead() {
- String queueName = "test_queue";
- solaceContainerManager.createQueueWithSubscriptionTopic(queueName);
-
- // todo this is very slow, needs to be replaced with the SolaceIO.write connector.
- int publishMessagesCount = 20;
- for (int i = 0; i < publishMessagesCount; i++) {
- solaceContainerManager.sendToTopic(
- "{\"field_str\":\"value\",\"field_int\":123}",
- ImmutableList.of("Solace-Message-ID:m" + i));
- }
+ public void test01WriteStreaming() {
+ testWriteConnector(SolaceIO.WriterType.STREAMING);
+ }
- readPipeline
+ @Test
+ public void test02Read() {
+ pipeline
.apply(
"Read from Solace",
SolaceIO.read()
@@ -105,12 +124,83 @@ public void testRead() {
.build()))
.apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT)));
- PipelineResult pipelineResult = readPipeline.run();
+ PipelineResult pipelineResult = pipeline.run();
+ // We need enough time for Beam to pull all messages from the queue, but we need a timeout too,
+ // as the Read connector will keep attempting to read forever.
pipelineResult.waitUntilFinish(Duration.standardSeconds(15));
MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE);
long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT);
- assertEquals(publishMessagesCount, actualRecordsCount);
+ assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount);
+ }
+
+ @Test
+ public void test03WriteBatched() {
+ testWriteConnector(SolaceIO.WriterType.BATCHED);
+ }
+
+ private void testWriteConnector(SolaceIO.WriterType writerType) {
+ Pipeline p = createWriterPipeline(writerType);
+
+ PipelineResult pipelineResult = p.run();
+ pipelineResult.waitUntilFinish();
+ MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE);
+ long actualRecordsCount = metricsReader.getCounterMetric(WRITE_COUNT);
+ assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount);
+ }
+
+ private Pipeline createWriterPipeline(SolaceIO.WriterType writerType) {
+ TestStream.Builder> kvBuilder =
+ TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class)))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ for (int i = 0; i < PUBLISH_MESSAGE_COUNT; i++) {
+ String key = "Solace-Message-ID:m" + i;
+ String payload = String.format("{\"field_str\":\"value\",\"field_int\":123%d}", i);
+ kvBuilder =
+ kvBuilder
+ .addElements(KV.of(key, payload))
+ .advanceProcessingTime(Duration.standardSeconds(60));
+ }
+
+ TestStream> testStream = kvBuilder.advanceWatermarkToInfinity();
+
+ PCollection> kvs =
+ pipeline.apply(String.format("Test stream %s", writerType), testStream);
+
+ PCollection records =
+ kvs.apply(
+ String.format("To Record %s", writerType),
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey())));
+
+ SolaceOutput result =
+ records.apply(
+ String.format("Write to Solace %s", writerType),
+ SolaceIO.write()
+ .to(Solace.Topic.fromName(TOPIC_NAME))
+ .withSubmissionMode(SolaceIO.SubmissionMode.TESTING)
+ .withWriterType(writerType)
+ .withDeliveryMode(DeliveryMode.PERSISTENT)
+ .withNumberOfClientsPerWorker(1)
+ .withNumShards(1)
+ .withSessionServiceFactory(
+ BasicAuthJcsmpSessionServiceFactory.builder()
+ .host("localhost:" + solaceContainerManager.jcsmpPortMapped)
+ .username(SolaceContainerManager.USERNAME)
+ .password(SolaceContainerManager.PASSWORD)
+ .vpnName(SolaceContainerManager.VPN_NAME)
+ .build()));
+ result
+ .getSuccessfulPublish()
+ .apply(
+ String.format("Get ids %s", writerType),
+ MapElements.into(strings()).via(Solace.PublishResult::getMessageId))
+ .apply(
+ String.format("Count %s", writerType),
+ ParDo.of(new CountingFn<>(NAMESPACE, WRITE_COUNT)));
+
+ return pipeline;
}
private static class CountingFn extends DoFn {