Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] 6303 JMS JNDI destination support #6305

Merged
merged 3 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/mp/reactivemessaging/jms.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
///////////////////////////////////////////////////////////////////////////////

Copyright (c) 2020, 2022 Oracle and/or its affiliates.
Copyright (c) 2020, 2023 Oracle and/or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,6 +68,7 @@ Expression can only access headers and properties, not the payload.
|`session-group-id` | When multiple channels share same `session-group-id`,
they share same JMS session and same JDBC connection as well.
|`jndi.jms-factory` | JNDI name of JMS factory.
|`jndi.destination` | JNDI destination identifier.
|`jndi.env-properties` | Environment properties used for creating initial context `java.naming.factory.initial`, `java.naming.provider.url` ...
|`producer.someproperty` | property with producer prefix is set to producer instance (for example WLS Unit-of-Order `WLMessageProducer.setUnitOfOrder("unit-1")` can be configured as `producer.unit-of-order=unit-1`)
|===
Expand Down
4 changes: 3 additions & 1 deletion docs/mp/reactivemessaging/weblogic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Connector name: `helidon-weblogic-jms`
|`credentials` | Weblogic initial context credential(password)
|`type` | Possible values are: `queue`, `topic`. Default value is: `topic`
|`destination` | Queue or topic name in WebLogic CDI Syntax(CDI stands for Create Destination Identifier)
|`jndi.destination` | JNDI destination identifier. When no such JNDI destination is found, falls back to `destination` with CDI syntax.
|`acknowledge-mode` |Possible values are: `AUTO_ACKNOWLEDGE`- session automatically acknowledges a client’s receipt of a message,
`CLIENT_ACKNOWLEDGE` - receipt of a message is acknowledged only when `Message.ack()` is called manually,
`DUPS_OK_ACKNOWLEDGE` - session lazily acknowledges the delivery of messages. Default value: `AUTO_ACKNOWLEDGE`
Expand Down Expand Up @@ -108,7 +109,8 @@ mp:
outgoing:
to-wls:
connector: helidon-weblogic-jms
destination: ./TestJMSModule!TestQueue
# JNDI identifier for the same queue
jndi.destination: jms/TestQueue
----

When configuring destination with WebLogic CDI, the following syntax needs to be applied:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.helidon.examples.messaging.mp;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.SubmissionPublisher;

