Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
dba27f5
This is a follow-up PR to #31953, and part of the issue #31905.
iht Jul 26, 2024
dc044b9
Use static imports for Preconditions
iht Sep 13, 2024
0bd77d0
Remove unused method
iht Sep 13, 2024
5249d9e
Logging has builtin formatting support
iht Sep 13, 2024
4681a07
Use TypeDescriptors to check the type used as input
iht Sep 13, 2024
99d9993
Fix parameter name
iht Sep 13, 2024
048de8a
Use interface + utils class for MessageProducer
iht Sep 13, 2024
bbeaff4
Use null instead of optional
iht Sep 13, 2024
3ab4d0b
Avoid using ByteString just to create an empty byte array.
iht Sep 13, 2024
cd01cff
Fix documentation, we are not using ByteString now.
iht Sep 13, 2024
f6d1f1e
Not needed anymore, we are not using ByteString
iht Sep 13, 2024
ce1327b
Defer transforming latency from nanos to millis.
iht Sep 13, 2024
af28bde
Avoid using top level classes with a single inner class.
iht Sep 14, 2024
e5521ae
Remove using a state variable, there is already a timer.
iht Sep 14, 2024
19ade8a
Properties must always be set.
iht Sep 15, 2024
26c9d48
Add a new custom mode so no JCSMP property is overridden.
iht Sep 15, 2024
997fe56
Add some more documentation about the new custom submission mode.
iht Sep 15, 2024
251f14d
Fix bug introduced with the refactoring of code for this PR.
iht Sep 15, 2024
a7a5eff
Remove unnecessary Serializable annotation.
iht Sep 16, 2024
29ed200
Make the PublishResult class for handling callbacks non-static to han…
bzablocki Nov 4, 2024
ff5f09a
Rename maxNumOfUsedWorkers to numShards
bzablocki Nov 4, 2024
33d1771
Use RoundRobin assignment of producers to process bundles.
bzablocki Nov 4, 2024
fe8b3ca
Output results in a GlobalWindow
bzablocki Nov 5, 2024
ad5c45b
Add ErrorHandler
bzablocki Nov 8, 2024
6a8b9ce
Fix docs
bzablocki Nov 8, 2024
44059c8
Remove PublishResultHandler class that was just a wrapper around a Queue
bzablocki Nov 12, 2024
602e85b
small refactors
bzablocki Nov 12, 2024
f31a320
Revert CsvIO docs fix
bzablocki Nov 12, 2024
4dddd33
Add withErrorHandler docs
bzablocki Nov 12, 2024
4f9c024
fix var scope
bzablocki Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
* Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).

## New Features / Improvements

Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common")
testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testRuntimeOnly library.java.slf4j_jdk14
testImplementation library.java.testcontainers_solace
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowReceiver;
Expand All @@ -28,9 +29,15 @@
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.XMLMessageProducer;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
Expand All @@ -39,34 +46,50 @@
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue. The
* connection is established using basic authentication.
*/
public class BasicAuthJcsmpSessionService extends SessionService {
private final String queueName;
private final String host;
private final String username;
private final String password;
private final String vpnName;
@Nullable private JCSMPSession jcsmpSession;
@Nullable private MessageReceiver messageReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
@AutoValue
public abstract class BasicAuthJcsmpSessionService extends SessionService {

/** The name of the queue to receive messages from. */
public abstract @Nullable String queueName();

/** The host name or IP address of the Solace broker. Format: Host[:Port] */
public abstract String host();

/** The username to use for authentication. */
public abstract String username();

/** The password to use for authentication. */
public abstract String password();

/** The name of the VPN to connect to. */
public abstract String vpnName();

public static Builder builder() {
return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME);
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder queueName(@Nullable String queueName);

public abstract Builder host(String host);

/**
* Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters.
*
* @param queueName The name of the queue to receive messages from.
* @param host The host name or IP address of the Solace broker. Format: Host[:Port]
* @param username The username to use for authentication.
* @param password The password to use for authentication.
* @param vpnName The name of the VPN to connect to.
*/
public BasicAuthJcsmpSessionService(
String queueName, String host, String username, String password, String vpnName) {
this.queueName = queueName;
this.host = host;
this.username = username;
this.password = password;
this.vpnName = vpnName;
public abstract Builder username(String username);

public abstract Builder password(String password);

public abstract Builder vpnName(String vpnName);

public abstract BasicAuthJcsmpSessionService build();
}

@Nullable private transient JCSMPSession jcsmpSession;
@Nullable private transient MessageReceiver messageReceiver;
@Nullable private transient MessageProducer messageProducer;
private final java.util.Queue<PublishResult> publishedResultsQueue =
new ConcurrentLinkedQueue<>();
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

@Override
public void connect() {
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
Expand All @@ -79,6 +102,9 @@ public void close() {
if (messageReceiver != null) {
messageReceiver.close();
}
if (messageProducer != null) {
messageProducer.close();
}
if (!isClosed()) {
checkStateNotNull(jcsmpSession).closeSession();
}
Expand All @@ -88,24 +114,64 @@ public void close() {
}

@Override
public MessageReceiver createReceiver() {
this.messageReceiver =
retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
public MessageReceiver getReceiver() {
if (this.messageReceiver == null) {
this.messageReceiver =
retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
}
return this.messageReceiver;
}

@Override
public MessageProducer getInitializedProducer(SubmissionMode submissionMode) {
if (this.messageProducer == null || this.messageProducer.isClosed()) {
Callable<MessageProducer> create = () -> createXMLMessageProducer(submissionMode);
this.messageProducer =
retryCallableManager.retryCallable(create, ImmutableSet.of(JCSMPException.class));
}
return checkStateNotNull(this.messageProducer);
}

@Override
public java.util.Queue<PublishResult> getPublishedResultsQueue() {
return publishedResultsQueue;
}

@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
throws JCSMPException, IOException {

if (isClosed()) {
connectWriteSession(submissionMode);
}

@SuppressWarnings("nullness")
Callable<XMLMessageProducer> initProducer =
() ->
Objects.requireNonNull(jcsmpSession)
.getMessageProducer(new PublishResultHandler(publishedResultsQueue));

XMLMessageProducer producer =
retryCallableManager.retryCallable(initProducer, ImmutableSet.of(JCSMPException.class));
if (producer == null) {
throw new IOException("SolaceIO.Write: Could not create producer, producer object is null");
}
return new SolaceMessageProducer(producer);
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}

Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);
Queue queue =
JCSMPFactory.onlyInstance()
.createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set."));

ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
Expand All @@ -118,7 +184,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException
createFlowReceiver(jcsmpSession, flowProperties, endpointProperties));
}
throw new IOException(
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null.");
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is"
+ " null.");
}

// The `@SuppressWarning` is needed here, because the checkerframework reports an error for the
Expand All @@ -141,20 +208,33 @@ private int connectSession() throws JCSMPException {
return 0;
}

private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
if (jcsmpSession == null) {
jcsmpSession = createWriteSessionObject(mode);
}
jcsmpSession.connect();
return 0;
}

