From c16e1795864d8f1139cf05cac17085e77e9fe62b Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Wed, 16 May 2018 00:21:45 -0500 Subject: [PATCH 1/4] Creating udp multicast gateway example --- build.gradle | 26 +++ dsl/synchronous-udp-multicast/README.md | 69 ++++++ dsl/synchronous-udp-multicast/pom.xml | 212 ++++++++++++++++++ .../synchronous/multicast/Application.java | 47 ++++ .../gateway/UDPMulticastGateway.java | 24 ++ .../handler/UDPMulticastHandler.java | 26 +++ .../multicast/starter/ExtendedMessage.java | 50 +++++ .../starter/ExtendedMessageTransformer.java | 36 +++ .../starter/SynchronousUDPStarter.java | 130 +++++++++++ .../src/main/resources/application.yml | 4 + .../multicast/ApplicationTest.java | 47 ++++ 11 files changed, 671 insertions(+) create mode 100644 dsl/synchronous-udp-multicast/README.md create mode 100644 dsl/synchronous-udp-multicast/pom.xml create mode 100644 dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java create mode 100644 dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java create mode 100644 dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java create mode 100644 dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java create mode 100644 dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java create mode 100644 dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java create mode 100644 dsl/synchronous-udp-multicast/src/main/resources/application.yml create mode 100644 dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java diff --git a/build.gradle b/build.gradle index fdbb9b207..db4f14529 100644 --- a/build.gradle +++ b/build.gradle @@ -1368,6 +1368,32 @@ project('kafka-dsl') { } } +project('synchronous-udp-multicast') { + description = 'Java DSL synchronous UDP multicast' + + apply plugin: 'org.springframework.boot' + + dependencies { + compile 'org.springframework.boot:spring-boot-starter-web' + compile 'org.springframework.boot:spring-boot-starter-integration' + compile 'org.springframework.integration:spring-integration-ip' + compile 'org.springframework.integration:spring-integration-http' + testCompile 'org.springframework.boot:spring-boot-starter-test' + } + bootRun { + main = 'org.springframework.integration.samples.dsl.synchronous.multicast.Application' + } + + task run(type: JavaExec) { + main 'org.springframework.integration.samples.dsl.synchronous.multicast.Application' + classpath = sourceSets.main.runtimeClasspath + } + + tasks.withType(JavaExec) { + standardInput = System.in + } +} + project('file-split-ftp') { description = 'File Split FTP' diff --git a/dsl/synchronous-udp-multicast/README.md b/dsl/synchronous-udp-multicast/README.md new file mode 100644 index 000000000..07e7aacee --- /dev/null +++ b/dsl/synchronous-udp-multicast/README.md @@ -0,0 +1,69 @@ +Spring Integration Java DSL and Apache Kafka Sample +============== + +This example demonstrates the use of `Kafka09` Namespace Factory from Spring Integration Java DSL. + +## Running the sample + +Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project. + + $ gradlew :kafka:run + +This will package the application and run it using the [Gradle Application Plugin](http://www.gradle.org/docs/current/userguide/application_plugin.html) + +#### Using an IDE such as SpringSource Tool Suite™ (STS) + +In STS (Eclipse), go to package **org.springframework.integration.samples.kafka**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application). + +### Output + +The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic`; this must exist or the broker must be configured to auto-create topics. + +It then dynamically creates a new inbound adapter for a different topic `si.new.topic` and sends 10 messages there. + +The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads using a no-arg gateway method and writes to stdout: + + Sending 10 messages... + Send to Kafka: foo0 + Send to Kafka: foo1 + Send to Kafka: foo2 + Send to Kafka: foo3 + Send to Kafka: foo4 + Send to Kafka: foo5 + Send to Kafka: foo6 + Send to Kafka: foo7 + Send to Kafka: foo8 + Send to Kafka: foo9 + GenericMessage [payload=foo0, headers={kafka_offset=847, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo1, headers={kafka_offset=848, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo2, headers={kafka_offset=849, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo3, headers={kafka_offset=850, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo4, headers={kafka_offset=851, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo5, headers={kafka_offset=852, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo6, headers={kafka_offset=853, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo7, headers={kafka_offset=854, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo8, headers={kafka_offset=855, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo9, headers={kafka_offset=856, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + Adding an adapter for a second topic and sending 10 messages... + Send to Kafka: bar0 + Send to Kafka: bar1 + Send to Kafka: bar2 + Send to Kafka: bar3 + Send to Kafka: bar4 + Send to Kafka: bar5 + Send to Kafka: bar6 + Send to Kafka: bar7 + Send to Kafka: bar8 + Send to Kafka: bar9 + GenericMessage [payload=bar0, headers={kafka_offset=190, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar1, headers={kafka_offset=191, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar2, headers={kafka_offset=192, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar3, headers={kafka_offset=193, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar4, headers={kafka_offset=194, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar5, headers={kafka_offset=195, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar6, headers={kafka_offset=196, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar7, headers={kafka_offset=197, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar8, headers={kafka_offset=198, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar9, headers={kafka_offset=199, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + +Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions). diff --git a/dsl/synchronous-udp-multicast/pom.xml b/dsl/synchronous-udp-multicast/pom.xml new file mode 100644 index 000000000..3876a65fd --- /dev/null +++ b/dsl/synchronous-udp-multicast/pom.xml @@ -0,0 +1,212 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.0.2.RELEASE + + org.springframework.integration.samples + synchronous-udp-multicast + 5.0.0.BUILD-SNAPSHOT + Java DSL synchronous UDP multicast + Java DSL synchronous UDP multicast + http://projects.spring.io/spring-integration + + SpringIO + https://spring.io + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + estigma88 + Daniel Andres Pelaez Lopez + estigma88@gmail.com + + project lead + + + + + scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git + scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git + https://github.com/spring-projects/spring-integration-samples + + + + org.springframework.boot + spring-boot-starter-integration + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-core + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-ip + 5.0.4.RELEASE + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-http + 5.0.4.RELEASE + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.apache.logging.log4j + log4j-core + 2.7 + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + junit + junit + 4.12 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + * + org.hamcrest + + + + + org.hamcrest + hamcrest-all + 1.3 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.mockito + mockito-core + 2.10.0 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + * + org.hamcrest + + + + + org.springframework + spring-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.boot + spring-boot-starter-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + + + repo.spring.io.milestone + Spring Framework Maven Milestone Repository + https://repo.spring.io/libs-milestone + + + repo.spring.io.snapshot + Spring Framework Maven Snapshot Repository + https://repo.spring.io/libs-snapshot + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + org.springframework.boot + spring-boot-dependencies + 2.0.2.RELEASE + import + pom + + + org.springframework + spring-framework-bom + 5.0.5.RELEASE + import + pom + + + org.springframework.integration + spring-integration-bom + 5.0.5.RELEASE + import + pom + + + + diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java new file mode 100644 index 000000000..d2ee9a306 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/Application.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.samples.dsl.synchronous.multicast.starter.SynchronousUDPStarter; + +/** + * @author Daniel Andres Pelaez Lopez + */ +@SpringBootApplication +public class Application { + + public static void main(String[] args) throws Exception { + SpringApplication.run(Application.class, args); + } + + @Bean + public SynchronousUDPStarter synchronousUDPStarter(IntegrationFlowContext flowContext, + @Value("${synchronous.multicast.group}") String group, + @Value("${synchronous.multicast.port}") Integer port) { + SynchronousUDPStarter synchronousUDPStarter = new SynchronousUDPStarter(flowContext, group, port); + + synchronousUDPStarter.init(); + + return synchronousUDPStarter; + } + +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java new file mode 100644 index 000000000..1bcbaf72b --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/gateway/UDPMulticastGateway.java @@ -0,0 +1,24 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.gateway; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public interface UDPMulticastGateway { + String send(String request); +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java new file mode 100644 index 000000000..dd8141e0c --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/handler/UDPMulticastHandler.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.handler; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class UDPMulticastHandler { + public String handle(String request) { + return "Response for '" + request + "'"; + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java new file mode 100644 index 000000000..dc41320e5 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java @@ -0,0 +1,50 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.starter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class ExtendedMessage { + private final String replyChannelId; + private final String errorChannelId; + private final String data; + + @JsonCreator + ExtendedMessage(@JsonProperty("replyChannelId") String replyChannelId, + @JsonProperty("errorChannelId") String errorChannelId, + @JsonProperty("data") String data) { + this.replyChannelId = replyChannelId; + this.errorChannelId = errorChannelId; + this.data = data; + } + + public String getReplyChannelId() { + return replyChannelId; + } + + public String getErrorChannelId() { + return errorChannelId; + } + + public String getData() { + return data; + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java new file mode 100644 index 000000000..c9b894ee6 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.starter; + +import org.springframework.integration.transformer.Transformer; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class ExtendedMessageTransformer implements Transformer { + @Override + public Message transform(Message message) { + return MessageBuilder + .withPayload(new ExtendedMessage((String) message.getHeaders().get("replyChannel"), + (String) message.getHeaders().get("errorChannel"), + (String) message.getPayload())) + .copyHeaders(message.getHeaders()) + .build(); + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java new file mode 100644 index 000000000..fde68ae07 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java @@ -0,0 +1,130 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast.starter; + +import org.springframework.http.HttpMethod; +import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.dsl.*; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.http.dsl.Http; +import org.springframework.integration.ip.dsl.Udp; +import org.springframework.integration.samples.dsl.synchronous.multicast.gateway.UDPMulticastGateway; +import org.springframework.integration.samples.dsl.synchronous.multicast.handler.UDPMulticastHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.MimeTypeUtils; + +/** + * @author Daniel Andres Pelaez Lopez + */ +public class SynchronousUDPStarter { + private static final String MESSAGES_PATH = "messages/"; + private static final String REPLY_CHANNEL_ID_HEADER = "replyChannelId"; + private static final String ERROR_CHANNEL_ID_HEADER = "errorChannelId"; + private static final String BASE_URL = "http://localhost:8080/"; + private static final String HTTP_OUTBOUND_CHANNEL = "httpOutbound"; + private final IntegrationFlowContext flowContext; + private final String group; + private final Integer port; + + public SynchronousUDPStarter(IntegrationFlowContext flowContext, String group, Integer port) { + this.flowContext = flowContext; + this.group = group; + this.port = port; + } + + public void init() { + StandardIntegrationFlow httpInbound = getHttpInboundFlow(); + + StandardIntegrationFlow httpOutbound = getHttpOutboundFlow(); + + StandardIntegrationFlow udpInbound = getUDPInboundFlow(); + + StandardIntegrationFlow udpOutbound = getUDPOutboundFlow(); + + flowContext.registration(httpInbound).id("httpInboundFlow").register(); + flowContext.registration(httpOutbound).id("httpOutboundFlow").register(); + flowContext.registration(udpInbound).id("udpInboundFlow").register(); + flowContext.registration(udpOutbound).id("udpOutboundFlow").register(); + + httpInbound.start(); + httpOutbound.start(); + udpInbound.start(); + udpOutbound.start(); + } + + private StandardIntegrationFlow getUDPOutboundFlow() { + return IntegrationFlows.from(UDPMulticastGateway.class) + .enrichHeaders(m -> m + .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + .enrichHeaders(HeaderEnricherSpec::headerChannelsToString) + .transform(new ExtendedMessageTransformer()) + .transform(Transformers.toJson()) + .handle(Udp.outboundMulticastAdapter(group, port)) + .get(); + } + + private StandardIntegrationFlow getUDPInboundFlow() { + return IntegrationFlows.from(Udp.inboundMulticastAdapter(port, group)) + .enrichHeaders(m -> m + .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + .transform(Transformers.fromJson(ExtendedMessage.class)) + .enrichHeaders(h -> h + .headerFunction(REPLY_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getReplyChannelId()) + .headerFunction(ERROR_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getErrorChannelId())) + .transform(ExtendedMessage::getData) + .handle(new UDPMulticastHandler(), "handle") + .publishSubscribeChannel(p -> p + .subscribe(s -> s + .enrichHeaders(h -> h + .header(MessageHeaders.REPLY_CHANNEL, new NullChannel(), true) + .header(MessageHeaders.ERROR_CHANNEL, new NullChannel(), true)) + .channel(HTTP_OUTBOUND_CHANNEL))) + .get(); + } + + private StandardIntegrationFlow getHttpOutboundFlow() { + return IntegrationFlows.from(HTTP_OUTBOUND_CHANNEL) + .enrichHeaders(m -> m + .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + .handle(Http.outboundGateway(BASE_URL + MESSAGES_PATH) + .httpMethod(HttpMethod.POST) + .mappedRequestHeaders(REPLY_CHANNEL_ID_HEADER, ERROR_CHANNEL_ID_HEADER) + .expectedResponseType(String.class)) + .get(); + } + + private StandardIntegrationFlow getHttpInboundFlow() { + return IntegrationFlows.from( + Http.inboundGateway(MESSAGES_PATH) + .requestMapping(m -> m.methods(HttpMethod.POST) + .consumes(MimeTypeUtils.APPLICATION_JSON_VALUE) + .produces(MimeTypeUtils.APPLICATION_JSON_VALUE)) + .mappedRequestHeaders(REPLY_CHANNEL_ID_HEADER, ERROR_CHANNEL_ID_HEADER) + .requestPayloadType(String.class)) + .publishSubscribeChannel(p -> p + .subscribe(IntegrationFlowDefinition::bridge) + .subscribe(sub -> sub + .enrichHeaders(h -> h + .headerFunction(MessageHeaders.REPLY_CHANNEL, m -> m + .getHeaders().get(REPLY_CHANNEL_ID_HEADER.toLowerCase()), true) + .headerFunction(MessageHeaders.ERROR_CHANNEL, m -> m + .getHeaders().get(ERROR_CHANNEL_ID_HEADER.toLowerCase()), true)) + .bridge()) + ) + .get(); + } +} diff --git a/dsl/synchronous-udp-multicast/src/main/resources/application.yml b/dsl/synchronous-udp-multicast/src/main/resources/application.yml new file mode 100644 index 000000000..799752258 --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/main/resources/application.yml @@ -0,0 +1,4 @@ +synchronous: + multicast: + group: 224.0.0.1 + port: 2000 diff --git a/dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java b/dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java new file mode 100644 index 000000000..ad74f2f3a --- /dev/null +++ b/dsl/synchronous-udp-multicast/src/test/java/org/springframework/integration/samples/dsl/synchronous/multicast/ApplicationTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.synchronous.multicast; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.integration.samples.dsl.synchronous.multicast.gateway.UDPMulticastGateway; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Daniel Andres Pelaez Lopez + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +public class ApplicationTest { + @Autowired + private UDPMulticastGateway udpMulticastGateway; + + public ApplicationTest() { + } + + @Test(timeout = 5000) + public void testUDPMulticastGateway() { + String response = udpMulticastGateway.send("request"); + + assertThat(response, is("Response for 'request'")); + } +} From 5c3588022b1758db87796664d604f35173cbd764 Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Wed, 16 May 2018 21:18:53 -0500 Subject: [PATCH 2/4] Adding documentation --- dsl/synchronous-udp-multicast/README.md | 78 ++++--------------- dsl/synchronous-udp-multicast/pom.xml | 78 ------------------- .../multicast/starter/ExtendedMessage.java | 20 ++--- .../starter/ExtendedMessageTransformer.java | 5 +- .../starter/SynchronousUDPStarter.java | 46 ++++++++--- 5 files changed, 66 insertions(+), 161 deletions(-) diff --git a/dsl/synchronous-udp-multicast/README.md b/dsl/synchronous-udp-multicast/README.md index 07e7aacee..915dedc7f 100644 --- a/dsl/synchronous-udp-multicast/README.md +++ b/dsl/synchronous-udp-multicast/README.md @@ -1,69 +1,25 @@ -Spring Integration Java DSL and Apache Kafka Sample +Spring Integration Java DSL synchronous UDP multicast ============== -This example demonstrates the use of `Kafka09` Namespace Factory from Spring Integration Java DSL. +This example demonstrates the use of `Http Inbound Components`, `Http Outbound Components` and `UDP Adapters` to create a UDP multicast synchronous gateway. -## Running the sample - -Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project. - - $ gradlew :kafka:run - -This will package the application and run it using the [Gradle Application Plugin](http://www.gradle.org/docs/current/userguide/application_plugin.html) +## Flow -#### Using an IDE such as SpringSource Tool Suite™ (STS) +The idea is to send a UDP multicast message in an synchronous way, so, it waits until a response arrieves from any of the UDP nodes joined to the multicast group. -In STS (Eclipse), go to package **org.springframework.integration.samples.kafka**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application). + Client Server + + udpMulticastOutbound -->> udpMulticastInbound + | + | + v + v + httpInbound <<-- httpOutbound + + -### Output - -The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic`; this must exist or the broker must be configured to auto-create topics. - -It then dynamically creates a new inbound adapter for a different topic `si.new.topic` and sends 10 messages there. - -The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads using a no-arg gateway method and writes to stdout: +## Running the sample - Sending 10 messages... - Send to Kafka: foo0 - Send to Kafka: foo1 - Send to Kafka: foo2 - Send to Kafka: foo3 - Send to Kafka: foo4 - Send to Kafka: foo5 - Send to Kafka: foo6 - Send to Kafka: foo7 - Send to Kafka: foo8 - Send to Kafka: foo9 - GenericMessage [payload=foo0, headers={kafka_offset=847, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo1, headers={kafka_offset=848, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo2, headers={kafka_offset=849, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo3, headers={kafka_offset=850, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo4, headers={kafka_offset=851, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo5, headers={kafka_offset=852, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo6, headers={kafka_offset=853, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo7, headers={kafka_offset=854, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo8, headers={kafka_offset=855, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo9, headers={kafka_offset=856, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - Adding an adapter for a second topic and sending 10 messages... - Send to Kafka: bar0 - Send to Kafka: bar1 - Send to Kafka: bar2 - Send to Kafka: bar3 - Send to Kafka: bar4 - Send to Kafka: bar5 - Send to Kafka: bar6 - Send to Kafka: bar7 - Send to Kafka: bar8 - Send to Kafka: bar9 - GenericMessage [payload=bar0, headers={kafka_offset=190, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar1, headers={kafka_offset=191, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar2, headers={kafka_offset=192, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar3, headers={kafka_offset=193, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar4, headers={kafka_offset=194, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar5, headers={kafka_offset=195, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar6, headers={kafka_offset=196, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar7, headers={kafka_offset=197, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar8, headers={kafka_offset=198, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=bar9, headers={kafka_offset=199, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] +Run the test example. -Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions). + $ gradlew :synchronous-udp-multicast:test diff --git a/dsl/synchronous-udp-multicast/pom.xml b/dsl/synchronous-udp-multicast/pom.xml index 3876a65fd..c193cad9a 100644 --- a/dsl/synchronous-udp-multicast/pom.xml +++ b/dsl/synchronous-udp-multicast/pom.xml @@ -50,17 +50,6 @@ - - org.springframework.integration - spring-integration-core - compile - - - jackson-module-kotlin - com.fasterxml.jackson.module - - - org.springframework.integration spring-integration-ip @@ -85,73 +74,6 @@ - - org.apache.logging.log4j - log4j-core - 2.7 - compile - - - jackson-module-kotlin - com.fasterxml.jackson.module - - - - - junit - junit - 4.12 - test - - - jackson-module-kotlin - com.fasterxml.jackson.module - - - * - org.hamcrest - - - - - org.hamcrest - hamcrest-all - 1.3 - test - - - jackson-module-kotlin - com.fasterxml.jackson.module - - - - - org.mockito - mockito-core - 2.10.0 - test - - - jackson-module-kotlin - com.fasterxml.jackson.module - - - * - org.hamcrest - - - - - org.springframework - spring-test - test - - - jackson-module-kotlin - com.fasterxml.jackson.module - - - org.springframework.boot spring-boot-starter-test diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java index dc41320e5..4e26f8e1c 100644 --- a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessage.java @@ -23,25 +23,25 @@ * @author Daniel Andres Pelaez Lopez */ public class ExtendedMessage { - private final String replyChannelId; - private final String errorChannelId; + private final String replyOriginChannelId; + private final String errorOriginChannelId; private final String data; @JsonCreator - ExtendedMessage(@JsonProperty("replyChannelId") String replyChannelId, - @JsonProperty("errorChannelId") String errorChannelId, + ExtendedMessage(@JsonProperty("replyOriginChannelId") String replyOriginChannelId, + @JsonProperty("errorOriginChannelId") String errorOriginChannelId, @JsonProperty("data") String data) { - this.replyChannelId = replyChannelId; - this.errorChannelId = errorChannelId; + this.replyOriginChannelId = replyOriginChannelId; + this.errorOriginChannelId = errorOriginChannelId; this.data = data; } - public String getReplyChannelId() { - return replyChannelId; + public String getReplyOriginChannelId() { + return replyOriginChannelId; } - public String getErrorChannelId() { - return errorChannelId; + public String getErrorOriginChannelId() { + return errorOriginChannelId; } public String getData() { diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java index c9b894ee6..b00a12240 100644 --- a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/ExtendedMessageTransformer.java @@ -18,6 +18,7 @@ import org.springframework.integration.transformer.Transformer; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; /** @@ -27,8 +28,8 @@ public class ExtendedMessageTransformer implements Transformer { @Override public Message transform(Message message) { return MessageBuilder - .withPayload(new ExtendedMessage((String) message.getHeaders().get("replyChannel"), - (String) message.getHeaders().get("errorChannel"), + .withPayload(new ExtendedMessage((String) message.getHeaders().get(MessageHeaders.REPLY_CHANNEL), + (String) message.getHeaders().get(MessageHeaders.ERROR_CHANNEL), (String) message.getPayload())) .copyHeaders(message.getHeaders()) .build(); diff --git a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java index fde68ae07..e6f59c9da 100644 --- a/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java +++ b/dsl/synchronous-udp-multicast/src/main/java/org/springframework/integration/samples/dsl/synchronous/multicast/starter/SynchronousUDPStarter.java @@ -21,6 +21,7 @@ import org.springframework.integration.dsl.*; import org.springframework.integration.dsl.context.IntegrationFlowContext; import org.springframework.integration.http.dsl.Http; +import org.springframework.integration.ip.IpHeaders; import org.springframework.integration.ip.dsl.Udp; import org.springframework.integration.samples.dsl.synchronous.multicast.gateway.UDPMulticastGateway; import org.springframework.integration.samples.dsl.synchronous.multicast.handler.UDPMulticastHandler; @@ -32,9 +33,9 @@ */ public class SynchronousUDPStarter { private static final String MESSAGES_PATH = "messages/"; - private static final String REPLY_CHANNEL_ID_HEADER = "replyChannelId"; - private static final String ERROR_CHANNEL_ID_HEADER = "errorChannelId"; - private static final String BASE_URL = "http://localhost:8080/"; + private static final String REPLY_ORIGINAL_CHANNEL_ID_HEADER = "replyOriginChannelId"; + private static final String ERROR_ORIGINAL_CHANNEL_ID_HEADER = "errorOriginChannelId"; + private static final String BASE_URL = "http://{host}:8080/"; private static final String HTTP_OUTBOUND_CHANNEL = "httpOutbound"; private final IntegrationFlowContext flowContext; private final String group; @@ -67,10 +68,14 @@ public void init() { } private StandardIntegrationFlow getUDPOutboundFlow() { + //UDPMulticastGateway is the gateway through we are going to consume this flow return IntegrationFlows.from(UDPMulticastGateway.class) .enrichHeaders(m -> m .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) + //Persist the replayChannel and errorChannel headers, getting IDs .enrichHeaders(HeaderEnricherSpec::headerChannelsToString) + //Transform the message adding the replayChannel and errorChannel IDs as a part of the payload + //To retrieve later after the response arrives .transform(new ExtendedMessageTransformer()) .transform(Transformers.toJson()) .handle(Udp.outboundMulticastAdapter(group, port)) @@ -82,16 +87,23 @@ private StandardIntegrationFlow getUDPInboundFlow() { .enrichHeaders(m -> m .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) .transform(Transformers.fromJson(ExtendedMessage.class)) + //Add new headers to the Message named replyOriginChannelId and errorOriginChannelId from the payload we receive .enrichHeaders(h -> h - .headerFunction(REPLY_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getReplyChannelId()) - .headerFunction(ERROR_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getErrorChannelId())) + .headerFunction(REPLY_ORIGINAL_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getReplyOriginChannelId()) + .headerFunction(ERROR_ORIGINAL_CHANNEL_ID_HEADER, m -> ((ExtendedMessage) m.getPayload()).getErrorOriginChannelId())) + //Get the read data we want to handle .transform(ExtendedMessage::getData) .handle(new UDPMulticastHandler(), "handle") + //Publish a Message response using the httpOutbound channel .publishSubscribeChannel(p -> p .subscribe(s -> s + //The inboundMulticastAdapter does not have replyChannel and errorChannel by default + //So, we add a NullChannel where httpOutbound is going to respond + //We do not care about if this request is successful, this communication flow is still unreliable .enrichHeaders(h -> h .header(MessageHeaders.REPLY_CHANNEL, new NullChannel(), true) .header(MessageHeaders.ERROR_CHANNEL, new NullChannel(), true)) + //Send the message to httpOutbound channel .channel(HTTP_OUTBOUND_CHANNEL))) .get(); } @@ -102,8 +114,12 @@ private StandardIntegrationFlow getHttpOutboundFlow() { .header(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)) .handle(Http.outboundGateway(BASE_URL + MESSAGES_PATH) .httpMethod(HttpMethod.POST) - .mappedRequestHeaders(REPLY_CHANNEL_ID_HEADER, ERROR_CHANNEL_ID_HEADER) - .expectedResponseType(String.class)) + //The headers we created in the UDPInboundFlow are mapped here to headers in the HTTP Request + //So, we can retrieve those from the request origin side + .mappedRequestHeaders(REPLY_ORIGINAL_CHANNEL_ID_HEADER, ERROR_ORIGINAL_CHANNEL_ID_HEADER) + .expectedResponseType(String.class) + //Get the request origin IP to response + .uriVariable("host", m -> m.getHeaders().get(IpHeaders.HOSTNAME))) .get(); } @@ -113,16 +129,26 @@ private StandardIntegrationFlow getHttpInboundFlow() { .requestMapping(m -> m.methods(HttpMethod.POST) .consumes(MimeTypeUtils.APPLICATION_JSON_VALUE) .produces(MimeTypeUtils.APPLICATION_JSON_VALUE)) - .mappedRequestHeaders(REPLY_CHANNEL_ID_HEADER, ERROR_CHANNEL_ID_HEADER) + //The headers we created in the HttpOutboundFlow in the HTTP Request are mapped here to headers in the Message + //The headers names are in lower case + .mappedRequestHeaders(REPLY_ORIGINAL_CHANNEL_ID_HEADER, ERROR_ORIGINAL_CHANNEL_ID_HEADER) .requestPayloadType(String.class)) + //Publish the Message to two subscribers .publishSubscribeChannel(p -> p + //First subscriber: response to HttpInboundFlow + //The Message with the default replyChannel and errorChannel are redirect using a bridge + //The bridge resolves those headers and uses those channels .subscribe(IntegrationFlowDefinition::bridge) + + //Second subscriber: response to UDPMulticastGateway (UDPOutboundFlow) + //The replyChannel and errorChannel headers are replaced with the replyOriginChannelId and errorOriginChannelId we sent at the beginning + //The bridge resolves those headers and uses those channels to unblock the UDPMulticastGateway .subscribe(sub -> sub .enrichHeaders(h -> h .headerFunction(MessageHeaders.REPLY_CHANNEL, m -> m - .getHeaders().get(REPLY_CHANNEL_ID_HEADER.toLowerCase()), true) + .getHeaders().get(REPLY_ORIGINAL_CHANNEL_ID_HEADER.toLowerCase()), true) .headerFunction(MessageHeaders.ERROR_CHANNEL, m -> m - .getHeaders().get(ERROR_CHANNEL_ID_HEADER.toLowerCase()), true)) + .getHeaders().get(ERROR_ORIGINAL_CHANNEL_ID_HEADER.toLowerCase()), true)) .bridge()) ) .get(); From 5354013f63bb765ee676bea56acaf7cd42abe518 Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Wed, 16 May 2018 21:24:17 -0500 Subject: [PATCH 3/4] Adding documentation about IntegrationFlowContext --- dsl/synchronous-udp-multicast/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsl/synchronous-udp-multicast/README.md b/dsl/synchronous-udp-multicast/README.md index 915dedc7f..5079d4bc4 100644 --- a/dsl/synchronous-udp-multicast/README.md +++ b/dsl/synchronous-udp-multicast/README.md @@ -1,7 +1,7 @@ Spring Integration Java DSL synchronous UDP multicast ============== -This example demonstrates the use of `Http Inbound Components`, `Http Outbound Components` and `UDP Adapters` to create a UDP multicast synchronous gateway. +This example demonstrates the use of `Http Inbound Components`, `Http Outbound Components`, `UDP Adapters` and `IntegrationFlowContext` class to create a UDP multicast synchronous gateway. ## Flow From 629de54aa771e57734c585915afe62008c113fdb Mon Sep 17 00:00:00 2001 From: Daniel Pelaez Date: Wed, 16 May 2018 21:26:31 -0500 Subject: [PATCH 4/4] Changing the flow format --- dsl/synchronous-udp-multicast/README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dsl/synchronous-udp-multicast/README.md b/dsl/synchronous-udp-multicast/README.md index 5079d4bc4..b8defab09 100644 --- a/dsl/synchronous-udp-multicast/README.md +++ b/dsl/synchronous-udp-multicast/README.md @@ -7,14 +7,14 @@ This example demonstrates the use of `Http Inbound Components`, `Http Outbound C The idea is to send a UDP multicast message in an synchronous way, so, it waits until a response arrieves from any of the UDP nodes joined to the multicast group. - Client Server + Client Server udpMulticastOutbound -->> udpMulticastInbound - | - | - v - v - httpInbound <<-- httpOutbound + | + | + v + v + httpInbound <<-- httpOutbound