Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subsystem health check to liveness checks #1867

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,9 @@ pekko {
"pekko-contrib-mongodb-persistence-connection-snapshots"
]
}
management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

include "ditto-protocol-subscriber.conf"
Expand Down
4 changes: 4 additions & 0 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ pekko {
management.health-checks.readiness-checks {
gateway-http-readiness = "org.eclipse.ditto.gateway.service.health.GatewayHttpReadinessCheck"
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

include "ditto-protocol-subscriber.conf"
Expand Down
5 changes: 5 additions & 0 deletions internal/utils/health/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.health;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.pattern.Patterns;

/**
* Health check supplier for Pekko Management checking whether the root actor's health checking actor reports success.
* The health check will report failure if health checking actor replies with status DOWN or times out.
*/
public final class SubsystemHealthCheck implements Supplier<CompletionStage<Boolean>> {

/**
* The message to ask the health checking actor.
*/
private static final RetrieveHealth RETRIEVE_HEALTH_ASK_MESSAGE = RetrieveHealth.newInstance();

/**
* Default timeout for waiting for response from health checking actor.
*/
private static final Duration TIMEOUT = Duration.ofSeconds(30);

/**
* Health checking actor selection path.
*/
private static final String HEALTH_CHECKING_ACTOR_PATH =
"/user/*Root/" + DefaultHealthCheckingActorFactory.ACTOR_NAME;

private final ActorSelection healthCheckingActor;
private final Duration timeout;

/**
* Constructs subsystem health check with default timeout.
*
* @param system actor system to check health of
*/
public SubsystemHealthCheck(final ActorSystem system) {
this(system, TIMEOUT);
}

/**
* Constructs subsystem health check with custom timeout.
*
* @param system actor system to check health of
* @param timeout timeout
*/
public SubsystemHealthCheck(final ActorSystem system, final Duration timeout) {
healthCheckingActor = system.actorSelection(HEALTH_CHECKING_ACTOR_PATH);
this.timeout = timeout;
}

@Override
public CompletionStage<Boolean> get() {
return Patterns.ask(healthCheckingActor, RETRIEVE_HEALTH_ASK_MESSAGE, timeout)
.handle((answer, throwable) -> answer instanceof StatusInfo statusInfo
&& statusInfo.getStatus() != StatusInfo.Status.DOWN);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.health;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SubsystemHealthCheckTest {

private static final String TEST_ACTOR_READY = "ready";
private static final Duration HEALTH_CHECK_TIMEOUT = Duration.ofMillis(500);

/**
* Name of root actor in tests. Must end with "Root" to be recognized by {@link SubsystemHealthCheck}.
*/
private static final String ROOT_ACTOR_NAME = "testRoot";

private ActorSystem actorSystem;
private SubsystemHealthCheck subsystemHealthCheck;

@Before
public void setUp() {
actorSystem = ActorSystem.create();
subsystemHealthCheck = new SubsystemHealthCheck(actorSystem, HEALTH_CHECK_TIMEOUT);
}

@After
public void tearDown() {
if (actorSystem != null) {
actorSystem.terminate();
}
}

@Test
public void testCheckReturnsTrueWhenHealthCheckingActorReturnsUp() {
new TestKit(actorSystem) {{
final var statusInfo = StatusInfo.fromStatus(StatusInfo.Status.UP);
actorSystem.actorOf(TestRootActor.props(statusInfo, getRef()), ROOT_ACTOR_NAME);

expectMsg(TEST_ACTOR_READY);

assertHealthCheckResult(subsystemHealthCheck, true);
}};
}

@Test
public void testCheckReturnsTrueWhenHealthCheckingActorReturnsUnknown() {
new TestKit(actorSystem) {{
final var statusInfo = StatusInfo.fromStatus(StatusInfo.Status.UNKNOWN);
actorSystem.actorOf(TestRootActor.props(statusInfo, getRef()), ROOT_ACTOR_NAME);

expectMsg(TEST_ACTOR_READY);

assertHealthCheckResult(subsystemHealthCheck, true);
}};
}

@Test
public void testCheckReturnsFalseWhenHealthCheckingActorReturnsDown() {
new TestKit(actorSystem) {{
final var statusInfo = StatusInfo.fromStatus(StatusInfo.Status.DOWN);
actorSystem.actorOf(TestRootActor.props(statusInfo, getRef()), ROOT_ACTOR_NAME);

expectMsg(TEST_ACTOR_READY);

assertHealthCheckResult(subsystemHealthCheck, false);
}};
}

@Test
public void testCheckReturnsFalseWhenHealthCheckingActorDoesNotExist() {
new TestKit(actorSystem) {{
assertHealthCheckResult(subsystemHealthCheck, false);
}};
}

private static void assertHealthCheckResult(SubsystemHealthCheck subsystemHealthCheck, boolean expected) {
final var healthCheckResultFuture = subsystemHealthCheck.get();
Awaitility.await()
.atMost(Duration.ofNanos(2 * HEALTH_CHECK_TIMEOUT.toNanos()))
.until(() -> healthCheckResultFuture.toCompletableFuture().isDone());
assertThat(healthCheckResultFuture.toCompletableFuture())
.isCompletedWithValue(expected);
}

private static class TestRootActor extends AbstractActor {

private TestRootActor(final StatusInfo statusInfo, final ActorRef actorToNotify) {
getContext().actorOf(TestHealthCheckingActor.props(statusInfo, actorToNotify),
DefaultHealthCheckingActorFactory.ACTOR_NAME);
}

public static Props props(final StatusInfo statusInfo, final ActorRef actorToNotify) {
return Props.create(TestRootActor.class, statusInfo, actorToNotify);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchAny(this::unhandled)
.build();
}

}

private static class TestHealthCheckingActor extends AbstractActor {

private final StatusInfo statusInfo;
private final ActorRef actorToNotify;

private TestHealthCheckingActor(final StatusInfo statusInfo, final ActorRef actorToNotify) {
this.statusInfo = statusInfo;
this.actorToNotify = actorToNotify;
}

public static Props props(final StatusInfo statusInfo, final ActorRef actorToNotify) {
return Props.create(TestHealthCheckingActor.class, statusInfo, actorToNotify);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(RetrieveHealth.class, this::returnStatusInfo)
.build();
}

@Override
public void preStart() {
actorToNotify.tell(TEST_ACTOR_READY, getSelf());
}

private void returnStatusInfo(final RetrieveHealth command) {
getSender().tell(this.statusInfo, getSelf());
}
}

}
4 changes: 4 additions & 0 deletions policies/service/src/main/resources/policies.conf
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ pekko {
"pekko-contrib-mongodb-persistence-policies-snapshots"
]
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

pekko-contrib-mongodb-persistence-policies-journal {
Expand Down
4 changes: 4 additions & 0 deletions things/service/src/main/resources/things.conf
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ pekko {
]
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}

}

pekko-contrib-mongodb-persistence-things-journal {
Expand Down
4 changes: 4 additions & 0 deletions thingsearch/service/src/main/resources/search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ pekko {
"blocked-namespaces-aware",
]
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

search-dispatcher {
Expand Down
Loading