diff --git a/nats/src/main/java/io/micronaut/nats/connect/External.java b/nats/src/main/java/io/micronaut/nats/connect/External.java new file mode 100644 index 00000000..475470a9 --- /dev/null +++ b/nats/src/main/java/io/micronaut/nats/connect/External.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.nats.connect; + +/** + * External. + * + * @author Joachim Grimm + * @since 4.1.0 + */ +public abstract class External { + + private String api; + private String deliver; + + /** + * build the external information from the given configuration. + * + * @return this + */ + public io.nats.client.api.External build() { + return io.nats.client.api.External.builder().api(api).deliver(deliver).build(); + } + + /** + * Api. + * @param api {@link String} + */ + public void setApi(String api) { + this.api = api; + } + + /** + * Deliver. + * @param deliver {@link String} + */ + public void setDeliver(String deliver) { + this.deliver = deliver; + } + + final String getApi() { + return api; + } + + final String getDeliver() { + return deliver; + } +} diff --git a/nats/src/main/java/io/micronaut/nats/connect/Mirror.java b/nats/src/main/java/io/micronaut/nats/connect/Mirror.java index 703548f5..5c58e129 100644 --- a/nats/src/main/java/io/micronaut/nats/connect/Mirror.java +++ b/nats/src/main/java/io/micronaut/nats/connect/Mirror.java @@ -23,7 +23,7 @@ * @author Joachim Grimm * @since 4.1.0 */ -public abstract class Mirror extends SourceBase { +public abstract class Mirror extends SourceBase { /** * build the mirror object from the given configuration. @@ -35,9 +35,12 @@ public io.nats.client.api.Mirror build() { .name(name) .filterSubject(filterSubject) .startSeq(startSeq) - .startTime(startTime) - .subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build) - .toList()); + .startTime(startTime); + if (subjectTransforms != null) { + builder = builder + .subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build) + .toList()); + } if (external != null) { builder = builder.external(external.build()); } diff --git a/nats/src/main/java/io/micronaut/nats/connect/NatsConnectionFactoryConfig.java b/nats/src/main/java/io/micronaut/nats/connect/NatsConnectionFactoryConfig.java index 58b4d1d4..56425f84 100644 --- a/nats/src/main/java/io/micronaut/nats/connect/NatsConnectionFactoryConfig.java +++ b/nats/src/main/java/io/micronaut/nats/connect/NatsConnectionFactoryConfig.java @@ -323,11 +323,14 @@ public void setCertificatePath(String certificatePath) { private SSLContext createTlsContext() throws IOException, GeneralSecurityException { SSLContext ctx = SSLContext.getInstance(DEFAULT_SSL_PROTOCOL); - TrustManagerFactory factory = TrustManagerFactory.getInstance(Optional.ofNullable(trustStoreType).orElse("SunX509")); + TrustManagerFactory factory = TrustManagerFactory.getInstance(Optional.ofNullable(trustStoreType) + .orElse("SunX509")); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); if (trustStorePath != null && !trustStorePath.isEmpty()) { try (BufferedInputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(trustStorePath)))) { - ks.load(in, Optional.ofNullable(trustStorePassword).map(String::toCharArray).orElse(new char[0])); + ks.load(in, Optional.ofNullable(trustStorePassword) + .map(String::toCharArray) + .orElse(new char[0])); } } else { ks.load(null); @@ -454,7 +457,7 @@ public static class StreamConfiguration { private Mirror mirror; - private List sources = new ArrayList<>(); + private List sources; private Republish republish; @@ -481,9 +484,12 @@ public io.nats.client.api.StreamConfiguration.Builder getBuilder() { */ public io.nats.client.api.StreamConfiguration toStreamConfiguration() { io.nats.client.api.StreamConfiguration.Builder streamBuilder = builder.name(name) - .subjects(subjects) - .sources(sources.stream().map(io.micronaut.nats.connect.Source::build).toList()); - + .subjects(subjects); + if (sources != null) { + streamBuilder = streamBuilder.sources(sources.stream() + .map(io.micronaut.nats.connect.Source::build) + .toList()); + } if (mirror != null) { streamBuilder = streamBuilder.mirror(mirror.build()); } @@ -558,6 +564,7 @@ public void setRepublish(Republish republish) { /** * Consumer Limits. + * * @param consumerLimits {@link ConsumerLimits} */ public void setConsumerLimits(ConsumerLimits consumerLimits) { @@ -611,7 +618,7 @@ public static class ConsumerLimits extends io.micronaut.nats.connect.ConsumerLim * @since 4.1.0 */ @ConfigurationProperties("mirror") - public static class Mirror extends io.micronaut.nats.connect.Mirror { + public static class Mirror extends io.micronaut.nats.connect.Mirror { /** * Subject transformations. @@ -621,7 +628,6 @@ public static class Mirror extends io.micronaut.nats.connect.Mirror { + public static class Source extends io.micronaut.nats.connect.Source { /** * Subject transformations. @@ -651,9 +657,8 @@ public static class Source extends io.micronaut.nats.connect.Source sources = new ArrayList<>(); + private List sources; @ConfigurationBuilder(prefixes = "", excludes = {"addSources", "addSource", "name", "sources", "build", "placement", "republish", "mirror"}) private io.nats.client.api.KeyValueConfiguration.Builder builder = io.nats.client.api.KeyValueConfiguration.builder(); @@ -711,8 +716,13 @@ public io.nats.client.api.KeyValueConfiguration.Builder getBuilder() { */ public io.nats.client.api.KeyValueConfiguration toKeyValueConfiguration() { io.nats.client.api.KeyValueConfiguration.Builder keyValueBuilder = builder - .name(name) - .sources(sources.stream().map(io.micronaut.nats.connect.Source::build).toList()); + .name(name); + if (sources != null) { + keyValueBuilder = keyValueBuilder + .sources(sources.stream() + .map(io.micronaut.nats.connect.Source::build) + .toList()); + } if (mirror != null) { keyValueBuilder = keyValueBuilder.mirror(mirror.build()); } @@ -788,7 +798,7 @@ public static class Republish extends io.micronaut.nats.connect.Republish { * @since 4.1.0 */ @ConfigurationProperties("mirror") - public static class Mirror extends io.micronaut.nats.connect.Mirror { + public static class Mirror extends io.micronaut.nats.connect.Mirror { /** * Subject transformations. @@ -798,7 +808,6 @@ public static class Mirror extends io.micronaut.nats.connect.Mirror extends SourceBase { +public abstract class Source extends SourceBase { /** * build the source object from the given configuration. @@ -35,9 +35,13 @@ public io.nats.client.api.Source build() { .name(name) .filterSubject(filterSubject) .startSeq(startSeq) - .startTime(startTime) - .subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build) - .toList()); + .startTime(startTime); + if (subjectTransforms != null) { + builder = builder + .subjectTransforms(subjectTransforms.stream().map(SubjectTransformBase::build) + .toList()); + } + if (external != null) { builder = builder.external(external.build()); } diff --git a/nats/src/main/java/io/micronaut/nats/connect/SourceBase.java b/nats/src/main/java/io/micronaut/nats/connect/SourceBase.java index ee986738..40fa28d5 100644 --- a/nats/src/main/java/io/micronaut/nats/connect/SourceBase.java +++ b/nats/src/main/java/io/micronaut/nats/connect/SourceBase.java @@ -15,25 +15,21 @@ */ package io.micronaut.nats.connect; -import io.micronaut.context.annotation.EachProperty; -import io.micronaut.core.annotation.NonNull; - import java.time.ZonedDateTime; -import java.util.ArrayList; import java.util.List; -abstract class SourceBase { +abstract class SourceBase { protected String name; protected long startSeq = 0; protected ZonedDateTime startTime; protected String filterSubject; protected E external; - protected List subjectTransforms = new ArrayList<>(); - + protected List subjectTransforms; /** * Name. + * * @param name {@link String} */ public void setName(String name) { @@ -42,6 +38,7 @@ public void setName(String name) { /** * Start sequence. + * * @param startSeq long */ public void setStartSeq(long startSeq) { @@ -50,6 +47,7 @@ public void setStartSeq(long startSeq) { /** * Start time. + * * @param startTime {@link ZonedDateTime} */ public void setStartTime(ZonedDateTime startTime) { @@ -58,6 +56,7 @@ public void setStartTime(ZonedDateTime startTime) { /** * Filter subject. + * * @param filterSubject {@link String} */ public void setFilterSubject(String filterSubject) { @@ -68,7 +67,7 @@ public void setFilterSubject(String filterSubject) { * Subject transformations. * @param subjectTransforms list */ - public void setSubjectTransforms(@NonNull List subjectTransforms) { + public void setSubjectTransforms(List subjectTransforms) { this.subjectTransforms = subjectTransforms; } @@ -80,40 +79,4 @@ public void setExternal(E external) { this.external = external; } - @EachProperty(value = "subject-transforms", list = true) - public static class SubjectTransform extends SubjectTransformBase { - } - - /** - * External. - * - * @author Joachim Grimm - * @since 4.1.0 - */ - public static class External { - - private String api; - private String deliver; - - public io.nats.client.api.External build() { - return io.nats.client.api.External.builder().api(api).deliver(deliver).build(); - } - - /** - * Api. - * @param api {@link String} - */ - public void setApi(String api) { - this.api = api; - } - - /** - * Deliver. - * @param deliver {@link String} - */ - public void setDeliver(String deliver) { - this.deliver = deliver; - } - } - } diff --git a/nats/src/test/groovy/io/micronaut/nats/jetstream/consumer/SourcesSpec.groovy b/nats/src/test/groovy/io/micronaut/nats/jetstream/consumer/SourcesSpec.groovy new file mode 100644 index 00000000..e85dbe4b --- /dev/null +++ b/nats/src/test/groovy/io/micronaut/nats/jetstream/consumer/SourcesSpec.groovy @@ -0,0 +1,107 @@ +/* + * Copyright 2017-2022 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.nats.jetstream.consumer + +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Requires +import io.micronaut.messaging.annotation.MessageBody +import io.micronaut.nats.annotation.Subject +import io.micronaut.nats.jetstream.AbstractJetstreamTest +import io.micronaut.nats.jetstream.PullConsumerRegistry +import io.micronaut.nats.jetstream.annotation.JetStreamClient +import io.micronaut.nats.jetstream.annotation.JetStreamListener +import io.micronaut.nats.jetstream.annotation.PushConsumer +import io.nats.client.JetStreamManagement +import io.nats.client.JetStreamSubscription +import io.nats.client.PullSubscribeOptions +import io.nats.client.api.AckPolicy +import io.nats.client.api.PublishAck +import spock.util.concurrent.PollingConditions + +import java.time.Duration + +class SourcesSpec extends AbstractJetstreamTest { + + void "build stream from other stream"() { + given: + ApplicationContext context = startContext([ + "nats.default.jetstream.streams.d-widgets.storage-type": "Memory", + "nats.default.jetstream.streams.d-widgets.sources[0].name" : "widgets", + "nats.default.jetstream.streams.d-widgets.sources[0].filter-subject" : "subject.three", + "nats.default.jetstream.streams.d-widgets.sources[0].external.api" : "test", + "nats.default.jetstream.streams.d-widgets.sources[0].subject-transforms[0].source" : "subject.*", + "nats.default.jetstream.streams.d-widgets.sources[0].subject-transforms[0].destination" : 'subject.test.$1', + ]) + MyProducer producer = context.getBean(MyProducer) + MyConsumer consumer = context.getBean(MyConsumer) + PullConsumerRegistry pullConsumerRegistry = context.getBean(PullConsumerRegistry) + JetStreamManagement jsm = context.getBean(JetStreamManagement) + PollingConditions conditions = new PollingConditions(timeout: 3) + PullSubscribeOptions configuration = PullSubscribeOptions.builder() + .stream("d-widgets") + .durable("test1") + .build() + JetStreamSubscription streamSubscription = pullConsumerRegistry.newPullConsumer("subject.test.three", configuration) + + when: + producer.one("subject.three","abc".bytes) + producer.one("subject.two","abc".bytes) + + then: + conditions.eventually { + def fetch = streamSubscription.fetch(1, Duration.ofSeconds(3l)) + fetch.forEach { it.ack() } + fetch.size() == 1 + + consumer.messages.size() == 2 + consumer.messagesDestination.size() == 1 + } + + + cleanup: + jsm.deleteStream("widgets") + jsm.deleteStream("d-widgets") + context.close() + } + + @Requires(property = "spec.name", value = "SourcesSpec") + @JetStreamClient + static interface MyProducer { + + PublishAck one(@Subject String subject, @MessageBody byte[] data) + + } + + @Requires(property = "spec.name", value = "SourcesSpec") + @JetStreamListener + static class MyConsumer { + + public static List messagesDestination = [] + + @PushConsumer(value = "d-widgets", subject = "subject.test.three", durable = "test", ackPolicy = AckPolicy.All) + void listenDestination(byte[] data) { + messagesDestination.add(data) + } + + public static List messages = [] + + @PushConsumer(value = "widgets", subject = "subject.>", durable = "test", ackPolicy = AckPolicy.All) + void listenOrigin(byte[] data) { + messages.add(data) + } + } + +}