From 1aa81039ab745e7cad08d5dcde8eb4f9b495edf4 Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Thu, 8 Jun 2023 22:43:12 +0530 Subject: [PATCH 1/9] Updated config file for amqp --- partials/AmqpConfig.java | 102 +++++++++----------- template/src/main/resources/application.yml | 22 ++--- 2 files changed, 51 insertions(+), 73 deletions(-) diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 1b203fe65..9b600d616 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -6,17 +6,11 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.integration.amqp.dsl.Amqp; -import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; -import org.springframework.messaging.MessageChannel; @Configuration public class Config { @@ -33,16 +27,17 @@ public class Config { @Value("${amqp.broker.password}") private String password; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - @Value("${amqp.exchange.{{- channelName -}}}") + + {% for channelName, channel in asyncapi.channels() %} + @Value("${amqp.{{- channelName -}}.exchange}") private String {{channelName}}Exchange; - {% endif %}{% endfor %} - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} - @Value("${amqp.queue.{{- channelName -}}}") + @Value("${amqp.{{- channelName -}}.queue}") private String {{channelName}}Queue; - {% endif %}{% endfor %} + @Value("${amqp.{{- channelName -}}.routingKey}") + private String {{channelName}}RoutingKey; + {% endfor %} @Bean public ConnectionFactory connectionFactory() { @@ -54,64 +49,53 @@ public ConnectionFactory connectionFactory() { } @Bean - public AmqpAdmin amqpAdmin() { - return new RabbitAdmin(connectionFactory()); - } - - @Bean - public Declarables exchanges() { - return new Declarables( - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %} - {% endif %}{% endfor %} - ); - } - - @Bean - public Declarables queues() { - return new Declarables( - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} - new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %} - {% endif %}{% endfor %} - ); - } + public Declarables declarables() { + {% for channelName, channel in asyncapi.channels() %} + Queue {{channelName}}Queue = new Queue({{channelName}}Queue); + {% endfor %} - // consumer + {% for channelName, channel in asyncapi.channels() %} + TopicExchange {{channelName}}Exchange = new TopicExchange({{channelName}}Exchange); + {% endfor %} - @Autowired - MessageHandlerService messageHandlerService; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + {% for channelName, channel in asyncapi.channels() %} + Binding {{channelName}}Binding = BindingBuilder.bind({{channelName}}Queue) + .to({{channelName}}Exchange).with({{channelName}}RoutingKey); + {% endfor %} - @Bean - public IntegrationFlow {{channelName | camelCase}}Flow() { - return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory(), {{channelName}}Queue)) - .handle(messageHandlerService::handle{{channelName | upperFirst}}) - .get(); + return new Declarables( + {% set i = 0 %} + {% for channelName, channel in asyncapi.channels() %} + {% if i == asyncapi.channels().size %} + {{channelName}}Queue, + {{channelName}}Exchange, + {{channelName}}Binding, + {% else %} + {{channelName}}Queue, + {{channelName}}Exchange, + {{channelName}}Binding + {% set i = i+1 %} + {% endif %} + {% endfor %} + ); } - {% endif %}{% endfor %} - // publisher @Bean - public RabbitTemplate rabbitTemplate() { - RabbitTemplate template = new RabbitTemplate(connectionFactory()); - return template; + public MessageConverter converter() { + return new Jackson2JsonMessageConverter(); } - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} @Bean - public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { - return new DirectChannel(); + public AmqpAdmin amqpAdmin() { + return new RabbitAdmin(connectionFactory()); } @Bean - @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") - public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) { - AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); - outbound.setExchangeName({{channelName}}Exchange); - outbound.setRoutingKey("#"); - return outbound; + public AmqpTemplate template() { + final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); + rabbitTemplate.setMessageConverter(converter()); + return rabbitTemplate; } - {% endif %}{% endfor %} } {% endmacro %} diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 148633a24..026a06174 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -9,27 +9,21 @@ {%- endif -%} {%- endfor -%} -{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %} + {%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %} amqp: broker: {% for line in server.description() | splitByLines %} # {{line | safe}}{% endfor %} - host: {{server.url() | replace(':{port}', '') }} + host: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %} port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %} - username: + username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %} password: - exchange: - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasSubscribe() and channel.subscribe().binding('amqp') %} - {{channelName}}: {{channel.subscribe().binding('amqp').exchange.name}} - {% endif %} - {% endfor %} - queue: {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() and channel.publish().binding('amqp') %} - {{channelName}}: {{channel.publish().binding('amqp').queue.name}} - {% endif %} + {{channelName}}: + exchange: {{channel.publish().binding('amqp').exchange}} + queue: {{channel.publish().binding('amqp').queue}} + routingKey: {{channel.publish().binding('amqp').routingKey}} {% endfor %} -{% endif %} + {% endif %} {% if server.protocol() == 'mqtt' %} mqtt: From de6040eb768e6a6749ba8dc623b3a28c89e95304 Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Sun, 11 Jun 2023 19:59:40 +0530 Subject: [PATCH 2/9] Added publisher service --- partials/AmqpListener.java | 0 partials/AmqpPublisher.java | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 partials/AmqpListener.java create mode 100644 partials/AmqpPublisher.java diff --git a/partials/AmqpListener.java b/partials/AmqpListener.java new file mode 100644 index 000000000..e69de29bb diff --git a/partials/AmqpPublisher.java b/partials/AmqpPublisher.java new file mode 100644 index 000000000..3fb292911 --- /dev/null +++ b/partials/AmqpPublisher.java @@ -0,0 +1,38 @@ +{% macro amqpPublisher(asyncapi, params) %} + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} +{% endfor %} + + +@Service +public class PublisherService { + @Autowired + private RabbitTemplate template; + + {% for channelName, channel in asyncapi.channels() %} + @Value("${amqp.{{- channelName -}}.exchange}") + private String {{channelName}}Exchange; + + @Value("${amqp.{{- channelName -}}.routingKey}") + private String {{channelName}}RoutingKey; + {% endfor %} + + {% for channelName, channel in asyncapi.channels() %} + {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} + public void update{{schemaName}}(){ + {{schemaName}} {{channelName}}Payload = new {{schemaName}}(); + template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload); + } + {% endfor %} +} + +{% endmacro %} \ No newline at end of file From aaec4c300bfb1294bc3a7f56d7447a5cfc34049f Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Mon, 12 Jun 2023 22:58:01 +0530 Subject: [PATCH 3/9] Added listeners and publisher code --- partials/AmqpConfig.java | 28 ++++++++----------- partials/AmqpListener.java | 27 ++++++++++++++++++ partials/AmqpPublisher.java | 2 +- template/build.gradle | 1 + .../service/CommandLinePublisher.java | 3 +- .../service/MessageHandlerService.java | 20 +++++++++++++ .../asyncapi/service/PublisherService.java | 3 ++ template/src/main/resources/application.yml | 4 +-- 8 files changed, 68 insertions(+), 20 deletions(-) diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 9b600d616..22b018cbc 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -51,32 +51,28 @@ public ConnectionFactory connectionFactory() { @Bean public Declarables declarables() { {% for channelName, channel in asyncapi.channels() %} - Queue {{channelName}}Queue = new Queue({{channelName}}Queue); + Queue {{channelName}}_Queue = new Queue({{channelName}}Queue); {% endfor %} {% for channelName, channel in asyncapi.channels() %} - TopicExchange {{channelName}}Exchange = new TopicExchange({{channelName}}Exchange); + TopicExchange {{channelName}}_Exchange = new TopicExchange({{channelName}}Exchange); {% endfor %} {% for channelName, channel in asyncapi.channels() %} - Binding {{channelName}}Binding = BindingBuilder.bind({{channelName}}Queue) - .to({{channelName}}Exchange).with({{channelName}}RoutingKey); + Binding {{channelName}}_Binding = BindingBuilder.bind({{channelName}}_Queue) + .to({{channelName}}_Exchange).with({{channelName}}RoutingKey); {% endfor %} return new Declarables( - {% set i = 0 %} - {% for channelName, channel in asyncapi.channels() %} - {% if i == asyncapi.channels().size %} - {{channelName}}Queue, - {{channelName}}Exchange, - {{channelName}}Binding, + {% set i = 1 %}{% for channelName, channel in asyncapi.channels() %}{% if i == asyncapi.channels() | size %} + {{channelName}}_Queue, + {{channelName}}_Exchange, + {{channelName}}_Binding, {% else %} - {{channelName}}Queue, - {{channelName}}Exchange, - {{channelName}}Binding - {% set i = i+1 %} - {% endif %} - {% endfor %} + {{channelName}}_Queue, + {{channelName}}_Exchange, + {{channelName}}_Binding + {% set i = i+1 %} {% endif %} {% endfor %} ); } diff --git a/partials/AmqpListener.java b/partials/AmqpListener.java index e69de29bb..4b493858d 100644 --- a/partials/AmqpListener.java +++ b/partials/AmqpListener.java @@ -0,0 +1,27 @@ +{% macro amqpListener(asyncapi, params) %} + +import com.javatechie.rabbitmq.demo.model.LightMeasuredPayload; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} + {% endfor %} + + +@Service +public class MessageHandlerService{ + + {% for channelName, channel in asyncapi.channels() %} + {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} + @RabbitListener(queues = "${amqp.broker.{{- channeName -}}.queue}") + public void consumeFrom{{schemaName}}({{schemaName}} {{channelName}}Payload ){ + LOGGER.info("Message received from "+ {{schemaName}} " : " + lightMeasuredPayload); + } + {% endfor %} +} + +{% endmacro %} diff --git a/partials/AmqpPublisher.java b/partials/AmqpPublisher.java index 3fb292911..37138c028 100644 --- a/partials/AmqpPublisher.java +++ b/partials/AmqpPublisher.java @@ -28,7 +28,7 @@ public class PublisherService { {% for channelName, channel in asyncapi.channels() %} {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} - public void update{{schemaName}}(){ + public void {{channel.subscribe().id() | camelCase}}(){ {{schemaName}} {{channelName}}Payload = new {{schemaName}}(); template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload); } diff --git a/template/build.gradle b/template/build.gradle index bd924d464..f317fe1f8 100644 --- a/template/build.gradle +++ b/template/build.gradle @@ -15,6 +15,7 @@ repositories { dependencies { {%- if asyncapi | isProtocol('amqp') %} implementation('org.springframework.integration:spring-integration-amqp') + implementation('org.springframework.integration:spring-integration-amqp') {% endif -%} {%- if asyncapi | isProtocol('mqtt') %} implementation('org.springframework.integration:spring-integration-mqtt') diff --git a/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java b/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java index 3d5837630..8416ea8ea 100644 --- a/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java +++ b/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java @@ -19,7 +19,8 @@ public void run(String... args) { {%- for channelName, channel in asyncapi.channels() %} {%- if channel.hasSubscribe() %} {%- for message in channel.subscribe().messages() %} - publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %}); + publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}() + {% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %}); {% endfor -%} {% endif -%} {%- endfor %} diff --git a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java index e6e6b5561..3c2dea1b6 100644 --- a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java +++ b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java @@ -27,6 +27,16 @@ {%- endif %} {%- endfor %} {% endif %} +{% if asyncapi | isProtocol('amqp') and hasPublish %} +import org.springframework.amqp.rabbit.annotation.RabbitListener; + {% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasPublish() %} + {%- for message in channel.publish().messages() %} +import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor %} + {%- endif %} + {%- endfor %} + {% endif %} @Service public class MessageHandlerService { @@ -52,6 +62,16 @@ public class MessageHandlerService { } {%- endif %} {% endfor %} + + {% elif asyncapi | isProtocol('amqp') %} + {% for channelName, channel in asyncapi.channels() %} + {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} + @RabbitListener(queues = "${amqp.{{- channelName -}}.queue}") + public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){ + LOGGER.info("Message received from {{- schemaName -}} : " + {{channelName}}Payload); + } + {% endfor %} + {% else %} {% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %} diff --git a/template/src/main/java/com/asyncapi/service/PublisherService.java b/template/src/main/java/com/asyncapi/service/PublisherService.java index 8041ab432..044148da5 100644 --- a/template/src/main/java/com/asyncapi/service/PublisherService.java +++ b/template/src/main/java/com/asyncapi/service/PublisherService.java @@ -1,8 +1,11 @@ package {{ params['userJavaPackage'] }}.service; {%- from "partials/CommonPublisher.java" import commonPublisher -%} {%- from "partials/KafkaPublisher.java" import kafkaPublisher -%} +{%- from "partials/AmqpPublisher.java" import amqpPublisher -%} {%- if asyncapi | isProtocol('kafka') -%} {{- kafkaPublisher(asyncapi, params) -}} +{%- elif asyncapi | isProtocol('amqp') -%} +{{- amqpPublisher(asyncapi, params) -}} {%- else -%} {{- commonPublisher(asyncapi) -}} {%- endif -%} \ No newline at end of file diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 026a06174..53e840a74 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -20,8 +20,8 @@ amqp: {% for channelName, channel in asyncapi.channels() %} {{channelName}}: exchange: {{channel.publish().binding('amqp').exchange}} - queue: {{channel.publish().binding('amqp').queue}} - routingKey: {{channel.publish().binding('amqp').routingKey}} + queue: {{channel.subscribe().binding('amqp').queue}} + routingKey: {{channel.subscribe().binding('amqp').routingKey}} {% endfor %} {% endif %} From eeab14ecd773350833384d74740b642bc2d30e78 Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Tue, 13 Jun 2023 00:03:05 +0530 Subject: [PATCH 4/9] Updated AMQP config and added sample config files to test --- partials/AmqpConfig.java | 4 +- partials/AmqpListener.java | 27 ------- .../aqmp_config_multiple-channels.yaml | 76 +++++++++++++++++++ .../aqmp_config_single-channel.yaml | 60 +++++++++++++++ 4 files changed, 138 insertions(+), 29 deletions(-) delete mode 100644 partials/AmqpListener.java create mode 100644 tests/user_examples/aqmp_config_multiple-channels.yaml create mode 100644 tests/user_examples/aqmp_config_single-channel.yaml diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 22b018cbc..3d9022ebd 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -67,11 +67,11 @@ public Declarables declarables() { {% set i = 1 %}{% for channelName, channel in asyncapi.channels() %}{% if i == asyncapi.channels() | size %} {{channelName}}_Queue, {{channelName}}_Exchange, - {{channelName}}_Binding, + {{channelName}}_Binding {% else %} {{channelName}}_Queue, {{channelName}}_Exchange, - {{channelName}}_Binding + {{channelName}}_Binding, {% set i = i+1 %} {% endif %} {% endfor %} ); } diff --git a/partials/AmqpListener.java b/partials/AmqpListener.java deleted file mode 100644 index 4b493858d..000000000 --- a/partials/AmqpListener.java +++ /dev/null @@ -1,27 +0,0 @@ -{% macro amqpListener(asyncapi, params) %} - -import com.javatechie.rabbitmq.demo.model.LightMeasuredPayload; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Service; -{% for channelName, channel in asyncapi.channels() %} - {%- if channel.hasSubscribe() %} - {%- for message in channel.subscribe().messages() %} -import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; - {%- endfor -%} - {% endif -%} - {% endfor %} - - -@Service -public class MessageHandlerService{ - - {% for channelName, channel in asyncapi.channels() %} - {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} - @RabbitListener(queues = "${amqp.broker.{{- channeName -}}.queue}") - public void consumeFrom{{schemaName}}({{schemaName}} {{channelName}}Payload ){ - LOGGER.info("Message received from "+ {{schemaName}} " : " + lightMeasuredPayload); - } - {% endfor %} -} - -{% endmacro %} diff --git a/tests/user_examples/aqmp_config_multiple-channels.yaml b/tests/user_examples/aqmp_config_multiple-channels.yaml new file mode 100644 index 000000000..352b59d81 --- /dev/null +++ b/tests/user_examples/aqmp_config_multiple-channels.yaml @@ -0,0 +1,76 @@ +asyncapi: 2.5.0 +info: + title: Streetlights API Simplified + version: 1.0.0 + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + This is a simplified version of the Streetlights API from other examples. This version is used in AsyncAPI documentation. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 +servers: + production: + url: localhost + protocol: amqp + description: RabbitMQ + variables: + port: + default: '5672' + username: + default: guest + +channels: + lightMeasured_Streetlight1: + publish: + summary: Inform about environmental lighting conditions for Streetlight 1. + operationId: readLightMeasurement_Streetlight1 + bindings: + amqp: + exchange: lightMeasuredExchange + message: + $ref: '#/components/messages/lightMeasured' + subscribe: + operationId: updateLightMeasurement_Streetlight1 + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + queue: lightMeasuredQueue_Streetlight1 + routingKey: lightMeasuredRoutingKey_Streetlight1 + lightMeasured_Streetlight2: + publish: + summary: Inform about environmental lighting conditions for Streetlight 2. + operationId: readLightMeasurement_Streetlight2 + bindings: + amqp: + exchange: lightMeasuredExchange + message: + $ref: '#/components/messages/lightMeasured' + subscribe: + operationId: updateLightMeasurement_Streetlight2 + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + queue: lightMeasuredQueue_Streetlight2 + routingKey: lightMeasuredRoutingKey_Streetlight2 + +components: + messages: + lightMeasured: + summary: Inform about environmental lighting conditions for a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the streetlight. + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + diff --git a/tests/user_examples/aqmp_config_single-channel.yaml b/tests/user_examples/aqmp_config_single-channel.yaml new file mode 100644 index 000000000..ae598d324 --- /dev/null +++ b/tests/user_examples/aqmp_config_single-channel.yaml @@ -0,0 +1,60 @@ +asyncapi: 2.5.0 +info: + title: Streetlights API Simplified + version: 1.0.0 + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + This is a simplified version of the Streetlights API from other examples. This version is used in AsyncAPI documentation. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 +servers: + production: + url: localhost + protocol: amqp + description: RabbitMQ + variables: + port: + default: '5672' + username: + default: guest + +channels: + lightMeasured: + publish: + summary: Inform about environmental lighting conditions for a particular streetlight. + operationId: readLightMeasurement + bindings: + amqp: + exchange: lightMeasuredExchange + message: + $ref: '#/components/messages/lightMeasured' + subscribe: + operationId: updateLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + queue: lightMeasuredQueue + routingKey: lightMeasuredRoutingKey + + +components: + messages: + lightMeasured: + summary: Inform about environmental lighting conditions for a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the streetlight. + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + From 34cbafdab1ea58afa05de08b9e38dec6c3965449 Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Wed, 6 Sep 2023 00:30:20 +0530 Subject: [PATCH 5/9] Updated amqp config --- partials/AmqpConfig.java | 34 +++++++------------ partials/AmqpPublisher.java | 7 +++- .../service/MessageHandlerService.java | 8 +++-- template/src/main/resources/application.yml | 6 ++-- .../aqmp_config_multiple-channels.yaml | 21 ++++++++---- .../aqmp_config_single-channel.yaml | 17 ++++++---- 6 files changed, 50 insertions(+), 43 deletions(-) diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 3d9022ebd..b6b6034fa 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -29,14 +29,19 @@ public class Config { {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} @Value("${amqp.{{- channelName -}}.exchange}") private String {{channelName}}Exchange; + @Value("${amqp.{{- channelName -}}.routingKey}") + private String {{channelName}}RoutingKey; + {% endif %} + + {% if channel.hasPublish() %} @Value("${amqp.{{- channelName -}}.queue}") private String {{channelName}}Queue; + {% endif %} - @Value("${amqp.{{- channelName -}}.routingKey}") - private String {{channelName}}RoutingKey; {% endfor %} @Bean @@ -50,29 +55,14 @@ public ConnectionFactory connectionFactory() { @Bean public Declarables declarables() { + return new Declarables( {% for channelName, channel in asyncapi.channels() %} - Queue {{channelName}}_Queue = new Queue({{channelName}}Queue); - {% endfor %} - - {% for channelName, channel in asyncapi.channels() %} - TopicExchange {{channelName}}_Exchange = new TopicExchange({{channelName}}Exchange); - {% endfor %} + {% if channel.hasSubscribe() %} new TopicExchange({{channelName}}Exchange, true, false) + {% if not loop.last %},{% endif %}{% endif %}{% endfor %} {% for channelName, channel in asyncapi.channels() %} - Binding {{channelName}}_Binding = BindingBuilder.bind({{channelName}}_Queue) - .to({{channelName}}_Exchange).with({{channelName}}RoutingKey); - {% endfor %} - - return new Declarables( - {% set i = 1 %}{% for channelName, channel in asyncapi.channels() %}{% if i == asyncapi.channels() | size %} - {{channelName}}_Queue, - {{channelName}}_Exchange, - {{channelName}}_Binding - {% else %} - {{channelName}}_Queue, - {{channelName}}_Exchange, - {{channelName}}_Binding, - {% set i = i+1 %} {% endif %} {% endfor %} + {% if channel.hasPublish() %}{% if loop.first %},{% endif %} new Queue({{channelName}}Queue, true, false, false) + {% if not loop.last %},{% endif %}{% endif %} {% endfor %} ); } diff --git a/partials/AmqpPublisher.java b/partials/AmqpPublisher.java index 37138c028..411ce9735 100644 --- a/partials/AmqpPublisher.java +++ b/partials/AmqpPublisher.java @@ -19,20 +19,25 @@ public class PublisherService { private RabbitTemplate template; {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} @Value("${amqp.{{- channelName -}}.exchange}") private String {{channelName}}Exchange; - @Value("${amqp.{{- channelName -}}.routingKey}") private String {{channelName}}RoutingKey; + {% endif %} {% endfor %} {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} public void {{channel.subscribe().id() | camelCase}}(){ {{schemaName}} {{channelName}}Payload = new {{schemaName}}(); template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload); } + + {% endif %} {% endfor %} + } {% endmacro %} \ No newline at end of file diff --git a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java index 3c2dea1b6..573329623 100644 --- a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java +++ b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java @@ -65,16 +65,18 @@ public class MessageHandlerService { {% elif asyncapi | isProtocol('amqp') %} {% for channelName, channel in asyncapi.channels() %} - {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} + {% if channel.hasPublish() %} + {%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %} @RabbitListener(queues = "${amqp.{{- channelName -}}.queue}") public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){ - LOGGER.info("Message received from {{- schemaName -}} : " + {{channelName}}Payload); + LOGGER.info("Message received from {{- channelName -}} : " + {{channelName}}Payload); } + {% endif %} {% endfor %} {% else %} {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() %} + {% if channel.hasPublish() %} {% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %} * {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %} * {{line | safe}}{% endfor %} diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 53e840a74..b9575fcf1 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -19,9 +19,9 @@ amqp: password: {% for channelName, channel in asyncapi.channels() %} {{channelName}}: - exchange: {{channel.publish().binding('amqp').exchange}} - queue: {{channel.subscribe().binding('amqp').queue}} - routingKey: {{channel.subscribe().binding('amqp').routingKey}} + {% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %} + {% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %} + {% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %} {% endfor %} {% endif %} diff --git a/tests/user_examples/aqmp_config_multiple-channels.yaml b/tests/user_examples/aqmp_config_multiple-channels.yaml index 352b59d81..4ee1ed9de 100644 --- a/tests/user_examples/aqmp_config_multiple-channels.yaml +++ b/tests/user_examples/aqmp_config_multiple-channels.yaml @@ -26,7 +26,9 @@ channels: operationId: readLightMeasurement_Streetlight1 bindings: amqp: - exchange: lightMeasuredExchange + is: queue + queue: + name: lightMeasurementQueue_Streetlight1 message: $ref: '#/components/messages/lightMeasured' subscribe: @@ -35,15 +37,19 @@ channels: $ref: '#/components/messages/lightMeasured' bindings: amqp: - queue: lightMeasuredQueue_Streetlight1 - routingKey: lightMeasuredRoutingKey_Streetlight1 + is: routingKey + exchange: + name: lightMeasurementExchange_Streetlight1 + routingKey: lightMeasurementRoutingKey_Streetlight1 lightMeasured_Streetlight2: publish: summary: Inform about environmental lighting conditions for Streetlight 2. operationId: readLightMeasurement_Streetlight2 bindings: amqp: - exchange: lightMeasuredExchange + is: queue + queue: + name: lightMeasurementQueue_Streetlight2 message: $ref: '#/components/messages/lightMeasured' subscribe: @@ -52,9 +58,10 @@ channels: $ref: '#/components/messages/lightMeasured' bindings: amqp: - queue: lightMeasuredQueue_Streetlight2 - routingKey: lightMeasuredRoutingKey_Streetlight2 - + is: routingKey + exchange: + name: lightMeasurementExchange_Streetlight2 + routingKey: lightMeasurementRoutingKey_Streetlight2 components: messages: lightMeasured: diff --git a/tests/user_examples/aqmp_config_single-channel.yaml b/tests/user_examples/aqmp_config_single-channel.yaml index ae598d324..b76d23df5 100644 --- a/tests/user_examples/aqmp_config_single-channel.yaml +++ b/tests/user_examples/aqmp_config_single-channel.yaml @@ -24,20 +24,23 @@ channels: publish: summary: Inform about environmental lighting conditions for a particular streetlight. operationId: readLightMeasurement - bindings: - amqp: - exchange: lightMeasuredExchange message: $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + is: queue + queue: + name: lightMeasurementQueue subscribe: operationId: updateLightMeasurement message: $ref: '#/components/messages/lightMeasured' bindings: amqp: - queue: lightMeasuredQueue - routingKey: lightMeasuredRoutingKey - + is: routingKey + exchange: + name: lightMeasurementExchange + routingKey: lightMeasurementRoutingKey components: messages: @@ -52,7 +55,7 @@ components: id: type: integer minimum: 0 - description: Id of the streetlight. + description: Id of the streetlight. lumens: type: integer minimum: 0 From d46f4cbb4f7fa0af222348044d145b83870b25e7 Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Wed, 6 Sep 2023 01:11:43 +0530 Subject: [PATCH 6/9] Segregated declarables for exchanges and queues --- partials/AmqpConfig.java | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index b6b6034fa..c020adbeb 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -54,18 +54,22 @@ public ConnectionFactory connectionFactory() { } @Bean - public Declarables declarables() { + public Declarables exchanges() { return new Declarables( - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasSubscribe() %} new TopicExchange({{channelName}}Exchange, true, false) - {% if not loop.last %},{% endif %}{% endif %}{% endfor %} - - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() %}{% if loop.first %},{% endif %} new Queue({{channelName}}Queue, true, false, false) - {% if not loop.last %},{% endif %}{% endif %} {% endfor %} - ); + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} + new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %} + {% endif %}{% endfor %} + ); } + @Bean + public Declarables queues() { + return new Declarables( + {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} + new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %} + {% endif %}{% endfor %} + ); + @Bean public MessageConverter converter() { From 03f775870596790c63c3e7401154d03dd344681f Mon Sep 17 00:00:00 2001 From: VaishnaviNandakumar Date: Thu, 7 Sep 2023 19:03:35 +0530 Subject: [PATCH 7/9] Updated test snapshots --- tests/__snapshots__/kafka.test.js.snap | 2 ++ tests/__snapshots__/mqtt.test.js.snap | 14 ++++++++------ tests/__snapshots__/oneOf.test.js.snap | 2 ++ tests/__snapshots__/parameters.test.js.snap | 2 ++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/__snapshots__/kafka.test.js.snap b/tests/__snapshots__/kafka.test.js.snap index 4cbdb4ff7..d9fd1bb5c 100644 --- a/tests/__snapshots__/kafka.test.js.snap +++ b/tests/__snapshots__/kafka.test.js.snap @@ -138,6 +138,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.LightMeasuredPayload; + @Service public class MessageHandlerService { @@ -154,6 +155,7 @@ public class MessageHandlerService { } + } " `; diff --git a/tests/__snapshots__/mqtt.test.js.snap b/tests/__snapshots__/mqtt.test.js.snap index 64d9c8ed8..17fbf5177 100644 --- a/tests/__snapshots__/mqtt.test.js.snap +++ b/tests/__snapshots__/mqtt.test.js.snap @@ -207,13 +207,14 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; + @Service public class MessageHandlerService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + /** * The topic on which measured values may be produced and consumed. */ @@ -223,11 +224,11 @@ public class MessageHandlerService { } - - - + + + } @@ -1058,13 +1059,14 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; + @Service public class MessageHandlerService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + /** * The topic on which measured values may be produced and consumed. */ @@ -1074,7 +1076,7 @@ public class MessageHandlerService { } - + } diff --git a/tests/__snapshots__/oneOf.test.js.snap b/tests/__snapshots__/oneOf.test.js.snap index f8738300d..36af94d59 100644 --- a/tests/__snapshots__/oneOf.test.js.snap +++ b/tests/__snapshots__/oneOf.test.js.snap @@ -16,6 +16,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.AnonymousSchema1; import com.asyncapi.model.AnonymousSchema7; + @Service public class MessageHandlerService { @@ -32,6 +33,7 @@ public class MessageHandlerService { } + } " `; diff --git a/tests/__snapshots__/parameters.test.js.snap b/tests/__snapshots__/parameters.test.js.snap index a48552fe9..97c19bc48 100644 --- a/tests/__snapshots__/parameters.test.js.snap +++ b/tests/__snapshots__/parameters.test.js.snap @@ -15,6 +15,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.LightMeasuredPayload; + @Service public class MessageHandlerService { @@ -31,6 +32,7 @@ public class MessageHandlerService { } + } " `; From 73e0e932880f500747cd201faf21553da146cd93 Mon Sep 17 00:00:00 2001 From: Semen Date: Sat, 7 Oct 2023 23:30:18 +0300 Subject: [PATCH 8/9] fix double dependency --- template/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/template/build.gradle b/template/build.gradle index e0fd9abd5..8df60c13c 100644 --- a/template/build.gradle +++ b/template/build.gradle @@ -15,7 +15,6 @@ repositories { dependencies { {%- if asyncapi | isProtocol('amqp') %} implementation('org.springframework.integration:spring-integration-amqp') - implementation('org.springframework.integration:spring-integration-amqp') {% endif -%} {%- if asyncapi | isProtocol('mqtt') %} implementation('org.springframework.integration:spring-integration-mqtt') From ab81bbd18889575c62eeeabd9ee6f1379317f52e Mon Sep 17 00:00:00 2001 From: Semen Date: Sat, 7 Oct 2023 23:41:34 +0300 Subject: [PATCH 9/9] fix snapshots --- tests/__snapshots__/kafka.test.js.snap | 3 ++- tests/__snapshots__/mqtt.test.js.snap | 14 ++++++++------ tests/__snapshots__/oneOf.test.js.snap | 3 ++- tests/__snapshots__/parameters.test.js.snap | 5 +++-- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/__snapshots__/kafka.test.js.snap b/tests/__snapshots__/kafka.test.js.snap index 07b0ec068..b8e1a8642 100644 --- a/tests/__snapshots__/kafka.test.js.snap +++ b/tests/__snapshots__/kafka.test.js.snap @@ -142,6 +142,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.LightMeasuredPayload; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -161,7 +162,7 @@ public class MessageHandlerService { } - + } " `; diff --git a/tests/__snapshots__/mqtt.test.js.snap b/tests/__snapshots__/mqtt.test.js.snap index a3a6e8074..07f23a8b8 100644 --- a/tests/__snapshots__/mqtt.test.js.snap +++ b/tests/__snapshots__/mqtt.test.js.snap @@ -213,6 +213,7 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -222,7 +223,7 @@ public class MessageHandlerService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + /** * The topic on which measured values may be produced and consumed. */ @@ -232,11 +233,11 @@ public class MessageHandlerService { } - - - + + + } @@ -1087,6 +1088,7 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -1096,7 +1098,7 @@ public class MessageHandlerService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + /** * The topic on which measured values may be produced and consumed. */ @@ -1106,7 +1108,7 @@ public class MessageHandlerService { } - + } diff --git a/tests/__snapshots__/oneOf.test.js.snap b/tests/__snapshots__/oneOf.test.js.snap index e9096f8df..e5fc6efde 100644 --- a/tests/__snapshots__/oneOf.test.js.snap +++ b/tests/__snapshots__/oneOf.test.js.snap @@ -16,6 +16,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.AnonymousSchema1; import com.asyncapi.model.AnonymousSchema7; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -35,7 +36,7 @@ public class MessageHandlerService { } - + } " `; diff --git a/tests/__snapshots__/parameters.test.js.snap b/tests/__snapshots__/parameters.test.js.snap index df156768b..c9b001b22 100644 --- a/tests/__snapshots__/parameters.test.js.snap +++ b/tests/__snapshots__/parameters.test.js.snap @@ -75,7 +75,7 @@ exports[`integration tests for generated files under different template paramete 1.16.3 test - + jakarta.validation jakarta.validation-api @@ -149,6 +149,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.LightMeasuredPayload; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -168,7 +169,7 @@ public class MessageHandlerService { } - + } " `;