Skip to content

Commit

Permalink
:fix: add auto reconnect to service client
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese committed Apr 5, 2024
1 parent 81a090d commit 5ae4ffa
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.LoginMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.PublishMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.SubscribeMetric;
Expand Down Expand Up @@ -379,9 +380,14 @@ private AccountInfo getAdminAccountInfoNoCache() throws JsonProcessingException,
SecurityAction.getEntity.name(),
EntityType.account.name(),
SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_ACCOUNT));
EntityResponse accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest);
if (accountResponse != null) {
return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName());
EntityResponse accountResponse;
try {
accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest);
if (accountResponse != null) {
return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName());
}
} catch (JsonProcessingException | JMSException | InterruptedException e) {
logger.warn("Error getting scopeId for user admin", e);
}
throw new SecurityException("User not authorized! Cannot get Admin Account info!");
}
Expand All @@ -403,9 +409,13 @@ private KapuaId getScopeIdNoCache(String username) throws JsonProcessingExceptio
SecurityAction.getEntity.name(),
EntityType.user.name(),
username);
EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest);
if (userResponse != null && userResponse.getScopeId() != null) {
return KapuaEid.parseCompactId(userResponse.getScopeId());
try {
EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest);
if (userResponse != null && userResponse.getScopeId() != null) {
return KapuaEid.parseCompactId(userResponse.getScopeId());
}
} catch (JsonProcessingException | JMSException | InterruptedException e) {
logger.warn("Error getting scopeId for username {}", username, e);
}
throw new SecurityException("User not authorized! Cannot get scopeId for username:" + username);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.core.JsonProcessingException;

import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.bean.AuthRequest;
import org.eclipse.kapua.client.security.bean.AuthResponse;
Expand Down Expand Up @@ -44,7 +45,6 @@ public ServiceClientMessagingImpl(MessageListener messageListener, Client client

@Override
public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerConnect.name());
Expand All @@ -60,7 +60,6 @@ public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedExc

@Override
public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSException, InterruptedException, JsonProcessingException {
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerDisconnect.name());
Expand All @@ -76,7 +75,6 @@ public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSExceptio

@Override
public EntityResponse getEntity(EntityRequest entityRequest) throws JMSException, InterruptedException, JsonProcessingException {
client.checkAuthServiceConnection();
String requestId = MessageHelper.getNewRequestId();
entityRequest.setRequestId(requestId);
ResponseContainer<EntityResponse> responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, entityRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
Expand All @@ -32,36 +33,39 @@ public class Client {

private static Logger logger = LoggerFactory.getLogger(Client.class);

private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000;

private ConnectionFactory connectionFactory;//is this reference needed?
private Connection connection;//keep to implement cleanup (and object lifecycle)
private Session session;
private MessageConsumer consumer;
private MessageProducer producer;
private String clientId;
private String requestAddress;
private String replyAddress;
private ClientMessageListener clientMessageListener;
private ExceptionListener exceptionListener;

private ConnectionStatus connectionStatus;
private boolean active;
private boolean connectionStatus;

public Client(String username, String password, String host, int port, String clientId,
String requestAddress, String replyAddress, ClientMessageListener clientMessageListener) throws JMSException {
connectionStatus = new ConnectionStatus();
this.clientId = clientId;
this.requestAddress = requestAddress;
this.replyAddress = replyAddress;
this.clientMessageListener = clientMessageListener;
connectionFactory = new JmsConnectionFactory(username, password, "amqp://" + host + ":" + port);
connection = connectionFactory.createConnection();
connection.setExceptionListener(new JMSExceptionListner(connectionStatus, clientId));
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding request sender to: {}", requestAddress);
logger.info("AMQP client binding message listener to: {}", replyAddress);
consumer = session.createConsumer(session.createQueue(replyAddress));
consumer.setMessageListener(clientMessageListener);
producer = session.createProducer(session.createQueue(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus.setConnectionAlive();
}
exceptionListener = new ExceptionListener() {

public void checkAuthServiceConnection() {
if (!isConnected()) {
//TODO throw exception and deny operations
}
@Override
public void onException(JMSException exception) {
connectionStatus = false;
connect();
}
};
active = true;
connect();
}

public void sendMessage(TextMessage message) throws JMSException {
Expand All @@ -73,12 +77,61 @@ public TextMessage createTextMessage() throws JMSException {
return session.createTextMessage();
}

public boolean isConnected() {
public void stop() throws JMSException {
active = false;
disconnect();
}

private void disconnect() throws JMSException {
if (connection != null) {
connection.close();
}
}

private void connect() {
int connectAttempt = 0;
while (active && !isConnected()) {
logger.info("Connect attempt {}...", connectAttempt);
try {
logger.info("Service client {} - restarting attempt... {}", this, connectAttempt);
disconnect();
connection = connectionFactory.createConnection();
connection.setExceptionListener(exceptionListener);
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
logger.info("AMQP client binding request sender to: {}", requestAddress);
logger.info("AMQP client binding message listener to: {}", replyAddress);
consumer = session.createConsumer(session.createQueue(replyAddress));
consumer.setMessageListener(clientMessageListener);
producer = session.createProducer(session.createQueue(requestAddress));
clientMessageListener.init(session, producer);
connectionStatus = true;
logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt);
} catch (JMSException e) {
logger.info("Connect attempt {}... FAIL", connectAttempt, e);
//wait a bit
waitBeforeRetry();
}
connectAttempt++;
}
}

private boolean isConnected() {
try {
return connection!=null && connection.getClientID()!=null && connectionStatus.isAlive();
return connection!=null && connection.getClientID()!=null && connectionStatus;
} catch (JMSException e) {
logger.warn("Error while validating connection: {}", e.getMessage(), e);
return false;
}
}

private void waitBeforeRetry() {
try {
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT);
} catch (InterruptedException e) {
logger.error("Wait for connect interrupted!", e);
}
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive

private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class);

private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000;

private final int producerPoolMinSize;
private final int producerPoolMaxSize;
private final int producerPoolBorrowWait;
Expand Down Expand Up @@ -156,9 +158,8 @@ public ServiceEventBus getEventBus() {
}

private void waitBeforeRetry() {
// wait a bit
try {
Thread.sleep(2000);// TODO move to configuration
Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT);
} catch (InterruptedException e) {
LOGGER.error("Wait for connect interrupted!", e);
}
Expand Down Expand Up @@ -318,7 +319,7 @@ synchronized void subscribe(Subscription subscription)
final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = jmsSession.createTopic(subscriptionStr);
for (int i = 0; i < consumerPoolSize; i++) {
MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName());
MessageConsumer jmsConsumer = jmsSession.createSharedConsumer(jmsTopic, subscription.getName());
jmsConsumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
Expand Down Expand Up @@ -461,6 +462,7 @@ public void onException(JMSException e) {
break;
} catch (ServiceEventBusException | JMSException e1) {
LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1);
// wait a bit
waitBeforeRetry();
}
i++;
Expand Down

0 comments on commit 5ae4ffa

Please sign in to comment.