Skip to content

Commit

Permalink
v2.3.6
Browse files Browse the repository at this point in the history
supports multiple event stream clusters
  • Loading branch information
acn-ericlaw committed Jun 15, 2022
1 parent 6e46cb0 commit e46913e
Show file tree
Hide file tree
Showing 64 changed files with 697 additions and 568 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

---
## Version 2.3.6, 6/15/2022

### Added

Support more than one event stream cluster. User application can share the same event stream cluster
for pub/sub or connect to an alternative cluster for pub/sub use cases.

### Removed

N/A

### Changed

Cloud connector libraries update to Activemq Artemis version 2.23.0 and Hazelcast 5.1.2

---
## Version 2.3.5, 5/30/2022

Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ You can compile the rest-example as a microservices executable like this:
cd mercury/examples
cd rest-example
mvn clean package
java -jar target/rest-example-2.3.5.jar
java -jar target/rest-example-2.3.6.jar
# this will run the rest-example without a cloud connector
```

Expand Down Expand Up @@ -385,7 +385,7 @@ For development and testing, you can start a standalone Kafka server.
# start a terminal and go to the mercury sandbox folder, then go to the kafka-standalone folder
cd mercury/connectors/kafka/kafka-standalone/
mvn clean package
java -jar target/kafka-standalone-2.3.5.jar
java -jar target/kafka-standalone-2.3.6.jar
# this will run a standalone Kafka server in the foreground
```

Expand All @@ -395,7 +395,7 @@ The next step is to start the "presence-monitor" application.
# start another terminal and go to kafka-presence folder
cd mercury/connectors/kafka/kafka-presence/
mvn clean package
java -jar target/kafka-presence-2.3.5.jar
java -jar target/kafka-presence-2.3.6.jar
# this will run the presence monitor at port 8080 in the foreground

# when an application instance is started, it connects to the presence monitor to get topic.
Expand All @@ -411,19 +411,19 @@ Optionally, if you want to test resilience of the presence monitor, you can star
# start another terminal and go to kafka-presence folder
cd mercury/connectors/kafka/kafka-presence/
mvn clean package
java -Dserver.port=8081 -jar target/kafka-presence-2.3.5.jar
java -Dserver.port=8081 -jar target/kafka-presence-2.3.6.jar
# this will run the presence monitor at port 8081 in the foreground
```

- You can then run the lambda-example and rest-example applications

```bash
# go to the lambda-example project folder in one terminal
java -Dcloud.connector=kafka -jar target/lambda-example-2.3.5.jar
java -Dcloud.connector=kafka -jar target/lambda-example-2.3.6.jar
# the lambda-example will connect to the "presence monitor", obtain a topic and connect to Kafka

# go to the rest-example project folder in another terminal
java -Dcloud.connector=kafka -jar target/rest-example-2.3.5.jar
java -Dcloud.connector=kafka -jar target/rest-example-2.3.6.jar
# the rest-example will also connect to the "presence monitor", obtain a topic and connect to Kafka

# the lambda-example and rest-example apps will show the topic assignment like this
Expand Down
6 changes: 3 additions & 3 deletions connectors/adapters/activemq/activemq-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<artifactId>activemq-connector</artifactId>

<packaging>jar</packaging>
<version>2.3.5</version>
<version>2.3.6</version>
<name>Cloud connector for ActiveMQ cluster</name>

