Skip to content

Commit 272dde8

Browse files
GH-9228: Provide binding to ZeroMqMessageHandler
Fixes: #9228 * add docs * protected constructor in ZeroMqMessageHandlerSpec, expose them via ZeroMq * introduce ZeroMqUtils, for common Zero MQ utilities functions * use ZeroMqUtils.bindSocket in ZeroMqMessageProducer * refactor ZeroMqMessageHandler providing connectUrl and bindPort setters and simple constructors, following the same logic used for ZeroMqMessageProvider * fix tests to follow the new ZeroMqMessageHandler implementation * fix typo * add new updates to whats-new.adoc * address ZeroMQUtils comments * remove connectUrl and boundPort setters, add Javadoc to new constructors * add new DLS constructor for random port * add since closure in ZeroMqUtils * add DSL support methods and specific that, when not defined, the socket will be bound to a random port
1 parent 78ae221 commit 272dde8

File tree

9 files changed

+302
-33
lines changed

9 files changed

+302
-33
lines changed

Diff for: spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqHeaders.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
1717
package org.springframework.integration.zeromq;
1818

1919
/**
20-
* The message headers constants to repsent ZeroMq message attributes.
20+
* The message headers constants to represent ZeroMq message attributes.
2121
*
2222
* @author Artem Bilan
2323
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2020-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.zeromq;
18+
19+
import org.zeromq.ZMQ;
20+
21+
/**
22+
* Module that wraps common methods of ZeroMq integration classes
23+
*
24+
* @author Alessio Matricardi
25+
*
26+
* @since 6.4
27+
*
28+
*/
29+
public final class ZeroMqUtils {
30+
31+
/**
32+
* Bind the ZeroMq socket to the given port over the TCP transport protocol.
33+
* @param socket the ZeroMq socket
34+
* @param port the port to bind ZeroMq socket to over TCP. If equal to 0, the socket will bind to a random port.
35+
* @return the effectively bound port
36+
*/
37+
public static int bindSocket(ZMQ.Socket socket, int port) {
38+
if (port == 0) {
39+
return socket.bindToRandomPort("tcp://*");
40+
}
41+
else {
42+
boolean bound = socket.bind("tcp://*:" + port);
43+
if (!bound) {
44+
throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port);
45+
}
46+
return port;
47+
}
48+
}
49+
50+
private ZeroMqUtils() {
51+
}
52+
53+
}

Diff for: spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMq.java

+49
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Factory class for ZeroMq components DSL.
2626
*
2727
* @author Artem Bilan
28+
* @author Alessio Matricardi
2829
*
2930
* @since 5.4
3031
*/
@@ -58,6 +59,17 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context,
5859
return outboundChannelAdapter(context, () -> connectUrl);
5960
}
6061