import io.helidon.common.reactive.Multi;
Expand Down Expand Up @@ -77,22 +79,26 @@ public ProcessorBuilder<String, Message<String>> multiply() {
* Broadcasts an event.
*
* @param msg Message to broadcast
* @return completed stage
*/
@Incoming("fromJms")
public void broadcast(JmsMessage<String> msg) {
public CompletionStage<Void> broadcast(JmsMessage<String> msg) {
// Broadcast to all subscribers
broadCaster.submit(msg.getPayload());
return CompletableFuture.completedFuture(null);
}

/**
* Same JMS session, different connector.
*
* @param msg Message to broadcast
* @return completed stage
*/
@Incoming("fromJmsSameSession")
public void sameSession(JmsMessage<String> msg) {
public CompletionStage<Void> sameSession(JmsMessage<String> msg) {
// Broadcast to all subscribers
broadCaster.submit(msg.getPayload());
return CompletableFuture.completedFuture(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Oracle and/or its affiliates.
# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,5 +40,5 @@ mp:
outgoing:
to-wls:
connector: helidon-weblogic-jms
# Same queue is used for simplifying test case
destination: ./TestJMSModule!TestQueue
# JNDI identifier for the same queue
jndi.destination: jms/TestQueue
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Optional<? extends ConnectionFactory> lookupFactory(String jndi) {
}

Optional<? extends Destination> lookupDestination(String jndi) {
return Optional.ofNullable((Destination) lookup(jndi))
return Optional.ofNullable(lookup(jndi))
.map(o -> JakartaJms.resolve(o, Destination.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.helidon.messaging.MessagingException;
import io.helidon.messaging.NackHandler;
import io.helidon.messaging.Stoppable;
import io.helidon.messaging.connectors.jms.shim.JakartaJms;
import io.helidon.messaging.connectors.jms.shim.JakartaWrapper;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -295,13 +296,16 @@ public class JmsConnector implements IncomingConnectorFactory, OutgoingConnector
static final String SCHEDULER_THREAD_NAME_PREFIX = "jms-poll-";
static final String EXECUTOR_THREAD_NAME_PREFIX = "jms-";

private final Instance<ConnectionFactory> connectionFactories;
private final Instance<ConnectionFactory> jakartaConnectionFactories;

private final ScheduledExecutorService scheduler;
private final ExecutorService executor;
private final Map<String, SessionMetadata> sessionRegister = new HashMap<>();
private final Map<String, ConnectionFactory> connectionFactoryMap;

@Inject
private Instance<javax.jms.ConnectionFactory> javaxConnectionFactories;

/**
* Provides a {@link JmsConnectorBuilder} for creating
* a {@link io.helidon.messaging.connectors.jms.JmsConnector} instance.
Expand Down Expand Up @@ -333,12 +337,13 @@ public static JmsConfigBuilder configBuilder() {
/**
* Create new JmsConnector.
*
* @param connectionFactories connection factory beans
* @param jakartaConnectionFactories connection factory beans
* @param config root config for thread context
*/
@Inject
protected JmsConnector(io.helidon.config.Config config, Instance<ConnectionFactory> connectionFactories) {
this.connectionFactories = connectionFactories;
protected JmsConnector(io.helidon.config.Config config,
Instance<ConnectionFactory> jakartaConnectionFactories) {
this.jakartaConnectionFactories = jakartaConnectionFactories;
this.connectionFactoryMap = Map.of();
scheduler = ScheduledThreadPoolSupplier.builder()
.threadNamePrefix(SCHEDULER_THREAD_NAME_PREFIX)
Expand All @@ -362,7 +367,8 @@ protected JmsConnector(io.helidon.config.Config config, Instance<ConnectionFacto
protected JmsConnector(Map<String, ConnectionFactory> connectionFactoryMap,
ScheduledExecutorService scheduler,
ExecutorService executor) {
this.connectionFactories = null;
this.jakartaConnectionFactories = null;
this.javaxConnectionFactories = null;
this.connectionFactoryMap = connectionFactoryMap;
this.scheduler = scheduler;
this.executor = executor;
Expand Down Expand Up @@ -452,20 +458,20 @@ protected Optional<? extends ConnectionFactory> getFactory(ConnectionContext ctx
if (factoryName.isPresent()) {
// Check SE map and MP instance for named factories
return Optional.ofNullable(connectionFactoryMap.get(factoryName.get()))
.or(() ->
Optional.ofNullable(connectionFactories)
.flatMap(s -> s.select(NamedLiteral.of(factoryName.get()))
.stream()
.findFirst()
)
);
.or(() -> getConnectionFactoryBean(factoryName.get()));
}

// Check SE map and MP instance for any factories
return connectionFactoryMap.values().stream().findFirst()
.or(() -> Optional.ofNullable(connectionFactories)
.flatMap(s -> s.stream().findFirst())
);
.or(() -> getConnectionFactoryBean(factoryName.get()));
}

private <T> Optional<ConnectionFactory> getConnectionFactoryBean(String name){
NamedLiteral literal = NamedLiteral.of(name);
return jakartaConnectionFactories.select(literal)
.stream()
.findFirst()
.or(() -> javaxConnectionFactories.select(literal).stream().map(JakartaJms::create).findFirst());
}

@Override
Expand Down Expand Up @@ -712,7 +718,6 @@ protected Destination createDestination(Session session, ConnectionContext ctx)

if (ctx.isJndi()) {
Optional<? extends Destination> jndiDestination = ctx.lookupDestination();
// JNDI can be used for looking up ConnectorFactory only
if (jndiDestination.isPresent()) {
return jndiDestination.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pattern=weblogic.**;java.util.**;java.lang.**;java.io.**;java.rmi.**
10 changes: 10 additions & 0 deletions tests/integration/jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,15 @@
<artifactId>helidon-microprofile-tests-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.messaging.mock</groupId>
<artifactId>helidon-messaging-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,6 @@
public class AbstractJmsTest {

static final String BROKER_URL = "vm://localhost?broker.persistent=false";
// static final String BROKER_URL = "tcp://localhost:61616";
static Session session;
static ConnectionFactory connectionFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,7 +98,7 @@ public static class ChannelAck extends AbstractSampleBean {

@Incoming("test-channel-ack-1")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<String> channelAck(Message<String> msg) {
public CompletionStage<Void> channelAck(Message<String> msg) {
LOGGER.fine(() -> String.format("Received %s", msg.getPayload()));
consumed().add(msg.getPayload());
if (msg.getPayload().startsWith("NO_ACK")) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,6 @@
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.se.SeContainer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -54,16 +53,14 @@
@AddConfig(key = "mp.messaging.connector.helidon-jms.jndi.env-properties.java.naming.factory.initial",
value = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"),

@AddConfig(key = "mp.messaging.connector.helidon-jms.period-executions", value = "5"),

@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.connector", value = JmsConnector.CONNECTOR_NAME),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.acknowledge-mode", value = "CLIENT_ACKNOWLEDGE"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.type", value = "queue"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.destination", value = AckMpTest.TEST_QUEUE_ACK),
})
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Disabled("3.0.0-JAKARTA")
//java.lang.ClassCastException: class org.apache.activemq.ActiveMQConnectionFactory cannot be cast to class jakarta.jms
// .ConnectionFactory (org.apache.activemq.ActiveMQConnectionFactory and jakarta.jms.ConnectionFactory are in unnamed module of
// loader 'app')
public class AckMpTest extends AbstractMPTest {

static final String TEST_QUEUE_ACK = "queue-ack";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.messaging.connectors.jms;

import java.time.Duration;
import java.util.stream.IntStream;

import io.helidon.common.reactive.Multi;
import io.helidon.messaging.connectors.jms.shim.JakartaJms;
import io.helidon.messaging.connectors.mock.MockConnector;
import io.helidon.messaging.connectors.mock.TestConnector;
import io.helidon.microprofile.config.ConfigCdiExtension;
import io.helidon.microprofile.messaging.MessagingCdiExtension;
import io.helidon.microprofile.tests.junit5.AddBean;
import io.helidon.microprofile.tests.junit5.AddConfig;
import io.helidon.microprofile.tests.junit5.AddExtension;
import io.helidon.microprofile.tests.junit5.DisableDiscovery;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.api.Test;

import static io.helidon.messaging.connectors.jms.JmsConnector.CONNECTOR_NAME;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.CONNECTOR_PREFIX;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.OUTGOING_PREFIX;

@HelidonTest
@DisableDiscovery
@AddBean(JmsConnector.class)
@AddBean(MockConnector.class)
@AddExtension(ConfigCdiExtension.class)
@AddExtension(MessagingCdiExtension.class)
@AddConfig(key = CONNECTOR_PREFIX + CONNECTOR_NAME + ".period-executions", value = "5")
@AddConfig(key = CONNECTOR_PREFIX + CONNECTOR_NAME + ".destination", value = "TestQueue1")

@AddConfig(key = OUTGOING_PREFIX + "to-jms.connector", value = CONNECTOR_NAME)
@AddConfig(key = OUTGOING_PREFIX + "to-jms.named-factory", value = "activemq-cf-jakarta")
@AddConfig(key = INCOMING_PREFIX + "from-jms.connector", value = CONNECTOR_NAME)
@AddConfig(key = INCOMING_PREFIX + "from-jms.named-factory", value = "activemq-cf-jakarta")
@AddConfig(key = OUTGOING_PREFIX + "to-mock.connector", value = MockConnector.CONNECTOR_NAME)
public class JmsInjectedFactoryTest {

static final Duration TIME_OUT = Duration.ofSeconds(15);
static final Integer[] TEST_DATA = IntStream.range(0, 10).boxed().toArray(Integer[]::new);

@Inject
@TestConnector
private MockConnector mockConnector;

@Produces
@ApplicationScoped
@Named("activemq-cf-jakarta")
public jakarta.jms.ConnectionFactory connectionFactory() {
return JakartaJms.create(new ActiveMQConnectionFactory(AbstractJmsTest.BROKER_URL));
}

@Outgoing("to-jms")
public Multi<String> produceData() {
return Multi.just(TEST_DATA)
.map(String::valueOf);
}

@Incoming("from-jms")
@Outgoing("to-mock")
public ProcessorBuilder<String, Integer> resendToMock() {
return ReactiveStreams.<String>builder()
.map(Integer::parseInt);
}

@Test
void jmsInOutTest() {
mockConnector.outgoing("to-mock", Integer.TYPE)
.awaitPayloads(TIME_OUT, TEST_DATA);
}
}
Loading