Skip to content

Commit f17bb8d

Browse files
authored
SolaceIO: separate auth and session settings (#32406)
* Refactored to separate authentication and session settings, and allow inheritance and overriding of SessionService * Improve methods' javadoc
1 parent 1d5a3e8 commit f17bb8d

File tree

10 files changed

+345
-110
lines changed

10 files changed

+345
-110
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@
209209
*
210210
* <p>See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for session authentication.
211211
* The connector provides implementation of the {@link SessionServiceFactory} using the Basic
212-
* Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}.
212+
* Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory}.
213213
*
214214
* <p>For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)})
215215
* the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
@@ -639,9 +639,8 @@ public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
639639
* <li>create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}.
640640
* </ul>
641641
*
642-
* <p>An existing implementation of the SempClientFactory includes {@link
643-
* org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic
644-
* Authentication to Solace. *
642+
* <p>The {@link BasicAuthJcsmpSessionServiceFactory} is an existing implementation of the
643+
* {@link SessionServiceFactory} which implements the Basic Authentication to Solace.
645644
*
646645
* <p>To use it, specify the credentials with the builder methods. *
647646
*

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020
import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;
2121

2222
import com.google.auto.value.AutoValue;
23+
import com.solacesystems.jcsmp.JCSMPProperties;
2324

2425
/**
25-
* A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link
26+
* A factory for creating {@link JcsmpSessionService} instances. Extends {@link
2627
* SessionServiceFactory}.
2728
*
28-
* <p>This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with
29-
* authenticate to Solace with Basic Authentication.
29+
* <p>This factory provides a way to create {@link JcsmpSessionService} that use Basic
30+
* Authentication.
3031
*/
3132
@AutoValue
3233
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
@@ -69,15 +70,14 @@ public abstract static class Builder {
6970

7071
@Override
7172
public SessionService create() {
72-
BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder();
73-
if (queue != null) {
74-
builder = builder.queueName(queue.getName());
75-
}
76-
return builder
77-
.host(host())
78-
.username(username())
79-
.password(password())
80-
.vpnName(vpnName())
81-
.build();
73+
JCSMPProperties jcsmpProperties = new JCSMPProperties();
74+
jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, vpnName());
75+
jcsmpProperties.setProperty(
76+
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
77+
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, username());
78+
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, password());
79+
jcsmpProperties.setProperty(JCSMPProperties.HOST, host());
80+
81+
return JcsmpSessionService.create(jcsmpProperties, getQueue());
8282
}
8383
}
Lines changed: 22 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -43,45 +43,16 @@
4343
/**
4444
* A class that manages a connection to a Solace broker using basic authentication.
4545
*
46-
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue. The
47-
* connection is established using basic authentication.
46+
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue.
4847
*/
4948
@AutoValue
50-
public abstract class BasicAuthJcsmpSessionService extends SessionService {
49+
public abstract class JcsmpSessionService extends SessionService {
5150

52-
/** The name of the queue to receive messages from. */
53-
public abstract @Nullable String queueName();
51+
/** JCSMP properties used to establish the connection. */
52+
abstract JCSMPProperties jcsmpProperties();
5453

55-
/** The host name or IP address of the Solace broker. Format: Host[:Port] */
56-
public abstract String host();
57-
58-
/** The username to use for authentication. */
59-
public abstract String username();
60-
61-
/** The password to use for authentication. */
62-
public abstract String password();
63-
64-
/** The name of the VPN to connect to. */
65-
public abstract String vpnName();
66-
67-
public static Builder builder() {
68-
return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME);
69-
}
70-
71-
@AutoValue.Builder
72-
public abstract static class Builder {
73-
public abstract Builder queueName(@Nullable String queueName);
74-
75-
public abstract Builder host(String host);
76-
77-
public abstract Builder username(String username);
78-
79-
public abstract Builder password(String password);
80-
81-
public abstract Builder vpnName(String vpnName);
82-
83-
public abstract BasicAuthJcsmpSessionService build();
84-
}
54+
/** The Queue to receive messages from. */
55+
abstract @Nullable Queue queue();
8556

8657
@Nullable private transient JCSMPSession jcsmpSession;
8758
@Nullable private transient MessageReceiver messageReceiver;
@@ -90,9 +61,19 @@ public abstract static class Builder {
9061
new ConcurrentLinkedQueue<>();
9162
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
9263

64+
public static JcsmpSessionService create(JCSMPProperties jcsmpProperties, @Nullable Queue queue) {
65+
return new AutoValue_JcsmpSessionService(jcsmpProperties, queue);
66+
}
67+
68+
@Override
69+
public JCSMPProperties getSessionProperties() {
70+
return jcsmpProperties();
71+
}
72+
9373
@Override
9474
public void connect() {
95-
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
75+
retryCallableManager.retryCallable(
76+
this::connectReadSession, ImmutableSet.of(JCSMPException.class));
9677
}
9778

9879
@Override
@@ -158,10 +139,7 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
158139
}
159140

160141
private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
161-
162-
Queue queue =
163-
JCSMPFactory.onlyInstance()
164-
.createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set."));
142+
Queue queue = checkStateNotNull(queue(), "SolaceIO.Read: Queue is not set.");
165143

166144
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
167145
flowProperties.setEndpoint(queue);
@@ -190,9 +168,9 @@ private static FlowReceiver createFlowReceiver(
190168
return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
191169
}
192170

193-
private int connectSession() throws JCSMPException {
171+
private int connectReadSession() throws JCSMPException {
194172
if (jcsmpSession == null) {
195-
jcsmpSession = createSessionObject();
173+
jcsmpSession = createReadSessionObject();
196174
}
197175
jcsmpSession.connect();
198176
return 0;
@@ -206,25 +184,12 @@ private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
206184
return 0;
207185
}
208186

209-
private JCSMPSession createSessionObject() throws InvalidPropertiesException {
210-
JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
211-
return JCSMPFactory.onlyInstance().createSession(properties);
187+
private JCSMPSession createReadSessionObject() throws InvalidPropertiesException {
188+
return JCSMPFactory.onlyInstance().createSession(jcsmpProperties());
212189
}
213190

214191
private JCSMPSession createWriteSessionObject(SubmissionMode mode)
215192
throws InvalidPropertiesException {
216193
return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
217194
}
218-
219-
@Override
220-
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
221-
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());
222-
223-
baseProps.setProperty(
224-
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
225-
baseProps.setProperty(JCSMPProperties.USERNAME, username());
226-
baseProps.setProperty(JCSMPProperties.PASSWORD, password());
227-
baseProps.setProperty(JCSMPProperties.HOST, host());
228-
return baseProps;
229-
}
230195
}

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
* messaging system. It allows for establishing a connection, creating a message-receiver object,
3333
* checking if the connection is closed or not, and gracefully closing the session.
3434
*
35-
* <p>Override this class and the method {@link #initializeSessionProperties(JCSMPProperties)} with
36-
* your specific properties, including all those related to authentication.
35+
* <p>Override this class and the method {@link #getSessionProperties()} with your specific
36+
* properties, including all those related to authentication.
3737
*
3838
* <p>The connector will call the method only once per session created, so you can perform
3939
* relatively heavy operations in that method (e.g. connect to a store or vault to retrieve
@@ -70,8 +70,7 @@
7070
* <p>The connector ensures that no two threads will be calling that method at the same time, so you
7171
* don't have to take any specific precautions to avoid race conditions.
7272
*
73-
* <p>For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link
74-
* BasicAuthJcsmpSessionServiceFactory}.
73+
* <p>For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}.
7574
*
7675
* <p>For other situations, you need to extend this class and implement the `equals` method, so two
7776
* instances of your class can be compared by value. We recommend using AutoValue for that. For
@@ -143,11 +142,7 @@ public abstract class SessionService implements Serializable {
143142

144143
/**
145144
* Override this method and provide your specific properties, including all those related to
146-
* authentication, and possibly others too. The {@code}baseProperties{@code} parameter sets the
147-
* Solace VPN to "default" if none is specified.
148-
*
149-
* <p>You should add your properties to the parameter {@code}baseProperties{@code}, and return the
150-
* result.
145+
* authentication, and possibly others too.
151146
*
152147
* <p>The method will be used whenever the session needs to be created or refreshed. If you are
153148
* setting credentials with expiration, just make sure that the latest available credentials (e.g.
@@ -160,7 +155,7 @@ public abstract class SessionService implements Serializable {
160155
* href="https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html">https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html</a>
161156
* </ul>
162157
*/
163-
public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties);
158+
public abstract JCSMPProperties getSessionProperties();
164159

165160
/**
166161
* You need to override this method to be able to compare these objects by value. We recommend
@@ -187,29 +182,30 @@ public abstract class SessionService implements Serializable {
187182
* token), this method will be called again.
188183
*/
189184
public final JCSMPProperties initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) {
190-
JCSMPProperties jcsmpProperties = initializeSessionProperties(getDefaultProperties());
185+
JCSMPProperties jcsmpProperties = getSessionProperties();
191186
return overrideConnectorProperties(jcsmpProperties, mode);
192187
}
193188

194-
private static JCSMPProperties getDefaultProperties() {
195-
JCSMPProperties props = new JCSMPProperties();
196-
props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
197-
// Outgoing messages will have a sender timestamp field populated
198-
props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
199-
// Make XMLProducer safe to access from several threads. This is the default value, setting
200-
// it just in case.
201-
props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
202-
203-
return props;
204-
}
205-
206189
/**
207190
* This method overrides some properties for the broker session to prevent misconfiguration,
208191
* taking into account how the write connector works.
209192
*/
210193
private static JCSMPProperties overrideConnectorProperties(
211194
JCSMPProperties props, SolaceIO.SubmissionMode mode) {
212195

196+
if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
197+
props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
198+
}
199+
if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
200+
// Outgoing messages will have a sender timestamp field populated
201+
props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
202+
}
203+
if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
204+
// Make XMLProducer safe to access from several threads. This is the default value, setting
205+
// it just in case.
206+
props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
207+
}
208+
213209
// PUB_ACK_WINDOW_SIZE heavily affects performance when publishing persistent
214210
// messages. It can be a value between 1 and 255. This is the batch size for the ack
215211
// received from Solace. A value of 1 will have the lowest latency, but a very low

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.solacesystems.jcsmp.Queue;
2121
import java.io.Serializable;
2222
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
23+
import org.apache.beam.sdk.io.solace.data.Solace;
2324
import org.checkerframework.checker.nullness.qual.Nullable;
2425