62+
/**
63+
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} and binding port.
64+
* @param context the {@link ZContext} to use.
65+
* @param port the port to bind ZeroMq socket to over TCP.
66+
* @return the spec.
67+
* @since 6.4
68+
*/
69+
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port) {
70+
return new ZeroMqMessageHandlerSpec(context, port);
71+
}
72+
6173
/**
6274
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}
6375
* and connection URL supplier.
@@ -84,6 +96,43 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context,
8496
return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType);
8597
}
8698

99+
/**
100+
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}.
101+
* The created socket will be bound to a random port.
102+
* @param context the {@link ZContext} to use.
103+
* @return the spec.
104+
* @since 6.4
105+
*/
106+
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context) {
107+
return new ZeroMqMessageHandlerSpec(context);
108+
}
109+
110+
/**
111+
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext} and {@link SocketType}.
112+
* The created socket will be bound to a random port.
113+
* @param context the {@link ZContext} to use.
114+
* @param socketType the {@link SocketType} for ZeroMq socket.
115+
* @return the spec.
116+
* @since 6.4
117+
*/
118+
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, SocketType socketType) {
119+
return new ZeroMqMessageHandlerSpec(context, socketType);
120+
}
121+
122+
/**
123+
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}, binding port
124+
* and {@link SocketType}.
125+
* @param context the {@link ZContext} to use.
126+
* @param port the port to bind ZeroMq socket to over TCP.
127+
* @param socketType the {@link SocketType} for ZeroMq socket.
128+
* @return the spec.
129+
* @since 6.4
130+
*/
131+
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, int port,
132+
SocketType socketType) {
133+
return new ZeroMqMessageHandlerSpec(context, port, socketType);
134+
}
135+
87136
/**
88137
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext},
89138
* connection URL supplier and {@link SocketType}.

Diff for: spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java

+44
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,26 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl) {
5252
this(context, () -> connectUrl);
5353
}
5454

55+
/**
56+
* Create an instance based on the provided {@link ZContext}.
57+
* The created socket will be bound to a random port.
58+
* @param context the {@link ZContext} to use for creating sockets.
59+
* @since 6.4
60+
*/
61+
protected ZeroMqMessageHandlerSpec(ZContext context) {
62+
this(context, SocketType.PAIR);
63+
}
64+
65+
/**
66+
* Create an instance based on the provided {@link ZContext} and binding port.
67+
* @param context the {@link ZContext} to use for creating sockets.
68+
* @param port the port to bind ZeroMq socket to over TCP.
69+
* @since 6.4
70+
*/
71+
protected ZeroMqMessageHandlerSpec(ZContext context, int port) {
72+
this(context, port, SocketType.PAIR);
73+
}
74+
5575
/**
5676
* Create an instance based on the provided {@link ZContext} and connection string supplier.
5777
* @param context the {@link ZContext} to use for creating sockets.
@@ -73,6 +93,30 @@ protected ZeroMqMessageHandlerSpec(ZContext context, String connectUrl, SocketTy
7393
this(context, () -> connectUrl, socketType);
7494
}
7595

96+
/**
97+
* Create an instance based on the provided {@link ZContext} and {@link SocketType}.
98+
* The created socket will be bound to a random port.
99+
* @param context the {@link ZContext} to use for creating sockets.
100+
* @param socketType the {@link SocketType} to use;
101+
* only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.
102+
* @since 6.4
103+
*/
104+
protected ZeroMqMessageHandlerSpec(ZContext context, SocketType socketType) {
105+
super(new ZeroMqMessageHandler(context, socketType));
106+
}
107+
108+
/**
109+
* Create an instance based on the provided {@link ZContext}, binding port and {@link SocketType}.
110+
* @param context the {@link ZContext} to use for creating sockets.
111+
* @param port the port to bind ZeroMq socket to over TCP.
112+
* @param socketType the {@link SocketType} to use;
113+
* only {@link SocketType#PAIR}, {@link SocketType#PUB} and {@link SocketType#PUSH} are supported.
114+
* @since 6.4
115+
*/
116+
protected ZeroMqMessageHandlerSpec(ZContext context, int port, SocketType socketType) {
117+
super(new ZeroMqMessageHandler(context, port, socketType));
118+
}
119+
76120
/**
77121
* Create an instance based on the provided {@link ZContext}, connection string supplier and {@link SocketType}.
78122
* @param context the {@link ZContext} to use for creating sockets.

Diff for: spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
4141
import org.springframework.integration.support.management.IntegrationManagedResource;
4242
import org.springframework.integration.zeromq.ZeroMqHeaders;
43+
import org.springframework.integration.zeromq.ZeroMqUtils;
4344
import org.springframework.jmx.export.annotation.ManagedOperation;
4445
import org.springframework.jmx.export.annotation.ManagedResource;
4546
import org.springframework.lang.Nullable;
@@ -263,7 +264,7 @@ protected void doStart() {
263264
socket.connect(this.connectUrl);
264265
}
265266
else {
266-
this.bindPort.set(bindSocket(socket, this.bindPort.get()));
267+
this.bindPort.set(ZeroMqUtils.bindSocket(socket, this.bindPort.get()));
267268
}
268269
})
269270
.cache()
@@ -319,17 +320,4 @@ public void destroy() {
319320
this.socketMono.doOnNext(ZMQ.Socket::close).block();
320321
}
321322

322-
private static int bindSocket(ZMQ.Socket socket, int port) {
323-
if (port == 0) {
324-
return socket.bindToRandomPort("tcp://*");
325-
}
326-
else {
327-
boolean bound = socket.bind("tcp://*:" + port);
328-
if (!bound) {
329-
throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port);
330-
}
331-
return port;
332-
}
333-
}
334-
335323
}

0 commit comments

Comments
 (0)