<parent>
Expand All @@ -19,7 +19,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<artemis.version>2.17.0</artemis.version>
<artemis.version>2.23.0</artemis.version>
<gson.version>2.8.9</gson.version>
<java.version>1.8</java.version>
</properties>
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>org.platformlambda</groupId>
<artifactId>cloud-connector</artifactId>
<version>2.3.5</version>
<version>2.3.6</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.activemq/artemis-jms-client -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,68 @@
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@CloudConnector(name="activemq")
public class ArtemisConnector implements CloudSetup {
private static final Logger log = LoggerFactory.getLogger(ArtemisConnector.class);

private static final String SYSTEM = "system";
private static final String CLOUD_CLIENT_PROPERTIES = "cloud.client.properties";
public static final String BROKER_URL = "bootstrap.servers";
private static final String CLOUD_CONNECTOR_HEALTH = "cloud.connector.health";
private static final String USER_ID = "user.id";
private static final String USER_PWD = "user.password";
private static Properties properties;
private static Connection connection;

public static Properties getClusterProperties() {
private static final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Properties> allProperties = new ConcurrentHashMap<>();

public static Properties getClusterProperties(String location) {
// default location is cloud.client.properties
Properties properties = allProperties.get(location);
if (properties == null) {
ConfigReader clusterConfig = null;
try {
clusterConfig = ConnectorConfig.getConfig(location,
"file:/tmp/config/activemq.properties,classpath:/activemq.properties");
} catch (IOException e) {
log.error("Unable to find activemq properties - {}", e.getMessage());
System.exit(-1);
}
properties = new Properties();
for (String k : clusterConfig.getMap().keySet()) {
properties.setProperty(k, clusterConfig.getProperty(k));
}
String url = properties.getProperty(BROKER_URL);
Utility util = Utility.getInstance();
List<String> cluster = util.split(url, ", ");
boolean reachable = false;
for (String address : cluster) {
int start = address.lastIndexOf('/');
int colon = address.lastIndexOf(':');
if (colon > 1 && colon > start) {
String host = address.substring(start+1, colon);
int port = util.str2int(address.substring(colon + 1));
if (port > 0) {
// ping the address to confirm it is reachable before making a client connection
if (util.portReady(host, port, 10000)) {
reachable = true;
}
}
}
}
if (!reachable) {
log.error("ActiveMQ cluster {} is not reachable", cluster);
System.exit(-1);
}
allProperties.put(location, properties);
}
return properties;
}

public static synchronized Connection getConnection() throws JMSException {
public static synchronized Connection getConnection(String domain, Properties properties) throws JMSException {
Connection connection = allConnections.get(domain);
if (connection == null) {
String cluster = properties.getProperty(BROKER_URL, "tcp://127.0.0.1:61616");
String userId = properties.getProperty(USER_ID, "");
Expand All @@ -70,25 +115,29 @@ public static synchronized Connection getConnection() throws JMSException {
connection = factory.createConnection(userId, password);
connection.setExceptionListener((e) -> {
String error = e.getMessage();
log.error("Tibco cluster exception - {}", error);
log.error("Activemq cluster exception - {}", error);
if (error != null && (error.contains("terminated") || error.contains("disconnect"))) {
ArtemisConnector.stopConnection();
ArtemisConnector.stopConnection(domain);
System.exit(10);
}
});
connection.start();
log.info("Connection started - {}", cluster);
ConnectorConfig.setServiceName("activemq-artemis");
ConnectorConfig.setDisplayUrl(cluster);
if (SYSTEM.equals(domain)) {
ConnectorConfig.setServiceName("activemq-artemis");
ConnectorConfig.setDisplayUrl(cluster);
}
allConnections.put(domain, connection);
}
return connection;
}

public static synchronized void stopConnection() {
public static synchronized void stopConnection(String domain) {
Connection connection = allConnections.get(domain);
if (connection != null) {
try {
allConnections.remove(domain);
connection.stop();
connection = null;
log.info("Connection stopped");
} catch (JMSException e) {
// ok to ignore
Expand All @@ -98,46 +147,11 @@ public static synchronized void stopConnection() {

@Override
public void initialize() {
Utility util = Utility.getInstance();
ConfigReader clusterConfig = null;
try {
clusterConfig = ConnectorConfig.getConfig("cloud.client.properties",
"file:/tmp/config/activemq.properties,classpath:/activemq.properties");
} catch (IOException e) {
log.error("Unable to find activemq.properties - {}", e.getMessage());
System.exit(-1);
}
properties = new Properties();
for (String k : clusterConfig.getMap().keySet()) {
properties.setProperty(k, clusterConfig.getProperty(k));
}
String url = properties.getProperty(BROKER_URL);
ConnectorConfig.setServiceName("activemq-artemis");
ConnectorConfig.setDisplayUrl(url);
List<String> cluster = util.split(url, ", ");
boolean reachable = false;
for (String address : cluster) {
int start = address.lastIndexOf('/');
int colon = address.lastIndexOf(':');
if (colon > 1 && colon > start) {
String host = address.substring(start+1, colon);
int port = util.str2int(address.substring(colon + 1));
if (port > 0) {
// ping the address to confirm it is reachable before making a client connection
if (util.portReady(host, port, 10000)) {
reachable = true;
}
}
}
}
if (!reachable) {
log.error("ActiveMQ cluster {} is not reachable", cluster);
System.exit(-1);
}
try {
Platform platform = Platform.getInstance();
PubSub ps = PubSub.getInstance();
ps.enableFeature(new PubSubManager());
PubSub ps = PubSub.getInstance(SYSTEM);
Properties properties = getClusterProperties(CLOUD_CLIENT_PROPERTIES);
ps.enableFeature(new PubSubManager(SYSTEM, properties, ServiceRegistry.CLOUD_MANAGER));
AppConfigReader config = AppConfigReader.getInstance();
if (!"true".equals(config.getProperty("service.monitor", "false"))) {
// start presence connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.platformlambda.core.annotations.CloudService;
import org.platformlambda.core.models.CloudSetup;
import org.platformlambda.core.system.PubSub;
import org.platformlambda.core.util.AppConfigReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import java.io.IOException;
import java.util.Properties;

/**
* This cloud service provides pub/sub service.
Expand All @@ -41,11 +43,24 @@
public class PubSubSetup implements CloudSetup {
private static final Logger log = LoggerFactory.getLogger(PubSubSetup.class);

private static final String USER = "user";
private static final String USER_CLOUD_CLIENT_PROPERTIES = "user.clouod.client.properties";
private static final String CUSTOM_CLOUD_MANAGER_CONFIG = "user.cloud.manager";
private static final String CUSTOM_CLOUD_MANAGER_ROUTE = "user.cloud.manager";

@Override
public void initialize() {
if (!PubSub.getInstance().featureEnabled()) {
if (!PubSub.getInstance(USER).featureEnabled()) {
try {
PubSub.getInstance().enableFeature(new PubSubManager());
AppConfigReader config = AppConfigReader.getInstance();
String cloudManager = config.getProperty(CUSTOM_CLOUD_MANAGER_CONFIG);
if (cloudManager == null) {
log.warn("Config parameter {} not defined - using default cloud manager: {}",
CUSTOM_CLOUD_MANAGER_CONFIG, CUSTOM_CLOUD_MANAGER_ROUTE);
cloudManager = CUSTOM_CLOUD_MANAGER_ROUTE;
}
Properties properties = ArtemisConnector.getClusterProperties(USER_CLOUD_CLIENT_PROPERTIES);
PubSub.getInstance(USER).enableFeature(new PubSubManager(USER, properties, cloudManager));
log.info("Started");
} catch (JMSException | IOException e) {
log.error("Unable to connect to ActiveMQ cluster", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@

import javax.jms.*;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.*;

public class EventConsumer {
private static final Logger log = LoggerFactory.getLogger(EventConsumer.class);
Expand All @@ -59,8 +56,13 @@ public class EventConsumer {
private long offset = -1;
private Session session;
private MessageConsumer messageConsumer;
private final String domain;
private final Properties properties;

public EventConsumer(String topic, int partition, String... parameters) throws IOException {
public EventConsumer(String domain, Properties properties, String topic, int partition, String... parameters)
throws IOException {
this.domain = domain;
this.properties = properties;
boolean substitute = ConnectorConfig.topicSubstitutionEnabled();
Map<String, String> preAllocatedTopics = ConnectorConfig.getTopicSubstitution();
this.topic = topic;
Expand Down Expand Up @@ -92,7 +94,7 @@ public void start() {
try {
PostOffice po = PostOffice.getInstance();
Platform platform = Platform.getInstance();
Connection connection = ArtemisConnector.getConnection();
Connection connection = ArtemisConnector.getConnection(domain, properties);
session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
if (partition == -2) {
messageConsumer = session.createConsumer(session.createQueue(realTopic));
Expand Down Expand Up @@ -225,7 +227,7 @@ public void onMessage(Message evt) {
// transport the headers and payload in original form
message.setBody(data).setHeaders(originalHeaders);
try {
// mercury service name must be lower case
// mercury service name must be lowercase
po.send(message.setTo(virtualTopic.toLowerCase()));

} catch (Exception e) {
Expand Down
Loading

0 comments on commit e46913e

Please sign in to comment.