From 5ae4ffa63e9f2877d2057a373882d5ab1a2f715f Mon Sep 17 00:00:00 2001 From: riccardomodanese Date: Fri, 5 Apr 2024 15:24:24 +0200 Subject: [PATCH] :fix: add auto reconnect to service client Signed-off-by: riccardomodanese --- .../plugin/security/SecurityPlugin.java | 22 +++-- .../security/ServiceClientMessagingImpl.java | 4 +- .../client/security/amqpclient/Client.java | 95 +++++++++++++++---- .../security/amqpclient/ConnectionStatus.java | 30 ------ .../amqpclient/JMSExceptionListner.java | 38 -------- .../commons/event/jms/JMSServiceEventBus.java | 8 +- 6 files changed, 96 insertions(+), 101 deletions(-) delete mode 100644 client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ConnectionStatus.java delete mode 100644 client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/JMSExceptionListner.java diff --git a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java index cc508b2931a..f1b960dbaec 100644 --- a/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java +++ b/broker/artemis/plugin/src/main/java/org/eclipse/kapua/broker/artemis/plugin/security/SecurityPlugin.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5; +import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.broker.artemis.plugin.security.metric.LoginMetric; import org.eclipse.kapua.broker.artemis.plugin.security.metric.PublishMetric; import org.eclipse.kapua.broker.artemis.plugin.security.metric.SubscribeMetric; @@ -379,9 +380,14 @@ private AccountInfo getAdminAccountInfoNoCache() throws JsonProcessingException, SecurityAction.getEntity.name(), EntityType.account.name(), SystemSetting.getInstance().getString(SystemSettingKey.SYS_ADMIN_ACCOUNT)); - EntityResponse accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest); - if (accountResponse != null) { - return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName()); + EntityResponse accountResponse; + try { + accountResponse = serverContext.getAuthServiceClient().getEntity(accountRequest); + if (accountResponse != null) { + return new AccountInfo(KapuaEid.parseCompactId(accountResponse.getId()), accountResponse.getName()); + } + } catch (JsonProcessingException | JMSException | InterruptedException e) { + logger.warn("Error getting scopeId for user admin", e); } throw new SecurityException("User not authorized! Cannot get Admin Account info!"); } @@ -403,9 +409,13 @@ private KapuaId getScopeIdNoCache(String username) throws JsonProcessingExceptio SecurityAction.getEntity.name(), EntityType.user.name(), username); - EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest); - if (userResponse != null && userResponse.getScopeId() != null) { - return KapuaEid.parseCompactId(userResponse.getScopeId()); + try { + EntityResponse userResponse = serverContext.getAuthServiceClient().getEntity(userRequest); + if (userResponse != null && userResponse.getScopeId() != null) { + return KapuaEid.parseCompactId(userResponse.getScopeId()); + } + } catch (JsonProcessingException | JMSException | InterruptedException e) { + logger.warn("Error getting scopeId for username {}", username, e); } throw new SecurityException("User not authorized! Cannot get scopeId for username:" + username); } diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java b/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java index 37d842d4d49..4929351bb97 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java @@ -13,6 +13,7 @@ package org.eclipse.kapua.client.security; import com.fasterxml.jackson.core.JsonProcessingException; + import org.eclipse.kapua.client.security.amqpclient.Client; import org.eclipse.kapua.client.security.bean.AuthRequest; import org.eclipse.kapua.client.security.bean.AuthResponse; @@ -44,7 +45,6 @@ public ServiceClientMessagingImpl(MessageListener messageListener, Client client @Override public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException) - client.checkAuthServiceConnection(); String requestId = MessageHelper.getNewRequestId(); authRequest.setRequestId(requestId); authRequest.setAction(SecurityAction.brokerConnect.name()); @@ -60,7 +60,6 @@ public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedExc @Override public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSException, InterruptedException, JsonProcessingException { - client.checkAuthServiceConnection(); String requestId = MessageHelper.getNewRequestId(); authRequest.setRequestId(requestId); authRequest.setAction(SecurityAction.brokerDisconnect.name()); @@ -76,7 +75,6 @@ public AuthResponse brokerDisconnect(AuthRequest authRequest) throws JMSExceptio @Override public EntityResponse getEntity(EntityRequest entityRequest) throws JMSException, InterruptedException, JsonProcessingException { - client.checkAuthServiceConnection(); String requestId = MessageHelper.getNewRequestId(); entityRequest.setRequestId(requestId); ResponseContainer responseContainer = ResponseContainer.createAnRegisterNewMessageContainer(messageListener, entityRequest); diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java b/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java index cb5c5da1517..e86a5244cb1 100644 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java +++ b/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/Client.java @@ -14,6 +14,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -32,36 +33,39 @@ public class Client { private static Logger logger = LoggerFactory.getLogger(Client.class); + private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000; + private ConnectionFactory connectionFactory;//is this reference needed? private Connection connection;//keep to implement cleanup (and object lifecycle) private Session session; private MessageConsumer consumer; private MessageProducer producer; + private String clientId; + private String requestAddress; + private String replyAddress; + private ClientMessageListener clientMessageListener; + private ExceptionListener exceptionListener; - private ConnectionStatus connectionStatus; + private boolean active; + private boolean connectionStatus; public Client(String username, String password, String host, int port, String clientId, String requestAddress, String replyAddress, ClientMessageListener clientMessageListener) throws JMSException { - connectionStatus = new ConnectionStatus(); + this.clientId = clientId; + this.requestAddress = requestAddress; + this.replyAddress = replyAddress; + this.clientMessageListener = clientMessageListener; connectionFactory = new JmsConnectionFactory(username, password, "amqp://" + host + ":" + port); - connection = connectionFactory.createConnection(); - connection.setExceptionListener(new JMSExceptionListner(connectionStatus, clientId)); - connection.setClientID(clientId); - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - logger.info("AMQP client binding request sender to: {}", requestAddress); - logger.info("AMQP client binding message listener to: {}", replyAddress); - consumer = session.createConsumer(session.createQueue(replyAddress)); - consumer.setMessageListener(clientMessageListener); - producer = session.createProducer(session.createQueue(requestAddress)); - clientMessageListener.init(session, producer); - connectionStatus.setConnectionAlive(); - } + exceptionListener = new ExceptionListener() { - public void checkAuthServiceConnection() { - if (!isConnected()) { - //TODO throw exception and deny operations - } + @Override + public void onException(JMSException exception) { + connectionStatus = false; + connect(); + } + }; + active = true; + connect(); } public void sendMessage(TextMessage message) throws JMSException { @@ -73,12 +77,61 @@ public TextMessage createTextMessage() throws JMSException { return session.createTextMessage(); } - public boolean isConnected() { + public void stop() throws JMSException { + active = false; + disconnect(); + } + + private void disconnect() throws JMSException { + if (connection != null) { + connection.close(); + } + } + + private void connect() { + int connectAttempt = 0; + while (active && !isConnected()) { + logger.info("Connect attempt {}...", connectAttempt); + try { + logger.info("Service client {} - restarting attempt... {}", this, connectAttempt); + disconnect(); + connection = connectionFactory.createConnection(); + connection.setExceptionListener(exceptionListener); + connection.setClientID(clientId); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + logger.info("AMQP client binding request sender to: {}", requestAddress); + logger.info("AMQP client binding message listener to: {}", replyAddress); + consumer = session.createConsumer(session.createQueue(replyAddress)); + consumer.setMessageListener(clientMessageListener); + producer = session.createProducer(session.createQueue(requestAddress)); + clientMessageListener.init(session, producer); + connectionStatus = true; + logger.info("Service client {} - restarting attempt... {} DONE (Connection restored)", this, connectAttempt); + } catch (JMSException e) { + logger.info("Connect attempt {}... FAIL", connectAttempt, e); + //wait a bit + waitBeforeRetry(); + } + connectAttempt++; + } + } + + private boolean isConnected() { try { - return connection!=null && connection.getClientID()!=null && connectionStatus.isAlive(); + return connection!=null && connection.getClientID()!=null && connectionStatus; } catch (JMSException e) { logger.warn("Error while validating connection: {}", e.getMessage(), e); return false; } } + + private void waitBeforeRetry() { + try { + Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT); + } catch (InterruptedException e) { + logger.error("Wait for connect interrupted!", e); + } + } + } \ No newline at end of file diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ConnectionStatus.java b/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ConnectionStatus.java deleted file mode 100644 index 9341fe44aa2..00000000000 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/ConnectionStatus.java +++ /dev/null @@ -1,30 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * - * Contributors: - * Eurotech - initial API and implementation - *******************************************************************************/ -package org.eclipse.kapua.client.security.amqpclient; - -public class ConnectionStatus { - - private boolean alive; - - public boolean isAlive() { - return alive; - } - - public void setConnectionAlive() { - alive = true; - } - - public void setConnectionFault() { - alive = false; - } -} \ No newline at end of file diff --git a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/JMSExceptionListner.java b/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/JMSExceptionListner.java deleted file mode 100644 index 6aa4859ec5f..00000000000 --- a/client/security/src/main/java/org/eclipse/kapua/client/security/amqpclient/JMSExceptionListner.java +++ /dev/null @@ -1,38 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * - * Contributors: - * Eurotech - initial API and implementation - *******************************************************************************/ -package org.eclipse.kapua.client.security.amqpclient; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JMSExceptionListner implements ExceptionListener { - - protected static final Logger logger = LoggerFactory.getLogger(JMSExceptionListner.class); - - private ConnectionStatus connectionStatus; - private String clientId; - - public JMSExceptionListner(ConnectionStatus connectionStatus, String clientId) { - this.connectionStatus = connectionStatus; - this.clientId = clientId; - } - - @Override - public void onException(JMSException e) { - connectionStatus.setConnectionFault(); - logger.warn("Client: {} - Error: {} ", clientId, e.getMessage()); - } -} \ No newline at end of file diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java index c4a56c6acc8..6cb78faf56b 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/jms/JMSServiceEventBus.java @@ -63,6 +63,8 @@ public class JMSServiceEventBus implements ServiceEventBus, ServiceEventBusDrive private static final Logger LOGGER = LoggerFactory.getLogger(JMSServiceEventBus.class); + private static final long WAIT_BETWEEN_RECONNECTION_ATTEMPT = 2000; + private final int producerPoolMinSize; private final int producerPoolMaxSize; private final int producerPoolBorrowWait; @@ -156,9 +158,8 @@ public ServiceEventBus getEventBus() { } private void waitBeforeRetry() { - // wait a bit try { - Thread.sleep(2000);// TODO move to configuration + Thread.sleep(WAIT_BETWEEN_RECONNECTION_ATTEMPT); } catch (InterruptedException e) { LOGGER.error("Wait for connect interrupted!", e); } @@ -318,7 +319,7 @@ synchronized void subscribe(Subscription subscription) final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic jmsTopic = jmsSession.createTopic(subscriptionStr); for (int i = 0; i < consumerPoolSize; i++) { - MessageConsumer jmsConsumer = jmsSession.createSharedDurableConsumer(jmsTopic, subscription.getName()); + MessageConsumer jmsConsumer = jmsSession.createSharedConsumer(jmsTopic, subscription.getName()); jmsConsumer.setMessageListener(message -> { try { if (message instanceof TextMessage) { @@ -461,6 +462,7 @@ public void onException(JMSException e) { break; } catch (ServiceEventBusException | JMSException e1) { LOGGER.error("EventBus Listener {} - Cannot start new event bus connection... try again...", this, e1); + // wait a bit waitBeforeRetry(); } i++;