Skip to content

Commit

Permalink
enforcer actor ack time out handled in atomic thing create context (#…
Browse files Browse the repository at this point in the history
…1598)

enforcer actor ack time out handled in atomic thing create context

* Create thing is rollbacked if enforcer ask times out
* logging and refactor fixes
* logs cleanup
* supervisor configurable local ask timeout
* CachingPolicyEnforcerProvider is able to force cache invalidation
* update ConnectivitySupervisorActor ask timeout config

Signed-off-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>
  • Loading branch information
Aleksandar Stanchev authored Mar 29, 2023
1 parent 5e6442e commit 8bb7cd5
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.base.service.config.supervision;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;

/**
* This class is the default implementation of the local ACK timeout config.
*/
@Immutable
public class DefaultLocalAskTimeoutConfig implements LocalAskTimeoutConfig {

private static final String CONFIG_PATH = "local-ask";
private final Duration askTimeout;

private DefaultLocalAskTimeoutConfig(final ScopedConfig config) {
askTimeout = config.getNonNegativeAndNonZeroDurationOrThrow(LocalAskTimeoutConfigValue.ASK_TIMEOUT);
}

/**
* Returns an instance of {@code DefaultLocalAskTimeoutConfig} based on the settings of the specified Config.
*
* @param config is supposed to provide the settings of the local ASK timeout config at {@value #CONFIG_PATH}.
* @return the instance.
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultLocalAskTimeoutConfig of(final Config config) {
return new DefaultLocalAskTimeoutConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH,
LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.values()));
}

@Override
public Duration getLocalAckTimeout() {
return askTimeout;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DefaultLocalAskTimeoutConfig that = (DefaultLocalAskTimeoutConfig) o;
return Objects.equals(askTimeout, that.askTimeout);
}

@Override
public int hashCode() {
return Objects.hash(askTimeout);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[" +
"askTimeout=" + askTimeout +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ public final class DefaultSupervisorConfig implements SupervisorConfig {
private static final String CONFIG_PATH = "supervisor";

private final ExponentialBackOffConfig exponentialBackOffConfig;
private final DefaultLocalAskTimeoutConfig localAskTimeoutConfig;

private DefaultSupervisorConfig(final ExponentialBackOffConfig theExponentialBackOffConfig) {
private DefaultSupervisorConfig(final ExponentialBackOffConfig theExponentialBackOffConfig,
final DefaultLocalAskTimeoutConfig theLocalAskTimeoutConfig) {
exponentialBackOffConfig = theExponentialBackOffConfig;
localAskTimeoutConfig = theLocalAskTimeoutConfig;
}

/**
Expand All @@ -45,14 +48,20 @@ private DefaultSupervisorConfig(final ExponentialBackOffConfig theExponentialBac
public static DefaultSupervisorConfig of(final Config config) {
final ScopedConfig supervisorScopedConfig = DefaultScopedConfig.newInstance(config, CONFIG_PATH);

return new DefaultSupervisorConfig(DefaultExponentialBackOffConfig.of(supervisorScopedConfig));
return new DefaultSupervisorConfig(DefaultExponentialBackOffConfig.of(supervisorScopedConfig),
DefaultLocalAskTimeoutConfig.of(supervisorScopedConfig));
}

@Override
public ExponentialBackOffConfig getExponentialBackOffConfig() {
return exponentialBackOffConfig;
}

@Override
public LocalAskTimeoutConfig getLocalAskTimeoutConfig() {
return localAskTimeoutConfig;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.base.service.config.supervision;

import java.time.Duration;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

/**
* Provides configuration settings for the local ACK timeout.
*/
@Immutable
public interface LocalAskTimeoutConfig {

/**
* Timeout for local actor invocations - a small timeout should be more than sufficient as those are just method
* calls.
* @return the duration for a local ACK timeout calls.
*/
Duration getLocalAckTimeout();


/**
* An enumeration of the known config path expressions and their associated default values for
* {@code LocalAskTimeoutConfigValue}.
*/
enum LocalAskTimeoutConfigValue implements KnownConfigValue {

/**
* The local ACK timeout duration.
*/
ASK_TIMEOUT("timeout", Duration.ofSeconds(5L));

private final String path;
private final Duration defaultValue;

LocalAskTimeoutConfigValue(final String thePath, final Duration theDefaultValue) {

this.path = thePath;
this.defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public interface SupervisorConfig {
*/
ExponentialBackOffConfig getExponentialBackOffConfig();

/** Returns the config for supervisor local ACK timeout calls.
* @return the config.
*/
LocalAskTimeoutConfig getLocalAskTimeoutConfig();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.base.service.config.supervision;

import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import java.time.Duration;

import org.assertj.core.api.JUnitSoftAssertions;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Unit test for {@link DefaultLocalAskTimeoutConfigTest}.
*/
public class DefaultLocalAskTimeoutConfigTest {

private static Config supervisorLocalAskTimeoutConfig;

@Rule
public final JUnitSoftAssertions softly = new JUnitSoftAssertions();

@BeforeClass
public static void initTestFixture() {
supervisorLocalAskTimeoutConfig = ConfigFactory.load("local-ask-timout-test");
}

@Test
public void assertImmutability() {
assertInstancesOf(DefaultLocalAskTimeoutConfig.class,
areImmutable());
}

@Test
public void testHashCodeAndEquals() {
EqualsVerifier.forClass(DefaultLocalAskTimeoutConfig.class)
.usingGetClass()
.verify();
}

@Test
public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
final DefaultLocalAskTimeoutConfig underTest = DefaultLocalAskTimeoutConfig.of(ConfigFactory.empty());

softly.assertThat(underTest.getLocalAckTimeout())
.as(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ASK_TIMEOUT.getConfigPath())
.isEqualTo(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ASK_TIMEOUT.getDefaultValue());
}

@Test
public void underTestReturnsValuesOfConfigFile() {
final DefaultLocalAskTimeoutConfig underTest = DefaultLocalAskTimeoutConfig.of(supervisorLocalAskTimeoutConfig);

softly.assertThat(underTest.getLocalAckTimeout())
.as(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ASK_TIMEOUT.getConfigPath())
.isEqualTo(Duration.ofSeconds(10L));
}
}
3 changes: 3 additions & 0 deletions base/service/src/test/resources/local-ask-timout-test.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
local-ask {
timeout = 10s
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
Expand Down Expand Up @@ -72,12 +73,6 @@ public final class ConnectionSupervisorActor

private static final Duration MAX_CONFIG_RETRIEVAL_DURATION = Duration.ofSeconds(5);

/**
* For connectivity, this local ask timeout has to be higher as e.g. "openConnection" commands performed in a
* "staged" way will lead to quite some response times.
*/
private static final Duration CONNECTIVITY_DEFAULT_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(50);

private static final SupervisorStrategy SUPERVISOR_STRATEGY =
new OneForOneStrategy(true,
DeciderBuilder.match(JMSRuntimeException.class, e ->
Expand All @@ -101,7 +96,7 @@ private ConnectionSupervisorActor(final ActorRef commandForwarderActor,
final ConnectionEnforcerActorPropsFactory enforcerActorPropsFactory,
final MongoReadJournal mongoReadJournal) {

super(null, mongoReadJournal, CONNECTIVITY_DEFAULT_LOCAL_ASK_TIMEOUT);
super(null, mongoReadJournal);
this.commandForwarderActor = commandForwarderActor;
this.pubSubMediator = pubSubMediator;
this.enforcerActorPropsFactory = enforcerActorPropsFactory;
Expand Down Expand Up @@ -205,6 +200,14 @@ protected ExponentialBackOffConfig getExponentialBackOffConfig() {
return connectionConfig.getSupervisorConfig().getExponentialBackOffConfig();
}

@Override
protected LocalAskTimeoutConfig getLocalAskTimeoutConfig() {
return DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()))
.getConnectionConfig()
.getSupervisorConfig()
.getLocalAskTimeoutConfig();
}

@Override
protected ShutdownBehaviour getShutdownBehaviour(final ConnectionId entityId) {
return ShutdownBehaviour.fromIdWithoutNamespace(entityId, pubSubMediator, getSelf());
Expand Down
7 changes: 7 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ ditto {
random-factor = 1.0
corrupted-receive-timeout = 600s
}

# For connectivity, this local ask timeout has to be higher as e.g. "openConnection" commands performed in a
# "staged" way will lead to quite some response times.
local-ask {
timeout = 50s
timeout = ${?THINGS_SUPERVISOR_LOCAL_ASK_TIMEOUT}
}
}

snapshot {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private DittoRuntimeException reportError(final Command<?> command,
: throwable;
final var dre = DittoRuntimeException.asDittoRuntimeException(
error, t -> reportUnexpectedError(command, t));
LOGGER.info(" - {}: {}", dre.getClass().getSimpleName(), dre.getMessage());
LOGGER.withCorrelationId(command).info("{}: {}", dre.getClass().getSimpleName(), dre.getMessage());
return dre;
}

Expand Down
Loading

0 comments on commit 8bb7cd5

Please sign in to comment.