private JCSMPSession createSessionObject() throws InvalidPropertiesException {
JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
return JCSMPFactory.onlyInstance().createSession(properties);
}

private JCSMPSession createWriteSessionObject(SubmissionMode mode)
throws InvalidPropertiesException {
return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName);
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());

baseProps.setProperty(
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
baseProps.setProperty(JCSMPProperties.USERNAME, username);
baseProps.setProperty(JCSMPProperties.PASSWORD, password);
baseProps.setProperty(JCSMPProperties.HOST, host);
baseProps.setProperty(JCSMPProperties.USERNAME, username());
baseProps.setProperty(JCSMPProperties.PASSWORD, password());
baseProps.setProperty(JCSMPProperties.HOST, host());
return baseProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;

Expand All @@ -31,12 +30,16 @@
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
/** The host name or IP address of the Solace broker. Format: Host[:Port] */
public abstract String host();

/** The username to use for authentication. */
public abstract String username();

/** The password to use for authentication. */
public abstract String password();

/** The name of the VPN to connect to. */
public abstract String vpnName();

public static Builder builder() {
Expand All @@ -54,6 +57,7 @@ public abstract static class Builder {

/** Set Solace username. */
public abstract Builder username(String username);

/** Set Solace password. */
public abstract Builder password(String password);

Expand All @@ -65,11 +69,15 @@ public abstract static class Builder {

@Override
public SessionService create() {
return new BasicAuthJcsmpSessionService(
checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
host(),
username(),
password(),
vpnName());
BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder();
if (queue != null) {
builder = builder.queueName(queue.getName());
}
Comment on lines +73 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change from throwing to checking?
Also, given that AutoValue implementations are immutable, deriving from a class that allows you to mutate the queue name seems to run counter to what the framework provides. SessionServiceFactory should probably be an interface declaring create and optionally define an abstract class that implements the interface (abstract of course) and declares equals and hashCode abstract if your intention is to have all implementors explicitly override those methods for value equality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was initially used for the Read connector alone, which requires that queue property to be set, and should fail if it is not set.

The Write connector does not need a pre-existing queue. I changed the queueName parameter to @Nullable and this to a check since now the queueName value is optional.

Initially, there were different classes for SessionService-ish for both connectors, with different properties. Now we use a single class, for consistency across the Read and Write connectors.

return builder
.host(host())
.username(username())
.password(password())
.vpnName(vpnName())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public abstract static class Builder {

@Override
public SessionService create() {
String password = null;
String password;
try {
password = retrieveSecret();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.Destination;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
* Base class for publishing messages to a Solace broker.
*
* <p>Implementations of this interface are responsible for managing the connection to the broker
* and for publishing messages to the broker.
*/
@Internal
public interface MessageProducer {

/** Publishes a message to the broker. */
void publishSingleMessage(
Solace.Record msg,
Destination topicOrQueue,
boolean useCorrelationKeyLatency,
DeliveryMode deliveryMode);

/**
* Publishes a batch of messages to the broker.
*
* <p>The size of the batch cannot exceed 50 messages, this is a limitation of the Solace API.
*
* <p>It returns the number of messages written.
*/
int publishBatch(
List<Solace.Record> records,
boolean useCorrelationKeyLatency,
SerializableFunction<Solace.Record, Destination> destinationFn,
DeliveryMode deliveryMode);

/** Returns {@literal true} if the message producer is closed, {@literal false} otherwise. */
boolean isClosed();

/** Closes the message producer. */
void close();
}
Loading
Loading