2526
/**
@@ -61,7 +62,7 @@ public abstract class SessionServiceFactory implements Serializable {
6162
* This could be used to associate the created SessionService with a specific queue for message
6263
* handling.
6364
*/
64-
@Nullable Queue queue;
65+
private @Nullable Queue queue;
6566

6667
/**
6768
* The write submission mode. This is set when the writers are created. This property is used only
@@ -75,6 +76,33 @@ public abstract class SessionServiceFactory implements Serializable {
7576
*/
7677
public abstract SessionService create();
7778

79+
/**
80+
* This method is called in the {@link
81+
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
82+
* to set the Queue reference based on {@link
83+
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link
84+
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}. The queue can be retrieved in
85+
* the classes that inherit {@link SessionServiceFactory} with the getter method {@link
86+
* SessionServiceFactory#getQueue()}
87+
*/
88+
public final void setQueue(Queue queue) {
89+
this.queue = queue;
90+
}
91+
92+
/**
93+
* Getter for the queue. This is nullable, because at the construction time this reference is
94+
* null. Once the pipeline is compiled and the {@link
95+
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
96+
* is called, this reference is valid.
97+
*
98+
* @return a reference to the queue which is set with the {@link
99+
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link
100+
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}
101+
*/
102+
public final @Nullable Queue getQueue() {
103+
return queue;
104+
}
105+
78106
/**
79107
* You need to override this method to be able to compare these objects by value. We recommend
80108
* using AutoValue for that.
@@ -89,15 +117,6 @@ public abstract class SessionServiceFactory implements Serializable {
89117
@Override
90118
public abstract int hashCode();
91119

92-
/**
93-
* This method is called in the {@link
94-
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
95-
* to set the Queue reference.
96-
*/
97-
public void setQueue(Queue queue) {
98-
this.queue = queue;
99-
}
100-
101120
/**
102121
* Called by the write connector to set the submission mode used to create the message producers.
103122
*/

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void connect() {
6161
}
6262

6363
@Override
64-
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
64+
public JCSMPProperties getSessionProperties() {
6565
throw new UnsupportedOperationException(exceptionMessage);
6666
}
6767
}

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ public Queue<PublishResult> getPublishedResultsQueue() {
102102
public void connect() {}
103103

104104
@Override
105-
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
105+
public JCSMPProperties getSessionProperties() {
106106
// Let's override some properties that will be overriden by the connector
107107
// Opposite of the mode, to test that is overriden
108+
JCSMPProperties baseProperties = new JCSMPProperties();
108109
baseProperties.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, callbackOnReactor);
109-
110110
baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, ackWindowSizeForTesting);
111111

112112
return baseProperties;

0 commit comments

Comments
 (0)