Skip to content

Commit

Permalink
feat: reset mqtt3 client on config change
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Nov 8, 2023
1 parent 72d4d5c commit 7c5d145
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package com.aws.greengrass.integrationtests;

import com.aws.greengrass.config.Topics;
import com.aws.greengrass.config.UpdateBehaviorTree;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTest;
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTestContext;
Expand Down Expand Up @@ -38,12 +37,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.github.grantwest.eventually.EventuallyLambdaMatcher.eventuallyEval;
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
Expand All @@ -61,8 +58,6 @@
public class ConfigTest {
private static final long AWAIT_TIMEOUT_SECONDS = 30L;
private static final long RECEIVE_PUBLISH_SECONDS = 2L;
private static final Supplier<UpdateBehaviorTree> MERGE_UPDATE_BEHAVIOR =
() -> new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.MERGE, System.currentTimeMillis());

BridgeIntegrationTestContext testContext;

Expand Down Expand Up @@ -180,50 +175,6 @@ void GIVEN_Greengrass_with_mqtt_bridge_WHEN_connect_options_set_in_config_THEN_l
() -> largeMessageHandler.getLeft().get(RECEIVE_PUBLISH_SECONDS, TimeUnit.SECONDS));
}

@BridgeIntegrationTest(
withConfig = "config.yaml",
withBrokers = Broker.MQTT3)
void GIVEN_Greengrass_with_mqtt_bridge_WHEN_multiple_serialized_config_changes_occur_THEN_bridge_reinstalls_multiple_times(ExtensionContext context) throws Exception {
ignoreExceptionOfType(context, InterruptedException.class);

Semaphore bridgeRestarted = new Semaphore(1);
bridgeRestarted.acquire();

testContext.getKernel().getContext().addGlobalStateChangeListener((GreengrassService service, State was, State newState) -> {
if (service.getName().equals(MQTTBridge.SERVICE_NAME) && newState.equals(State.RUNNING)) {
bridgeRestarted.release();
}
});

Topics config = testContext.getKernel().locate(MQTTBridge.SERVICE_NAME).getConfig()
.lookupTopics(CONFIGURATION_CONFIG_KEY);

int numRestarts = 5;
for (int i = 0; i < numRestarts; i++) {
// change the configuration and wait for bridge to restart
config.updateFromMap(Utils.immutableMap(BridgeConfig.KEY_CLIENT_ID, String.format("clientId%d", i)), MERGE_UPDATE_BEHAVIOR.get());
assertTrue(bridgeRestarted.tryAcquire(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
}
}

@BridgeIntegrationTest(
withConfig = "config.yaml",
withBrokers = Broker.MQTT3)
void GIVEN_Greengrass_with_mqtt_bridge_WHEN_clientId_config_changes_THEN_bridge_reinstalls() throws Exception {
CountDownLatch bridgeRestarted = new CountDownLatch(1);
testContext.getKernel().getContext().addGlobalStateChangeListener((GreengrassService service, State was, State newState) -> {
if (service.getName().equals(MQTTBridge.SERVICE_NAME) && newState.equals(State.NEW)) {
bridgeRestarted.countDown();
}
});

Topics config = testContext.getKernel().locate(MQTTBridge.SERVICE_NAME).getConfig()
.lookupTopics(CONFIGURATION_CONFIG_KEY);
config.updateFromMap(Utils.immutableMap(BridgeConfig.KEY_CLIENT_ID, "new_client_id"), MERGE_UPDATE_BEHAVIOR.get());

assertTrue(bridgeRestarted.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
}

@BridgeIntegrationTest(
withConfig = "config.yaml",
withBrokers = Broker.MQTT3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@
import com.aws.greengrass.clientdevices.auth.certificate.CertificateHelper;
import com.aws.greengrass.clientdevices.auth.certificate.CertificateStore;
import com.aws.greengrass.config.Topic;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTest;
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTestContext;
import com.aws.greengrass.integrationtests.extensions.Broker;
import com.aws.greengrass.lifecyclemanager.GlobalStateChangeListener;
import com.aws.greengrass.lifecyclemanager.GreengrassService;
import com.aws.greengrass.mqtt.bridge.BridgeConfig;
import com.aws.greengrass.mqtt.bridge.MQTTBridge;
import com.aws.greengrass.mqtt.bridge.auth.MQTTClientKeyStore;
import com.aws.greengrass.mqtt.bridge.model.MqttVersion;
Expand All @@ -41,7 +37,6 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
import static com.aws.greengrass.lifecyclemanager.GreengrassService.RUNTIME_STORE_NAMESPACE_TOPIC;
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICES_NAMESPACE_TOPIC;
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
Expand Down Expand Up @@ -154,22 +149,8 @@ void GIVEN_mqtt_bridge_WHEN_cda_ca_conf_changed_after_shutdown_THEN_bridge_keyst
ignoreExceptionOfType(context, IllegalArgumentException.class);
ignoreExceptionOfType(context, NullPointerException.class);

// break bridge
CountDownLatch bridgeIsBroken = new CountDownLatch(1);
GlobalStateChangeListener listener = (GreengrassService service, State was, State newState) -> {
if (service.getName().equals(MQTTBridge.SERVICE_NAME) && service.getState().equals(State.BROKEN)) {
bridgeIsBroken.countDown();
}
};
Topic brokerUriTopic = testContext.getKernel().getConfig().lookup(
SERVICES_NAMESPACE_TOPIC,
MQTTBridge.SERVICE_NAME,
CONFIGURATION_CONFIG_KEY,
BridgeConfig.KEY_BROKER_URI
);
brokerUriTopic.withValue("garbage");
testContext.getKernel().getContext().addGlobalStateChangeListener(listener);
assertTrue(bridgeIsBroken.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
// shutdown the bridge
testContext.getFromContext(MQTTBridge.class).shutdown();

CountDownLatch keyStoreUpdated = new CountDownLatch(1);
MQTTClientKeyStore keyStore = testContext.getKernel().getContext().get(MQTTClientKeyStore.class);
Expand All @@ -191,8 +172,8 @@ void GIVEN_mqtt_bridge_WHEN_cda_ca_conf_changed_after_shutdown_THEN_bridge_keyst
Date.from(Instant.now().plusSeconds(100)),
"CA"))));

// shouldn't update
assertFalse(keyStoreUpdated.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
testContext.getKernel().getContext().waitForPublishQueueToClear();
assertFalse(keyStoreUpdated.await(5L, TimeUnit.SECONDS));
}

private CompletableFuture<Void> asyncAssertNumConnects(Integer numConnects) throws InterruptedException {
Expand Down
14 changes: 5 additions & 9 deletions src/main/java/com/aws/greengrass/mqtt/bridge/MQTTBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
import com.aws.greengrass.lifecyclemanager.PluginService;
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.mqtt.bridge.auth.MQTTClientKeyStore;
import com.aws.greengrass.mqtt.bridge.clients.Configurable;
import com.aws.greengrass.mqtt.bridge.clients.IoTCoreClient;
import com.aws.greengrass.mqtt.bridge.clients.LocalMqtt5Client;
import com.aws.greengrass.mqtt.bridge.clients.LocalMqttClientFactory;
import com.aws.greengrass.mqtt.bridge.clients.MQTTClient;
import com.aws.greengrass.mqtt.bridge.clients.MessageClient;
import com.aws.greengrass.mqtt.bridge.clients.MessageClientException;
import com.aws.greengrass.mqtt.bridge.clients.PubSubClient;
Expand Down Expand Up @@ -295,17 +294,14 @@ public class ConfigurationChangeHandler {
return;
}

// TODO same for MQTT3 client
if (localMqttClient instanceof LocalMqtt5Client) {
((LocalMqtt5Client) localMqttClient).applyConfig(LocalMqtt5Client.Config.fromBridgeConfig(newConfig));
if (localMqttClient instanceof Configurable) {
((Configurable) localMqttClient).applyConfig(newConfig);
}
});

private boolean reinstallRequired(BridgeConfig prevConfig, BridgeConfig newConfig) {
return !Objects.equals(prevConfig.getMqttVersion(), newConfig.getMqttVersion()) // to switch between clients
|| localMqttClient instanceof MQTTClient
&& (!Objects.equals(prevConfig.getBrokerUri(), newConfig.getBrokerUri())
|| !Objects.equals(prevConfig.getClientId(), newConfig.getClientId()));
// to switch between clients
return !Objects.equals(prevConfig.getMqttVersion(), newConfig.getMqttVersion());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqtt.bridge.clients;

import com.aws.greengrass.mqtt.bridge.BridgeConfig;

@FunctionalInterface
public interface Configurable {
void applyConfig(BridgeConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import static com.aws.greengrass.mqtt.bridge.model.Mqtt5RouteOptions.DEFAULT_NO_LOCAL;

@SuppressWarnings("PMD.CloseResource")
public class LocalMqtt5Client implements MessageClient<MqttMessage> {
public class LocalMqtt5Client implements MessageClient<MqttMessage>, Configurable {

private static final Logger LOGGER = LogManager.getLogger(LocalMqtt5Client.class);

Expand Down Expand Up @@ -334,11 +334,16 @@ public static boolean resetRequired(Config prevConfig, Config newConfig) {
* Apply new configuration to this client. Depending on what configurations changed, a
* {@link LocalMqtt5Client#reset()} may occur to apply them.
*
* @param config new configuration
* @param bridgeConfig new bridge configuration
*/
public void applyConfig(@NonNull Config config) {
@Override
public void applyConfig(@NonNull BridgeConfig bridgeConfig) {
applyConfig(Config.fromBridgeConfig(bridgeConfig));
}

void applyConfig(Config newConfig) {
Config previousConfig = this.config;
this.config = config;
this.config = newConfig;
if (Config.resetRequired(previousConfig, config)) {
scheduleResetTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public MessageClient<MqttMessage> createLocalMqttClient() throws MessageClientEx
case MQTT3: // fall-through
default:
return new MQTTClient(
config.getBrokerUri(),
config.getClientId(),
config,
mqttClientKeyStore,
executorService
);
Expand Down
Loading

0 comments on commit 7c5d145

Please sign in to comment.