diff --git a/bom/pom.xml b/bom/pom.xml index 73de6d8cff9..63b8cea9cb4 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -781,6 +781,16 @@ helidon-lra-coordinator-narayana-client ${helidon.version} + + io.helidon.lra + helidon-lra-coordinator-server + ${helidon.version} + + + io.helidon.microprofile.lra + helidon-microprofile-lra-testing + ${helidon.version} + diff --git a/docs/pom.xml b/docs/pom.xml index 238da6f5a6a..2c20ea103ef 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -86,6 +86,21 @@ postgresql true + + io.helidon.microprofile.lra + helidon-microprofile-lra-testing + true + + + org.hamcrest + hamcrest-all + true + + + io.helidon.microprofile.testing + helidon-microprofile-testing-junit5 + true + diff --git a/docs/src/main/asciidoc/mp/lra.adoc b/docs/src/main/asciidoc/mp/lra.adoc index 96a186976bd..1fcdc199d3d 100644 --- a/docs/src/main/asciidoc/mp/lra.adoc +++ b/docs/src/main/asciidoc/mp/lra.adoc @@ -24,6 +24,7 @@ :spec-version: 1.0-RC3 :spec-name: MicroProfile {feature-name} specification :javadoc-link: https://download.eclipse.org/microprofile/microprofile-lra-{spec-version}/apidocs/org/eclipse/microprofile/lra/annotation/ +:microtx-link: https://docs.oracle.com/en/database/oracle/transaction-manager-for-microservices/index.html :rootdir: {docdir}/.. include::{rootdir}/includes/mp.adoc[] @@ -36,8 +37,10 @@ include::{rootdir}/includes/mp.adoc[] * <> * <> * <> +* <> * <> ** <> +** <> ** <> ** <> * <> @@ -351,6 +354,73 @@ include::{sourcedir}/mp/LraSnippets.java[tag=snippet_13, indent=0] <5> Method which will be called by coordinator when LRA is completed <6> Method which will be called by coordinator when LRA is canceled +== Testing +Testing of JAX-RS resources with LRA can be challenging as LRA participant running in parallel with the test is needed. + +Helidon provides test coordinator which can be started automatically with additional socket on a random port within your +own Helidon application. You only need one extra test dependency to enable test coordinator in your xref:testing/testing.adoc[@HelidonTest]. + +[source, xml] +.Dependency +---- + + io.helidon.microprofile.lra + helidon-microprofile-lra-testing + test + +---- + +Considering that you have LRA enabled JAX-RS resource you want to test. + +[source, java] +.Example JAX-RS resource with LRA. +---- +include::{sourcedir}/mp/LraSnippets.java[tag=snippet_14, indent=0] +---- + +Helidon test with enabled CDI discovery can look like this. + +[source, java] +.HelidonTest with LRA test support. +---- +include::{sourcedir}/mp/LraSnippets.java[tag=snippet_15, indent=0] +---- +<1> Resource is discovered automatically +<2> Test coordinator needs to be added manually +<3> Injecting test coordinator to access state of LRA managed by coordinator mid-test +<4> Retrieving LRA managed by coordinator by LraId +<5> Asserting LRA state in coordinator + +LRA testing feature has the following default configuration: + +* port: `0` - coordinator is started on random port(Helidon LRA participant is capable to discover test coordinator automatically) +* bind-address: `localhost` - bind address of the coordinator +* helidon.lra.coordinator.persistence: `false` - LRAs managed by test coordinator are not persisted +* helidon.lra.participant.use-build-time-index: `false` - Participant annotation inspection ignores Jandex index files created in build time, it helps to avoid issues with additional test resources + +Testing LRA coordinator is started on additional named socket `test-lra-coordinator` configured with default index `500`. +Default index can be changed with system property `helidon.lra.coordinator.test-socket.index`. + +Example: `-Dhelidon.lra.coordinator.test-socket.index=20`. + +[source, java] +.HelidonTest override LRA test feature default settings. +---- +include::{sourcedir}/mp/LraSnippets.java[tag=snippet_16, indent=0] +---- +<1> Start test LRA coordinator always on the same port 8070(default is random port) +<2> Test LRA coordinator socket bind address (default is localhost) +<3> Persist LRA managed by coordinator(default is false) +<4> Use build time Jandex index(default is false) + +When CDI bean auto-discovery is not desired, LRA and Config CDI extensions needs to be added manually. + +[source, java] +.HelidonTest setup with disabled discovery. +---- +include::{sourcedir}/mp/LraSnippets.java[tag=snippet_17, indent=0] +---- + == Additional Information === Coordinator @@ -359,12 +429,65 @@ the participants when the LRA transaction gets cancelled or completes (in case i In addition, participant also keeps track of timeouts, retries participant calls, and assigns LRA ids. .Helidon LRA supports following coordinators +* {microtx-link}[MicroTx LRA coordinator] * Helidon LRA coordinator * https://narayana.io/lra[Narayana coordinator]. +=== MicroTx LRA Coordinator +Oracle Transaction Manager for Microservices - {microtx-link}[MicroTx] is an enterprise grade transaction manager for microservices, +among other it manages LRA transactions and is compatible with Narayana LRA clients. + +MicroTx LRA coordinator is compatible with Narayana clients when `narayanaLraCompatibilityMode` is on, +you need to add another dependency to enable Narayana client: +[source,xml] +.Dependency needed for using Helidon LRA with Narayana compatible coordinator +---- + + io.helidon.lra + helidon-lra-coordinator-narayana-client + +---- + +[source, bash] +.Run MicroTx in Docker +---- +docker container run --name otmm -v "$(pwd)":/app/config \ +-w /app/config -p 8080:8080/tcp --env CONFIG_FILE=tcs.yaml \ +--add-host host.docker.internal:host-gateway -d tmm: +---- + +To use MicroTx with Helidon LRA participant, `narayanaLraCompatibilityMode` needs to be enabled. + +[source, yaml] +.Configure MicroTx for development +---- +tmmAppName: tcs +tmmConfiguration: + listenAddr: 0.0.0.0:8080 + internalAddr: http://host.docker.internal:8080 + externalUrl: http://lra-coordinator.acme.com:8080 + xaCoordinator: + enabled: false + lraCoordinator: + enabled: true + tccCoordinator: + enabled: false + storage: + type: memory + authentication: + enabled: false + authorization: + enabled: false + serveTLS: + enabled: false + narayanaLraCompatibilityMode: + enabled: true #<1> +---- +<1> Enable Narayana compatibility mode + === Helidon LRA Coordinator -CAUTION: Experimental tool, usage in production is not advised. +CAUTION: Test tool, usage in production is not advised. [source,bash] .Build and run Helidon LRA coordinator @@ -417,3 +540,4 @@ with port `8070` defined in the snippet above you need to configure your Helidon * {microprofile-lra-spec-url}[{spec-name}] * https://download.eclipse.org/microprofile/microprofile-lra-{spec-version}/apidocs/org/eclipse/microprofile/lra/[Microprofile LRA JavaDoc] * https://helidon.io/docs/v4/apidocs/io.helidon.lra.coordinator.client/module-summary.html[Helidon LRA Client JavaDoc] +* {microtx-link}[MicroTx - Oracle Transaction Manager for Microservices] diff --git a/docs/src/main/java/io/helidon/docs/mp/LraSnippets.java b/docs/src/main/java/io/helidon/docs/mp/LraSnippets.java index e859b9c4c55..e643d0d9aae 100644 --- a/docs/src/main/java/io/helidon/docs/mp/LraSnippets.java +++ b/docs/src/main/java/io/helidon/docs/mp/LraSnippets.java @@ -17,14 +17,33 @@ import java.net.URI; import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import java.util.logging.Logger; +import io.helidon.lra.coordinator.Lra; +import io.helidon.microprofile.config.ConfigCdiExtension; +import io.helidon.microprofile.lra.LraCdiExtension; +import io.helidon.microprofile.testing.junit5.AddBean; +import io.helidon.microprofile.testing.junit5.AddConfig; +import io.helidon.microprofile.testing.junit5.AddExtension; +import io.helidon.microprofile.testing.junit5.AddJaxRs; +import io.helidon.microprofile.testing.junit5.DisableDiscovery; +import io.helidon.microprofile.testing.junit5.HelidonTest; +import io.helidon.microprofile.testing.lra.TestLraCoordinator; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import jakarta.ws.rs.DELETE; import jakarta.ws.rs.GET; import jakarta.ws.rs.HeaderParam; import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import org.eclipse.microprofile.lra.LRAResponse; import org.eclipse.microprofile.lra.annotation.AfterLRA; @@ -36,10 +55,15 @@ import org.eclipse.microprofile.lra.annotation.Status; import org.eclipse.microprofile.lra.annotation.ws.rs.LRA; import org.eclipse.microprofile.lra.annotation.ws.rs.Leave; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_CONTEXT_HEADER; import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_ENDED_CONTEXT_HEADER; import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_PARENT_CONTEXT_HEADER; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; @SuppressWarnings("ALL") class LraSnippets { @@ -218,4 +242,91 @@ public Response compensateExample(@HeaderParam(LRA_HTTP_CONTEXT_HEADER) URI lraI // end::snippet_13[] } + // tag::snippet_14[] + @ApplicationScoped + @Path("/test") + public class WithdrawResource { + + private final List completedLras = new CopyOnWriteArrayList<>(); + private final List cancelledLras = new CopyOnWriteArrayList<>(); + + @PUT + @Path("/withdraw") + @LRA(LRA.Type.REQUIRES_NEW) + public Response withdraw(@HeaderParam(LRA.LRA_HTTP_CONTEXT_HEADER) Optional lraId, String content) { + if ("BOOM".equals(content)) { + throw new IllegalArgumentException("BOOM"); + } + return Response.ok().build(); + } + + @Complete + public void complete(URI lraId) { + completedLras.add(lraId.toString()); + } + + @Compensate + public void rollback(URI lraId) { + cancelledLras.add(lraId.toString()); + } + + public List getCompletedLras() { + return completedLras; + } + } + // end::snippet_14[] + + // tag::snippet_15[] + @HelidonTest + //@AddBean(WithdrawResource.class) //<1> + @AddBean(TestLraCoordinator.class) //<2> + public class LraTest { + + @Inject + private WithdrawResource withdrawTestResource; + + @Inject + private TestLraCoordinator coordinator; //<3> + + @Inject + private WebTarget target; + + @Test + public void testLraComplete() { + try (Response res = target + .path("/test/withdraw") + .request() + .put(Entity.entity("test", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(200)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); //<4> + assertThat(lra.status(), is(LRAStatus.Closed)); //<5> + assertThat(withdrawTestResource.getCompletedLras(), contains(lraId)); + } + } + } + // end::snippet_15[] + + // tag::snippet_16[] + @HelidonTest + @AddBean(TestLraCoordinator.class) + @AddConfig(key = "server.sockets.500.port", value = "8070") //<1> + @AddConfig(key = "server.sockets.500.bind-address", value = "custom.bind.name") //<2> + @AddConfig(key = "helidon.lra.coordinator.persistence", value = "true") //<3> + @AddConfig(key = "helidon.lra.participant.use-build-time-index", value = "true") //<4> + public class LraCustomConfigTest { + } + // end::snippet_16[] + + // tag::snippet_17[] + @HelidonTest + @DisableDiscovery + @AddJaxRs + @AddBean(TestLraCoordinator.class) + @AddExtension(LraCdiExtension.class) + @AddExtension(ConfigCdiExtension.class) + @AddBean(WithdrawResource.class) + public class LraNoDiscoveryTest { + } + // end::snippet_17[] } diff --git a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java index ae2f0a20fc5..df03a34535f 100644 --- a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java +++ b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,8 @@ import org.eclipse.microprofile.lra.annotation.LRAStatus; import org.eclipse.microprofile.lra.annotation.ws.rs.LRA; +import static java.lang.System.Logger.Level.DEBUG; + /** * Narayana LRA coordinator client. */ @@ -103,6 +105,7 @@ private URI startInternal(URI parentLRA, String clientID, PropagatedHeaders head .map(p -> parseBaseUri(p.toASCIIString())) .orElse(coordinatorUriSupplier.get()); + logF("Starting LRA, coordinator: {0}/start, clientId: {1}, timeout: {2}", baseUri, clientID, timeout); return retry.invoke(() -> { HttpClientRequest req = prepareWebClient(baseUri) .post() @@ -139,6 +142,7 @@ private URI startInternal(URI parentLRA, String clientID, PropagatedHeaders head @Override public void cancel(URI lraId, PropagatedHeaders headers) { + logF("Cancelling LRA {0}", lraId); retry.invoke(() -> { var req = prepareWebClient(lraId) .put() @@ -164,6 +168,7 @@ public void cancel(URI lraId, PropagatedHeaders headers) { @Override public void close(URI lraId, PropagatedHeaders headers) { + logF("Closing LRA {0}", lraId); retry.invoke(() -> { var req = prepareWebClient(lraId) .put() @@ -197,6 +202,7 @@ public Optional join(URI lraId, Participant p) { String links = compensatorLinks(p); + logF("Joining LRA {0} with links: {1}", lraId, links); return retry.invoke(() -> { var req = prepareWebClient(lraId) .put() @@ -227,13 +233,14 @@ public Optional join(URI lraId, throw connectionError("Unexpected coordinator response ", res.status().code()); } } catch (Exception e) { - throw connectionError("Unable to join LRA", e); + throw connectionError("Unable to join LRA " + lraId, e); } }); } @Override public void leave(URI lraId, PropagatedHeaders headers, Participant p) { + logF("Leaving LRA {0} participant: {1}", lraId, p); retry.invoke(() -> { var req = prepareWebClient(lraId) .put() @@ -260,6 +267,7 @@ public void leave(URI lraId, PropagatedHeaders headers, Participant p) { @Override public LRAStatus status(URI lraId, PropagatedHeaders headers) { + logF("Checking status of LRA {0}", lraId); return retry.invoke(() -> { var req = prepareWebClient(lraId) .get() @@ -354,7 +362,9 @@ private CoordinatorConnectionException connectionError(String message, Throwable } private void logF(String msg, Object... params) { - LOGGER.log(Level.DEBUG, msg, params); + if (LOGGER.isLoggable(DEBUG)) { + LOGGER.log(DEBUG, msg, params); + } } } diff --git a/lra/coordinator/server/pom.xml b/lra/coordinator/server/pom.xml index 132cc12ccd3..015c333267b 100644 --- a/lra/coordinator/server/pom.xml +++ b/lra/coordinator/server/pom.xml @@ -23,25 +23,32 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - io.helidon.applications - helidon-se + io.helidon.lra + helidon-lra-coordinator-project 4.2.0-SNAPSHOT - ../../../applications/se/pom.xml - io.helidon.lra helidon-lra-coordinator-server Helidon LRA Coordinator Narayana compatible LRA coordinator - true - true - true io.helidon.lra.coordinator.Main + + + + io.helidon.applications + helidon-se + ${project.parent.version} + pom + import + + + + org.eclipse.microprofile.lra @@ -131,6 +138,120 @@ + ${project.artifactId} + + + + kr.motd.maven + os-maven-plugin + ${version.plugin.os} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${version.plugin.compiler} + + full + + + + org.apache.maven.plugins + maven-surefire-plugin + ${version.plugin.surefire} + + false + + ${project.build.outputDirectory}/logging.properties + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${version.plugin.failsafe} + + false + true + + + + org.apache.maven.plugins + maven-dependency-plugin + ${version.plugin.dependency} + + + copy-libs + prepare-package + + copy-dependencies + + + ${project.build.directory}/libs + false + false + true + true + runtime + + okio + + + + + + org.apache.maven.plugins + maven-resources-plugin + ${version.plugin.resources} + + + org.apache.maven.plugins + maven-jar-plugin + ${version.plugin.jar} + + + + true + libs + + ${mainClass} + false + + + + + + org.codehaus.mojo + exec-maven-plugin + ${version.plugin.exec} + + java + true + + -classpath + + ${mainClass} + + + + + io.helidon.build-tools + helidon-maven-plugin + ${version.plugin.helidon} + + + io.helidon.licensing + helidon-licensing + ${helidon.version} + + + + + + org.apache.maven.plugins diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/CoordinatorService.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/CoordinatorService.java index bebdf1d0b89..5636870ecf8 100644 --- a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/CoordinatorService.java +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/CoordinatorService.java @@ -95,23 +95,23 @@ public class CoordinatorService implements HttpService { init(); } - private void init() { - lraPersistentRegistry.load(this); - recoveryTask = Scheduling.fixedRate() - .delay(config.get("recovery-interval").asLong().orElse(200L)) - .initialDelay(200) - .timeUnit(TimeUnit.MILLISECONDS) - .task(this::tick) + /** + * Create a new Lra coordinator. + * + * @return coordinator + */ + public static CoordinatorService create() { + return builder() .build(); + } - if (config.get("periodical-persist").asBoolean().orElse(false)) { - persistTask = Scheduling.fixedRate() - .delay(config.get("persist-interval").asLong().orElse(5000L)) - .initialDelay(200) - .timeUnit(TimeUnit.MILLISECONDS) - .task(inv -> lraPersistentRegistry.save()) - .build(); - } + /** + * Create a new fluent API builder. + * + * @return a new builder + */ + public static Builder builder() { + return new Builder(); } /** @@ -149,6 +149,39 @@ public void routing(HttpRules rules) { .put("/{LraId}/remove", this::leave); } + /** + * Get LRA by lraId. + * + * @param lraId without coordinator uri prefix + * @return LRA when managed by this coordinator or null + */ + public Lra lra(String lraId) { + return this.lraPersistentRegistry.get(lraId); + } + + LazyValue coordinatorURL() { + return coordinatorURL; + } + + private void init() { + lraPersistentRegistry.load(this); + recoveryTask = Scheduling.fixedRateBuilder() + .delay(config.get("recovery-interval").asLong().orElse(200L)) + .initialDelay(200) + .timeUnit(TimeUnit.MILLISECONDS) + .task(this::tick) + .build(); + + if (config.get("periodical-persist").asBoolean().orElse(false)) { + persistTask = Scheduling.fixedRateBuilder() + .delay(config.get("persist-interval").asLong().orElse(5000L)) + .initialDelay(200) + .timeUnit(TimeUnit.MILLISECONDS) + .task(inv -> lraPersistentRegistry.save()) + .build(); + } + } + /** * Ask coordinator to start new LRA and return its id. * @@ -163,15 +196,15 @@ private void start(ServerRequest req, ServerResponse res) { String lraUUID = UUID.randomUUID().toString(); URI lraId = coordinatorUriWithPath(lraUUID); if (!parentLRA.isEmpty()) { - Lra parent = lraPersistentRegistry.get(parentLRA.replace(coordinatorURL.get().toASCIIString() + "/", "")); + LraImpl parent = lraPersistentRegistry.get(parentLRA.replace(coordinatorURL.get().toASCIIString() + "/", "")); if (parent != null) { - Lra childLra = new Lra(this, lraUUID, URI.create(parentLRA), this.config); + LraImpl childLra = new LraImpl(this, lraUUID, URI.create(parentLRA), this.config); childLra.setupTimeout(timeLimit); lraPersistentRegistry.put(lraUUID, childLra); parent.addChild(childLra); } } else { - Lra newLra = new Lra(this, lraUUID, config); + LraImpl newLra = new LraImpl(this, lraUUID, config); newLra.setupTimeout(timeLimit); lraPersistentRegistry.put(lraUUID, newLra); } @@ -189,12 +222,12 @@ private void start(ServerRequest req, ServerResponse res) { */ private void close(ServerRequest req, ServerResponse res) { String lraId = req.path().pathParameters().get("LraId"); - Lra lra = lraPersistentRegistry.get(lraId); + LraImpl lra = lraPersistentRegistry.get(lraId); if (lra == null) { res.status(NOT_FOUND_404).send(); return; } - if (lra.status().get() != LRAStatus.Active) { + if (lra.lraStatus().get() != LRAStatus.Active) { // Already time-outed res.status(GONE_410).send(); return; @@ -211,7 +244,7 @@ private void close(ServerRequest req, ServerResponse res) { */ private void cancel(ServerRequest req, ServerResponse res) { String lraId = req.path().pathParameters().get("LraId"); - Lra lra = lraPersistentRegistry.get(lraId); + LraImpl lra = lraPersistentRegistry.get(lraId); if (lra == null) { res.status(NOT_FOUND_404).send(); return; @@ -231,7 +264,7 @@ private void join(ServerRequest req, ServerResponse res) { String lraId = req.path().pathParameters().get("LraId"); String compensatorLink = req.headers().first(HeaderNames.LINK).orElse(""); - Lra lra = lraPersistentRegistry.get(lraId); + LraImpl lra = lraPersistentRegistry.get(lraId); if (lra == null) { res.status(NOT_FOUND_404).send(); return; @@ -257,14 +290,14 @@ private void join(ServerRequest req, ServerResponse res) { */ private void status(ServerRequest req, ServerResponse res) { String lraId = req.path().pathParameters().get("LraId"); - Lra lra = lraPersistentRegistry.get(lraId); + LraImpl lra = lraPersistentRegistry.get(lraId); if (lra == null) { res.status(NOT_FOUND_404).send(); return; } res.status(OK_200) - .send(lra.status().get().name()); + .send(lra.lraStatus().get().name()); } /** @@ -278,7 +311,7 @@ private void leave(ServerRequest req, ServerResponse res) { String lraId = req.path().pathParameters().get("LraId"); String compensatorLinks = req.content().as(String.class); - Lra lra = lraPersistentRegistry.get(lraId); + LraImpl lra = lraPersistentRegistry.get(lraId); if (lra == null) { res.status(NOT_FOUND_404).send(); } else { @@ -307,13 +340,13 @@ private void recovery(ServerRequest req, ServerResponse res) { }); if (lraUUID.isPresent()) { - Lra lra = lraPersistentRegistry.get(lraUUID.get()); + LraImpl lra = lraPersistentRegistry.get(lraUUID.get()); if (lra != null) { - if (RECOVERABLE_STATUSES.contains(lra.status().get())) { + if (RECOVERABLE_STATUSES.contains(lra.lraStatus().get())) { JsonObject json = JSON.createObjectBuilder() .add("lraId", lra.lraId()) - .add("status", lra.status().get().name()) - .add("recovering", Set.of(LRAStatus.Closed, LRAStatus.Cancelled).contains(lra.status().get())) + .add("status", lra.lraStatus().get().name()) + .add("recovering", Set.of(LRAStatus.Closed, LRAStatus.Cancelled).contains(lra.lraStatus().get())) .build(); res.status(OK_200).send(json); } else { @@ -325,10 +358,10 @@ private void recovery(ServerRequest req, ServerResponse res) { } else { JsonArray jsonValues = lraPersistentRegistry .stream() - .filter(lra -> RECOVERABLE_STATUSES.contains(lra.status().get())) + .filter(lra -> RECOVERABLE_STATUSES.contains(lra.lraStatus().get())) .map(l -> JSON.createObjectBuilder() .add("lraId", l.lraId()) - .add("status", l.status().get().name()) + .add("status", l.lraStatus().get().name()) .build() ) .collect(JSON::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::addAll) @@ -348,7 +381,7 @@ private void get(ServerRequest req, ServerResponse res) { .filter(lra -> lraId.map(id -> lra.lraId().equals(id)).orElse(true)) .map(l -> JSON.createObjectBuilder() .add("lraId", l.lraId()) - .add("status", l.status().get().name()) + .add("status", l.lraStatus().get().name()) .build() ) .collect(JSON::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::addAll) @@ -369,23 +402,23 @@ private void tick(FixedRateInvocation inv) { if (lra.isReadyToDelete()) { lraPersistentRegistry.remove(lra.lraId()); } else { - if (LRAStatus.Cancelling == lra.status().get()) { + if (LRAStatus.Cancelling == lra.lraStatus().get()) { LOGGER.log(Level.DEBUG, "Recovering {0}", lra.lraId()); lra.cancel(); } - if (LRAStatus.Closing == lra.status().get()) { + if (LRAStatus.Closing == lra.lraStatus().get()) { LOGGER.log(Level.DEBUG, "Recovering {0}", lra.lraId()); lra.close(); } - if (lra.checkTimeout() && lra.status().get().equals(LRAStatus.Active)) { + if (lra.checkTimeout() && lra.lraStatus().get().equals(LRAStatus.Active)) { LOGGER.log(Level.DEBUG, "Timeouting {0} ", lra.lraId()); - lra.timeout(); + lra.triggerTimeout(); } - if (Set.of(LRAStatus.Closed, LRAStatus.Cancelled).contains(lra.status().get())) { + if (Set.of(LRAStatus.Closed, LRAStatus.Cancelled).contains(lra.lraStatus().get())) { // If a participant is unable to complete or compensate immediately or because of a failure // then it must remember the fact (by reporting its' status via the @Status method) // until explicitly told that it can clean up using this @Forget annotation. - LOGGER.log(Level.DEBUG, "Forgetting {0} {1}", new Object[] {lra.status().get(), lra.lraId()}); + LOGGER.log(Level.DEBUG, "Forgetting {0} {1}", new Object[] {lra.lraStatus().get(), lra.lraId()}); lra.tryForget(); lra.tryAfter(); } @@ -394,10 +427,6 @@ private void tick(FixedRateInvocation inv) { completedRecovery.getAndSet(new CompletableFuture<>()).complete(null); } - LazyValue getCoordinatorURL() { - return coordinatorURL; - } - private void nextRecoveryCycle() { try { completedRecovery.get().get(1, TimeUnit.SECONDS); @@ -412,25 +441,6 @@ private URI coordinatorUriWithPath(String additionalPath) { return URI.create(coordinatorURL.get().toASCIIString() + "/" + additionalPath); } - /** - * Create a new Lra coordinator. - * - * @return coordinator - */ - public static CoordinatorService create() { - return builder() - .build(); - } - - /** - * Create a new fluent API builder. - * - * @return a new builder - */ - public static Builder builder() { - return new Builder(); - } - /** * Coordinator builder. */ @@ -439,8 +449,8 @@ public static final class Builder implements io.helidon.common.Builder uriSupplier = () -> URI.create(config.get(COORDINATOR_URL_KEY) - .asString() - .orElse(DEFAULT_COORDINATOR_URL)); + .asString() + .orElse(DEFAULT_COORDINATOR_URL)); /** * Configuration needed for configuring coordinator. @@ -453,18 +463,6 @@ public Builder config(Config config) { return this; } - /** - * Custom persistent registry for saving and loading the state of the coordinator. - * Coordinator is not persistent by default. - * - * @param lraPersistentRegistry custom persistent registry - * @return this builder - */ - public Builder persistentRegistry(LraPersistentRegistry lraPersistentRegistry) { - this.lraPersistentRegistry = lraPersistentRegistry; - return this; - } - /** * Supplier for coordinator url. * For supplying url after we know the port of the started server. @@ -487,5 +485,17 @@ public CoordinatorService build() { } return new CoordinatorService(lraPersistentRegistry, uriSupplier, config); } + + /** + * Custom persistent registry for saving and loading the state of the coordinator. + * Coordinator is not persistent by default. + * + * @param lraPersistentRegistry custom persistent registry + * @return this builder + */ + Builder persistentRegistry(LraPersistentRegistry lraPersistentRegistry) { + this.lraPersistentRegistry = lraPersistentRegistry; + return this; + } } } diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Lra.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Lra.java index cd37fc9feb5..5da5101632e 100644 --- a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Lra.java +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Lra.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,30 +15,10 @@ */ package io.helidon.lra.coordinator; -import java.lang.System.Logger.Level; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import io.helidon.common.LazyValue; -import io.helidon.config.Config; -import io.helidon.http.ClientRequestHeaders; import io.helidon.http.HeaderName; import io.helidon.http.HeaderNames; -import io.helidon.metrics.api.Counter; -import io.helidon.metrics.api.MeterRegistry; -import io.helidon.metrics.api.Metrics; -import io.helidon.metrics.api.Timer; import org.eclipse.microprofile.lra.annotation.LRAStatus; @@ -47,298 +27,66 @@ import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_PARENT_CONTEXT_HEADER; import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_RECOVERY_HEADER; -class Lra { - static final HeaderName LRA_HTTP_CONTEXT_HEADER_NAME = HeaderNames.create(LRA_HTTP_CONTEXT_HEADER); - static final HeaderName LRA_HTTP_ENDED_CONTEXT_HEADER_NAME = HeaderNames.create(LRA_HTTP_ENDED_CONTEXT_HEADER); - static final HeaderName LRA_HTTP_PARENT_CONTEXT_HEADER_NAME = HeaderNames.create(LRA_HTTP_PARENT_CONTEXT_HEADER); - static final HeaderName LRA_HTTP_RECOVERY_HEADER_NAME = HeaderNames.create(LRA_HTTP_RECOVERY_HEADER); - - private static final System.Logger LOGGER = System.getLogger(Lra.class.getName()); - - private final LazyValue coordinatorURL; - private long timeout; - private URI parentId; - private final Set compensatorLinks = Collections.synchronizedSet(new HashSet<>()); - - private final String lraId; - private final Config config; - - private final List children = Collections.synchronizedList(new ArrayList<>()); - - private final List participants = new CopyOnWriteArrayList<>(); - - private final AtomicReference status = new AtomicReference<>(LRAStatus.Active); - - private final Lock lock = new ReentrantLock(); - - private boolean isChild; - private long whenReadyToDelete = 0; - - private final MeterRegistry registry = Metrics.globalRegistry(); - private final Counter lraCtr = registry.getOrCreate(Counter.builder("lractr")); - private final Timer lrfLifeSpanTmr = registry.getOrCreate(Timer.builder("lralifespantmr")); - private final Timer.Sample lraLifeSpanTmrSample = Timer.start(registry); - - Lra(CoordinatorService coordinatorService, String lraUUID, Config config) { - lraId = lraUUID; - this.config = config; - lraCtr.increment(); - coordinatorURL = LazyValue.create(coordinatorService.getCoordinatorURL()); - } - - Lra(CoordinatorService coordinatorService, String lraUUID, URI parentId, Config config) { - lraId = lraUUID; - this.parentId = parentId; - this.config = config; - lraCtr.increment(); - coordinatorURL = LazyValue.create(coordinatorService.getCoordinatorURL()); - } - - String lraId() { - return lraId; - } - - String parentId() { - return Optional.ofNullable(parentId).map(URI::toASCIIString).orElse(null); - } - - boolean isChild() { - return isChild; - } - - void setChild(boolean child) { - isChild = child; - } - - long getTimeout() { - return timeout; - } - - void setStatus(LRAStatus status) { - this.status.set(status); - } - - long getWhenReadyToDelete() { - return this.whenReadyToDelete; - } - - void setWhenReadyToDelete(long whenReadyToDelete) { - this.whenReadyToDelete = whenReadyToDelete; - } - - void setTimeout(long timeout) { - this.timeout = timeout; - } - - List getParticipants() { - return this.participants; - } - - void setupTimeout(long timeLimit) { - if (timeLimit != 0) { - this.timeout = System.currentTimeMillis() + timeLimit; - } else { - this.timeout = 0; - } - } - - boolean checkTimeout() { - return timeout > 0 && timeout < System.currentTimeMillis(); - } - - void addParticipant(String compensatorLink) { - if (compensatorLinks.add(compensatorLink)) { - Participant participant = new Participant(config); - participant.parseCompensatorLinks(compensatorLink); - participants.add(participant); - } - } - - void removeParticipant(String compensatorUrl) { - Set forRemove = participants.stream() - .filter(p -> p.equalCompensatorUris(compensatorUrl)) - .collect(Collectors.toSet()); - forRemove.forEach(participants::remove); - } - - void addChild(Lra lra) { - children.add(lra); - lra.isChild = true; - } - - Consumer headers() { - return headers -> { - headers.add(LRA_HTTP_CONTEXT_HEADER_NAME, lraContextId()); - headers.add(LRA_HTTP_ENDED_CONTEXT_HEADER_NAME, lraContextId()); - Optional.ofNullable(parentId) - .map(URI::toASCIIString) - .ifPresent(s -> headers.add(LRA_HTTP_PARENT_CONTEXT_HEADER_NAME, s)); - headers.add(LRA_HTTP_RECOVERY_HEADER_NAME, lraContextId() + "/recovery"); - }; - } - - void close() { - Set allowedStatuses = Set.of(LRAStatus.Active, LRAStatus.Closing); - if (LRAStatus.Closing != status.updateAndGet(old -> allowedStatuses.contains(old) ? LRAStatus.Closing : old)) { - LOGGER.log(Level.WARNING, "Can't close LRA, it's already " + status.get().name() + " " + this.lraId); - return; - } - lraLifeSpanTmrSample.stop(lrfLifeSpanTmr); - if (lock.tryLock()) { - try { - sendComplete(); - // needs to go before nested close, so we know if nested was already closed - // or not(non closed nested can't get @Forget call) - forgetNested(); - for (Lra nestedLra : children) { - nestedLra.close(); - } - trySendAfterLRA(); - markForDeletion(); - } finally { - lock.unlock(); - } - } - } - - void cancel() { - Set allowedStatuses = Set.of(LRAStatus.Active, LRAStatus.Cancelling); - if (LRAStatus.Cancelling != status.updateAndGet(old -> allowedStatuses.contains(old) ? LRAStatus.Cancelling : old) - && !isChild) { // nested can be compensated even if closed - LOGGER.log(Level.WARNING, "Can't cancel LRA, it's already " + status.get().name() + " " + this.lraId); - return; - } - lraLifeSpanTmrSample.stop(lrfLifeSpanTmr); - for (Lra nestedLra : children) { - nestedLra.cancel(); - } - if (lock.tryLock()) { - try { - sendCancel(); - trySendAfterLRA(); - trySendForgetLRA(); - markForDeletion(); - } finally { - lock.unlock(); - } - } - } - - void timeout() { - for (Lra nestedLra : children) { - if (nestedLra.participants.stream().anyMatch(p -> p.state().isFinal() || p.isListenerOnly())) { - nestedLra.timeout(); - } - } - cancel(); - if (lock.tryLock()) { - try { - trySendAfterLRA(); - } finally { - lock.unlock(); - } - } - } - - private boolean forgetNested() { - for (Lra nestedLra : children) { - //dont do forget not yet closed nested lra - if (nestedLra.status.get() != LRAStatus.Closed) continue; - boolean allDone = true; - for (Participant participant : nestedLra.participants) { - if (participant.getForgetURI().isEmpty() || participant.isForgotten()) continue; - allDone = participant.sendForget(nestedLra) && allDone; - } - if (!allDone) return false; - } - return true; - } - - - boolean tryAfter() { - if (lock.tryLock()) { - try { - return trySendAfterLRA(); - } finally { - lock.unlock(); - } - } - return false; - } - - boolean tryForget() { - if (lock.tryLock()) { - try { - return trySendForgetLRA(); - } finally { - lock.unlock(); - } - } - return false; - } - - private boolean trySendForgetLRA() { - boolean allFinished = true; - for (Participant participant : participants) { - if (participant.getForgetURI().isEmpty() || participant.isForgotten()) continue; - if (Set.of( - Participant.Status.FAILED_TO_COMPLETE, - Participant.Status.FAILED_TO_COMPENSATE - ).contains(participant.state())) { - allFinished = participant.sendForget(this) && allFinished; - } - } - return allFinished; - } - - AtomicReference status() { - return status; - } - - String lraContextId() { - return coordinatorURL.get().toASCIIString() + "/" + lraId; - } - - private void sendComplete() { - boolean allClosed = true; - for (Participant participant : participants) { - if (participant.isInEndStateOrListenerOnly() && !isChild) { - continue; - } - allClosed = participant.sendComplete(this) && allClosed; - } - if (allClosed) { - this.status().compareAndSet(LRAStatus.Closing, LRAStatus.Closed); - } - } - - private void sendCancel() { - boolean allDone = true; - for (Participant participant : participants) { - if (participant.isInEndStateOrListenerOnly() && !isChild) { - continue; - } - allDone = participant.sendCancel(this) && allDone; - } - if (allDone) { - this.status().compareAndSet(LRAStatus.Cancelling, LRAStatus.Cancelled); - } - } - - private boolean trySendAfterLRA() { - boolean allSent = true; - for (Participant participant : participants) { - allSent = participant.trySendAfterLRA(this) && allSent; - } - return allSent; - } - - boolean isReadyToDelete() { - return whenReadyToDelete != 0 && whenReadyToDelete < System.currentTimeMillis(); - } - - void markForDeletion() { - // delete after 10 minutes - whenReadyToDelete = (10 * 60 * 1000) + System.currentTimeMillis(); - } +/** + * Long Running Action managed by coordinator. + */ +public interface Lra { + /** + * LRA header name. + */ + HeaderName LRA_HTTP_CONTEXT_HEADER_NAME = HeaderNames.create(LRA_HTTP_CONTEXT_HEADER); + /** + * LRA ended header name. + */ + HeaderName LRA_HTTP_ENDED_CONTEXT_HEADER_NAME = HeaderNames.create(LRA_HTTP_ENDED_CONTEXT_HEADER); + /** + * LRA parent header name. + */ + HeaderName LRA_HTTP_PARENT_CONTEXT_HEADER_NAME = HeaderNames.create(LRA_HTTP_PARENT_CONTEXT_HEADER); + /** + * LRA recovery header name. + */ + HeaderName LRA_HTTP_RECOVERY_HEADER_NAME = HeaderNames.create(LRA_HTTP_RECOVERY_HEADER); + + /** + * ID of the LRA used by this coordinator. + * + * @return lraId without coordinator URI prefix + */ + String lraId(); + + /** + * LRA ID of the parent LRA if this LRA has any. + * + * @return id of parent LRA or null + */ + String parentId(); + + /** + * Returns true if this LRA has parent LRA. + * + * @return true if LRA has parent + */ + boolean isChild(); + + /** + * Returns exact time when will LRA timeout in millis. + * + * @return time of timeout in millis + */ + long timeout(); + + /** + * All participants enrolled in this LRA. + * + * @return list of participants enrolled in this LRA + */ + List participants(); + + /** + * Status of this LRA. + * + * @return status + */ + LRAStatus status(); } diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraDatabasePersistentRegistry.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraDatabasePersistentRegistry.java index c38ef83539d..b3b270a54cb 100644 --- a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraDatabasePersistentRegistry.java +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraDatabasePersistentRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,9 @@ import java.net.URI; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -35,46 +35,46 @@ class LraDatabasePersistentRegistry implements LraPersistentRegistry { - private final Map lraMap = Collections.synchronizedMap(new HashMap<>()); - private final Config config; - private final DbClient dbClient; private static final Pattern LRA_ID_PATTERN = Pattern.compile(".*/([^/?]+).*"); + private final Map lraMap = Collections.synchronizedMap(new HashMap<>()); + private final Config config; + private final Supplier dbClient; + private final Boolean persistent; LraDatabasePersistentRegistry(Config config) { this.config = config; - this.dbClient = DbClient.builder() - .config(config.get("db")) - .build(); - - DbTransaction tx = dbClient.transaction(); - tx.namedDml("create-lra-table"); - tx.namedDml("create-participant-table"); - tx.commit(); - } - - @Override - public Lra get(String lraId) { - return lraMap.get(lraId); - } - - @Override - public void put(String key, Lra lra) { - lraMap.put(key, lra); - } - - @Override - public void remove(String key) { - lraMap.remove(key); + this.persistent = config.get("persistence").asBoolean().orElse(true); + if (persistent) { + this.dbClient = () -> DbClient.builder() + .config(config.get("db")) + .build(); + DbTransaction tx = dbClient.get().transaction(); + tx.namedDml("create-lra-table"); + tx.namedDml("create-participant-table"); + tx.commit(); + } else { + this.dbClient = () -> { + throw new IllegalStateException("Persistence is not enabled"); + }; + } } - @Override - public Stream stream() { - return lraMap.values().stream(); + static String parseLRAId(String lraUri) { + Matcher m = LRA_ID_PATTERN.matcher(lraUri); + if (!m.matches()) { + //LRA uri format + throw new RuntimeException("Error when parsing lraUri: " + lraUri); + } + return m.group(1); } @Override public void load(CoordinatorService coordinatorService) { - DbExecute tx = dbClient.execute(); + if (!persistent) { + return; + } + + DbExecute tx = dbClient.get().execute(); tx.namedQuery("load").forEach(row -> { String lraId = row.column("ID").get(String.class); String parentId = row.column("PARENT_ID").get(String.class); @@ -96,11 +96,11 @@ public void load(CoordinatorService coordinatorService) { Integer remainingCloseAttempts = row.column("REMAINING_CLOSE_ATTEMPTS").get(Integer.class); Integer remainingAfterAttempts = row.column("REMAINING_AFTER_ATTEMPTS").get(Integer.class); - Lra lra = lraMap.get(lraId); + LraImpl lra = lraMap.get(lraId); if (lra == null) { - lra = new Lra(coordinatorService, lraId, - Optional.ofNullable(parentId).map(URI::create).orElse(null), - config); + lra = new LraImpl(coordinatorService, lraId, + Optional.ofNullable(parentId).map(URI::create).orElse(null), + config); lra.setTimeout(timeout); lra.setStatus(LRAStatus.valueOf(lraStatus)); lra.setChild(isChild); @@ -108,22 +108,21 @@ public void load(CoordinatorService coordinatorService) { } if (participantStatus != null) { - Participant participant = new Participant(config); - participant.setCompleteURI(Optional.ofNullable(completeLink).map(URI::create).orElse(null)); - participant.setCompensateURI(Optional.ofNullable(compensateLink).map(URI::create).orElse(null)); - participant.setAfterURI(Optional.ofNullable(afterLink).map(URI::create).orElse(null)); - participant.setForgetURI(Optional.ofNullable(forgetLink).map(URI::create).orElse(null)); - participant.setStatusURI(Optional.ofNullable(statusLink).map(URI::create).orElse(null)); - participant.setStatus(Participant.Status.valueOf(participantStatus)); - participant.setCompensateStatus(Participant.CompensateStatus.valueOf(compensateStatus)); - participant.setForgetStatus(Participant.ForgetStatus.valueOf(forgetStatus)); - participant.setAfterLraStatus(Participant.AfterLraStatus.valueOf(afterStatus)); - participant.setSendingStatus(Participant.SendingStatus.valueOf(sendingStatus)); - participant.setRemainingCloseAttempts(remainingCloseAttempts); - participant.setRemainingAfterAttempts(remainingAfterAttempts); - - List participants = lra.getParticipants(); - participants.add(participant); + ParticipantImpl participant = new ParticipantImpl(config); + participant.completeURI(Optional.ofNullable(completeLink).map(URI::create).orElse(null)); + participant.compensateURI(Optional.ofNullable(compensateLink).map(URI::create).orElse(null)); + participant.afterURI(Optional.ofNullable(afterLink).map(URI::create).orElse(null)); + participant.forgetURI(Optional.ofNullable(forgetLink).map(URI::create).orElse(null)); + participant.statusURI(Optional.ofNullable(statusLink).map(URI::create).orElse(null)); + participant.status(ParticipantImpl.Status.valueOf(participantStatus)); + participant.compensateStatus(ParticipantImpl.CompensateStatus.valueOf(compensateStatus)); + participant.forgetStatus(ParticipantImpl.ForgetStatus.valueOf(forgetStatus)); + participant.afterLraStatus(ParticipantImpl.AfterLraStatus.valueOf(afterStatus)); + participant.sendingStatus(ParticipantImpl.SendingStatus.valueOf(sendingStatus)); + participant.remainingCloseAttempts(remainingCloseAttempts); + participant.remainingAfterAttempts(remainingAfterAttempts); + + lra.addParticipant(participant); } lraMap.put(lraId, lra); }); @@ -139,44 +138,68 @@ public void load(CoordinatorService coordinatorService) { @Override public void save() { - DbTransaction tx = dbClient.transaction(); + if (!persistent) { + return; + } + + DbTransaction tx = dbClient.get().transaction(); cleanUp(tx); saveAll(tx); tx.commit(); } + @Override + public LraImpl get(String lraId) { + return lraMap.get(lraId); + } + + @Override + public void put(String key, LraImpl lra) { + lraMap.put(key, lra); + } + + @Override + public void remove(String key) { + lraMap.remove(key); + } + + @Override + public Stream stream() { + return lraMap.values().stream(); + } + private void saveAll(DbTransaction tx) { lraMap.values().forEach(lra -> insertLra(tx, lra)); } - private void insertLra(DbTransaction tx, Lra lra) { + private void insertLra(DbTransaction tx, LraImpl lra) { tx.namedInsert("insert-lra", - lra.lraId(), - lra.parentId(), - lra.getTimeout(), - lra.status().get().name(), - lra.isChild(), - lra.getWhenReadyToDelete()); + lra.lraId(), + lra.parentId(), + lra.timeout(), + lra.lraStatus().get().name(), + lra.isChild(), + lra.getWhenReadyToDelete()); // save all participants of the lra - lra.getParticipants().forEach(participant -> insertParticipant(tx, lra, participant)); + lra.participants().forEach(participant -> insertParticipant(tx, lra, (ParticipantImpl) participant)); } - private void insertParticipant(DbTransaction tx, Lra lra, Participant p) { + private void insertParticipant(DbTransaction tx, LraImpl lra, ParticipantImpl p) { tx.namedInsert("insert-participant", - lra.lraId(), - p.state().name(), - p.getCompensateStatus().name(), - p.getForgetStatus().name(), - p.getAfterLraStatus().name(), - p.getSendingStatus().name(), - p.getRemainingCloseAttempts(), - p.getRemainingAfterAttempts(), - p.getCompleteURI().map(URI::toASCIIString).orElse(null), - p.getCompensateURI().map(URI::toASCIIString).orElse(null), - p.getAfterURI().map(URI::toASCIIString).orElse(null), - p.getForgetURI().map(URI::toASCIIString).orElse(null), - p.getStatusURI().map(URI::toASCIIString).orElse(null)); + lra.lraId(), + p.state().name(), + p.compensateStatus().name(), + p.forgetStatus().name(), + p.afterLraStatus().name(), + p.sendingStatus().name(), + p.remainingCloseAttempts(), + p.remainingAfterAttempts(), + p.completeURI().map(URI::toASCIIString).orElse(null), + p.compensateURI().map(URI::toASCIIString).orElse(null), + p.afterURI().map(URI::toASCIIString).orElse(null), + p.forgetURI().map(URI::toASCIIString).orElse(null), + p.statusURI().map(URI::toASCIIString).orElse(null)); } private void cleanUp(DbTransaction tx) { @@ -184,13 +207,4 @@ private void cleanUp(DbTransaction tx) { tx.namedDelete("delete-all-participants"); } - static String parseLRAId(String lraUri) { - Matcher m = LRA_ID_PATTERN.matcher(lraUri); - if (!m.matches()) { - //LRA uri format - throw new RuntimeException("Error when parsing lraUri: " + lraUri); - } - return m.group(1); - } - } diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraImpl.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraImpl.java new file mode 100644 index 00000000000..abc47abf143 --- /dev/null +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraImpl.java @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.lra.coordinator; + +import java.lang.System.Logger.Level; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import io.helidon.common.LazyValue; +import io.helidon.config.Config; +import io.helidon.http.ClientRequestHeaders; +import io.helidon.metrics.api.Counter; +import io.helidon.metrics.api.MeterRegistry; +import io.helidon.metrics.api.Metrics; +import io.helidon.metrics.api.Timer; + +import org.eclipse.microprofile.lra.annotation.LRAStatus; + +class LraImpl implements Lra { + + private static final System.Logger LOGGER = System.getLogger(LraImpl.class.getName()); + + private final LazyValue coordinatorURL; + private final Set compensatorLinks = Collections.synchronizedSet(new HashSet<>()); + private final String lraId; + private final Config config; + private final List children = Collections.synchronizedList(new ArrayList<>()); + private final List participants = new CopyOnWriteArrayList<>(); + private final AtomicReference status = new AtomicReference<>(LRAStatus.Active); + private final Lock lock = new ReentrantLock(); + private final MeterRegistry registry = Metrics.globalRegistry(); + private final Counter lraCtr = registry.getOrCreate(Counter.builder("lractr")); + private final Timer lrfLifeSpanTmr = registry.getOrCreate(Timer.builder("lralifespantmr")); + private final Timer.Sample lraLifeSpanTmrSample = Timer.start(registry); + private long timeout; + private URI parentId; + private boolean isChild; + private long whenReadyToDelete = 0; + + LraImpl(CoordinatorService coordinatorService, String lraUUID, Config config) { + lraId = lraUUID; + this.config = config; + lraCtr.increment(); + coordinatorURL = LazyValue.create(coordinatorService.coordinatorURL()); + } + + LraImpl(CoordinatorService coordinatorService, String lraUUID, URI parentId, Config config) { + lraId = lraUUID; + this.parentId = parentId; + this.config = config; + lraCtr.increment(); + coordinatorURL = LazyValue.create(coordinatorService.coordinatorURL()); + } + + @Override + public String lraId() { + return lraId; + } + + @Override + public String parentId() { + return Optional.ofNullable(parentId).map(URI::toASCIIString).orElse(null); + } + + @Override + public boolean isChild() { + return isChild; + } + + void setChild(boolean child) { + isChild = child; + } + + @Override + public long timeout() { + return timeout; + } + + void setTimeout(long timeout) { + this.timeout = timeout; + } + + @Override + public List participants() { + return this.participants.stream().map(Participant.class::cast).toList(); + } + + @Override + public LRAStatus status() { + return lraStatus().get(); + } + + void setStatus(LRAStatus status) { + this.status.set(status); + } + + long getWhenReadyToDelete() { + return this.whenReadyToDelete; + } + + void setWhenReadyToDelete(long whenReadyToDelete) { + this.whenReadyToDelete = whenReadyToDelete; + } + + void addParticipant(ParticipantImpl participant) { + this.participants.add(participant); + } + + void setupTimeout(long timeLimit) { + if (timeLimit != 0) { + this.timeout = System.currentTimeMillis() + timeLimit; + } else { + this.timeout = 0; + } + } + + boolean checkTimeout() { + return timeout > 0 && timeout < System.currentTimeMillis(); + } + + void addParticipant(String compensatorLink) { + if (compensatorLinks.add(compensatorLink)) { + ParticipantImpl participant = new ParticipantImpl(config); + participant.parseCompensatorLinks(compensatorLink); + participants.add(participant); + } + } + + void removeParticipant(String compensatorUrl) { + Set forRemove = participants.stream() + .filter(p -> p.equalCompensatorUris(compensatorUrl)) + .collect(Collectors.toSet()); + forRemove.forEach(participants::remove); + } + + void addChild(LraImpl lra) { + children.add(lra); + lra.isChild = true; + } + + Consumer headers() { + return headers -> { + headers.add(LRA_HTTP_CONTEXT_HEADER_NAME, lraContextId()); + headers.add(LRA_HTTP_ENDED_CONTEXT_HEADER_NAME, lraContextId()); + Optional.ofNullable(parentId) + .map(URI::toASCIIString) + .ifPresent(s -> headers.add(LRA_HTTP_PARENT_CONTEXT_HEADER_NAME, s)); + headers.add(LRA_HTTP_RECOVERY_HEADER_NAME, lraContextId() + "/recovery"); + }; + } + + void close() { + Set allowedStatuses = Set.of(LRAStatus.Active, LRAStatus.Closing); + if (LRAStatus.Closing != status.updateAndGet(old -> allowedStatuses.contains(old) ? LRAStatus.Closing : old)) { + LOGGER.log(Level.WARNING, "Can't close LRA, it's already " + status.get().name() + " " + this.lraId); + return; + } + lraLifeSpanTmrSample.stop(lrfLifeSpanTmr); + if (lock.tryLock()) { + try { + sendComplete(); + // needs to go before nested close, so we know if nested was already closed + // or not(non closed nested can't get @Forget call) + forgetNested(); + for (LraImpl nestedLra : children) { + nestedLra.close(); + } + trySendAfterLRA(); + markForDeletion(); + } finally { + lock.unlock(); + } + } + } + + void cancel() { + Set allowedStatuses = Set.of(LRAStatus.Active, LRAStatus.Cancelling); + if (LRAStatus.Cancelling != status.updateAndGet(old -> allowedStatuses.contains(old) ? LRAStatus.Cancelling : old) + && !isChild) { // nested can be compensated even if closed + LOGGER.log(Level.WARNING, "Can't cancel LRA, it's already " + status.get().name() + " " + this.lraId); + return; + } + lraLifeSpanTmrSample.stop(lrfLifeSpanTmr); + for (LraImpl nestedLra : children) { + nestedLra.cancel(); + } + if (lock.tryLock()) { + try { + sendCancel(); + trySendAfterLRA(); + trySendForgetLRA(); + markForDeletion(); + } finally { + lock.unlock(); + } + } + } + + void triggerTimeout() { + for (LraImpl nestedLra : children) { + if (nestedLra.participants.stream().anyMatch(p -> p.state().isFinal() || p.isListenerOnly())) { + nestedLra.triggerTimeout(); + } + } + cancel(); + if (lock.tryLock()) { + try { + trySendAfterLRA(); + } finally { + lock.unlock(); + } + } + } + + boolean tryAfter() { + if (lock.tryLock()) { + try { + return trySendAfterLRA(); + } finally { + lock.unlock(); + } + } + return false; + } + + boolean tryForget() { + if (lock.tryLock()) { + try { + return trySendForgetLRA(); + } finally { + lock.unlock(); + } + } + return false; + } + + AtomicReference lraStatus() { + return status; + } + + String lraContextId() { + return coordinatorURL.get().toASCIIString() + "/" + lraId; + } + + boolean isReadyToDelete() { + return whenReadyToDelete != 0 && whenReadyToDelete < System.currentTimeMillis(); + } + + void markForDeletion() { + // delete after 10 minutes + whenReadyToDelete = (10 * 60 * 1000) + System.currentTimeMillis(); + } + + private boolean forgetNested() { + for (LraImpl nestedLra : children) { + //don't do forget not yet closed nested lra + if (nestedLra.status.get() != LRAStatus.Closed) { + continue; + } + boolean allDone = true; + for (ParticipantImpl participant : nestedLra.participants) { + if (participant.forgetURI().isEmpty() || participant.isForgotten()) { + continue; + } + allDone = participant.sendForget(nestedLra) && allDone; + } + if (!allDone) { + return false; + } + } + return true; + } + + private boolean trySendForgetLRA() { + boolean allFinished = true; + for (ParticipantImpl participant : participants) { + if (participant.forgetURI().isEmpty() || participant.isForgotten()) { + continue; + } + if (Set.of( + ParticipantImpl.Status.FAILED_TO_COMPLETE, + ParticipantImpl.Status.FAILED_TO_COMPENSATE + ).contains(participant.state())) { + allFinished = participant.sendForget(this) && allFinished; + } + } + return allFinished; + } + + private void sendComplete() { + boolean allClosed = true; + for (ParticipantImpl participant : participants) { + if (participant.isInEndStateOrListenerOnly() && !isChild) { + continue; + } + allClosed = participant.sendComplete(this) && allClosed; + } + if (allClosed) { + this.lraStatus().compareAndSet(LRAStatus.Closing, LRAStatus.Closed); + } + } + + private void sendCancel() { + boolean allDone = true; + for (ParticipantImpl participant : participants) { + if (participant.isInEndStateOrListenerOnly() && !isChild) { + continue; + } + allDone = participant.sendCancel(this) && allDone; + } + if (allDone) { + this.lraStatus().compareAndSet(LRAStatus.Cancelling, LRAStatus.Cancelled); + } + } + + private boolean trySendAfterLRA() { + boolean allSent = true; + for (ParticipantImpl participant : participants) { + allSent = participant.trySendAfterLRA(this) && allSent; + } + return allSent; + } +} diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraPersistentRegistry.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraPersistentRegistry.java index d0aae2c8a2e..1d765a5f561 100644 --- a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraPersistentRegistry.java +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/LraPersistentRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,7 @@ interface LraPersistentRegistry { * @param lraId to look for * @return lra if exist */ - Lra get(String lraId); + LraImpl get(String lraId); /** * Add new Lra. @@ -47,7 +47,7 @@ interface LraPersistentRegistry { * @param lraId id of new lra * @param lra Lra */ - void put(String lraId, Lra lra); + void put(String lraId, LraImpl lra); /** * Remove lra by id. @@ -61,6 +61,6 @@ interface LraPersistentRegistry { * * @return stream of all the Lras */ - Stream stream(); + Stream stream(); } diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Participant.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Participant.java index 8ce4f5a3309..2514923fc17 100644 --- a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Participant.java +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/Participant.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,577 +13,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.helidon.lra.coordinator; -import java.lang.System.Logger.Level; import java.net.URI; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import io.helidon.config.Config; -import io.helidon.webclient.api.HttpClientResponse; -import io.helidon.webclient.api.WebClient; - -import org.eclipse.microprofile.lra.annotation.LRAStatus; -import org.eclipse.microprofile.lra.annotation.ParticipantStatus; - -import static io.helidon.lra.coordinator.Lra.LRA_HTTP_CONTEXT_HEADER_NAME; -import static io.helidon.lra.coordinator.Lra.LRA_HTTP_ENDED_CONTEXT_HEADER_NAME; -import static io.helidon.lra.coordinator.Lra.LRA_HTTP_RECOVERY_HEADER_NAME; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Active; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Compensated; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Compensating; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Completed; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Completing; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.FailedToCompensate; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.FailedToComplete; -import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.valueOf; - -class Participant { - - private static final int RETRY_CNT = 60; - private static final int SYNCHRONOUS_RETRY_CNT = 5; - - private static final System.Logger LOGGER = System.getLogger(Participant.class.getName()); - private final AtomicReference compensateCalled = new AtomicReference<>(CompensateStatus.NOT_SENT); - private final AtomicReference forgetCalled = new AtomicReference<>(ForgetStatus.NOT_SENT); - private final AtomicReference afterLRACalled = new AtomicReference<>(AfterLraStatus.NOT_SENT); - private final AtomicReference sendingStatus = new AtomicReference<>(SendingStatus.NOT_SENDING); - private final AtomicInteger remainingCloseAttempts = new AtomicInteger(RETRY_CNT); - private final AtomicInteger remainingAfterLraAttempts = new AtomicInteger(RETRY_CNT); - - private final AtomicReference status = new AtomicReference<>(Status.ACTIVE); - private final Map compensatorLinks = new HashMap<>(); - private final long timeout; - private final WebClient webClient = WebClient.builder().build(); - - enum Status { - ACTIVE(Active, null, null, false, Set.of(Completing, Compensating)), - - COMPENSATED(Compensated, null, null, true, Set.of(Compensated)), - COMPLETED(Completed, null, null, true, Set.of(Completed)), - FAILED_TO_COMPENSATE(FailedToCompensate, null, null, true, Set.of()), - FAILED_TO_COMPLETE(FailedToComplete, null, null, true, Set.of()), - - CLIENT_COMPENSATING(Compensating, COMPENSATED, FAILED_TO_COMPENSATE, false, - Set.of(Active, Compensated, FailedToCompensate)), - CLIENT_COMPLETING(Completing, COMPLETED, FAILED_TO_COMPLETE, false, Set.of(Active, Completed, FailedToComplete)), - COMPENSATING(Compensating, COMPENSATED, FAILED_TO_COMPENSATE, false, Set.of(Active, Compensated, FailedToCompensate)), - COMPLETING(Completing, COMPLETED, FAILED_TO_COMPLETE, false, Set.of(Active, Completed, FailedToComplete)); - - private final ParticipantStatus participantStatus; - private final Status successFinalStatus; - private final Status failedFinalStatus; - private final boolean finalState; - private final Set validNextStates; - - Status(ParticipantStatus participantStatus, - Status successFinalStatus, - Status failedFinalStatus, - boolean finalState, - Set validNextStates) { - this.participantStatus = participantStatus; - this.successFinalStatus = successFinalStatus; - this.failedFinalStatus = failedFinalStatus; - this.finalState = finalState; - this.validNextStates = validNextStates; - } - - ParticipantStatus participantStatus() { - return participantStatus; - } - - boolean isFinal() { - return finalState; - } - - boolean validateNextStatus(ParticipantStatus participantStatus) { - return validNextStates.contains(participantStatus); - } - - Optional successFinalStatus() { - return Optional.ofNullable(successFinalStatus.participantStatus()); - } - - Optional failedFinalStatus() { - return Optional.ofNullable(failedFinalStatus.participantStatus); - } - } - enum SendingStatus { - SENDING, NOT_SENDING; - } - - enum AfterLraStatus { - NOT_SENT, SENDING, SENT; - } - - enum ForgetStatus { - NOT_SENT, SENDING, SENT; - } - - enum CompensateStatus { - NOT_SENT, SENDING, SENT; - } - - Participant(Config config) { - timeout = config.get("timeout") - .asLong() - .orElse(500L); - } - - void parseCompensatorLinks(String compensatorLinks) { - Stream.of(compensatorLinks.split(",")) - .filter(s -> !s.isBlank()) - .map(Link::valueOf) - .forEach(link -> this.compensatorLinks.put(link.rel(), link.uri())); - } - - Optional getCompensatorLink(String rel) { - return Optional.ofNullable(compensatorLinks.get(rel)); - } +/** + * LRA participant managed by coordinator. + */ +public interface Participant { /** * Invoked when closed 200, 202, 409, 410. + * + * @return optional uri */ - Optional getCompleteURI() { - return getCompensatorLink("complete"); - } - - void setCompleteURI(URI completeURI) { - compensatorLinks.put("complete", completeURI); - } + Optional completeURI(); /** * Invoked when cancelled 200, 202, 409, 410. + * + * @return optional uri */ - Optional getCompensateURI() { - return getCompensatorLink("compensate"); - } - - void setCompensateURI(URI compensateURI) { - compensatorLinks.put("compensate", compensateURI); - } + Optional compensateURI(); /** * Invoked when finalized 200. + * + * @return optional uri */ - Optional getAfterURI() { - return getCompensatorLink("after"); - } - - void setAfterURI(URI afterURI) { - compensatorLinks.put("after", afterURI); - } + Optional afterURI(); /** * Invoked when cleaning up 200, 410. + * + * @return optional uri */ - Optional getForgetURI() { - return getCompensatorLink("forget"); - } - - void setForgetURI(URI forgetURI) { - compensatorLinks.put("forget", forgetURI); - } + Optional forgetURI(); /** * Directly updates status of participant 200, 202, 410. + * + * @return optional uri */ - Optional getStatusURI() { - return getCompensatorLink("status"); - } - - void setStatusURI(URI statusURI) { - compensatorLinks.put("status", statusURI); - } - - CompensateStatus getCompensateStatus() { - return this.compensateCalled.get(); - } - - void setCompensateStatus(CompensateStatus compensateStatus) { - this.compensateCalled.set(compensateStatus); - } - - void setStatus(Status status) { - this.status.set(status); - } - - ForgetStatus getForgetStatus() { - return this.forgetCalled.get(); - } - - void setForgetStatus(ForgetStatus forgetStatus) { - this.forgetCalled.set(forgetStatus); - } - - AfterLraStatus getAfterLraStatus() { - return this.afterLRACalled.get(); - } - - void setAfterLraStatus(AfterLraStatus afterLraStatus) { - this.afterLRACalled.set(afterLraStatus); - } - - SendingStatus getSendingStatus() { - return this.sendingStatus.get(); - } - - void setSendingStatus(SendingStatus sendingStatus) { - this.sendingStatus.set(sendingStatus); - } - - int getRemainingCloseAttempts() { - return this.remainingCloseAttempts.get(); - } - - void setRemainingCloseAttempts(int remainingCloseAttempts) { - this.remainingCloseAttempts.set(remainingCloseAttempts); - } - - int getRemainingAfterAttempts() { - return this.remainingAfterLraAttempts.get(); - } - - void setRemainingAfterAttempts(int remainingAfterAttempts) { - this.remainingAfterLraAttempts.set(remainingAfterAttempts); - } - - Status state() { - return status.get(); - } - - boolean isForgotten() { - return forgetCalled.get() == ForgetStatus.SENT; - } - - boolean isListenerOnly() { - return getCompleteURI().isEmpty() && getCompensateURI().isEmpty(); - } - - boolean isInEndStateOrListenerOnly() { - return isListenerOnly() || status.get().isFinal(); - } - - boolean sendCancel(Lra lra) { - Optional endpointURI = getCompensateURI(); - for (AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < SYNCHRONOUS_RETRY_CNT;) { - if (!sendingStatus.compareAndSet(SendingStatus.NOT_SENDING, SendingStatus.SENDING)) return false; - if (!compensateCalled.compareAndSet(CompensateStatus.NOT_SENT, CompensateStatus.SENDING)) return false; - LOGGER.log(Level.DEBUG, () -> "Sending compensate, sync retry: " + i.get() - + ", status: " + status.get().name() - + " statusUri: " + getStatusURI().map(URI::toASCIIString).orElse(null)); - HttpClientResponse response = null; - try { - // call for client status only on retries and when status uri is known - if (!status.get().equals(Status.ACTIVE) && getStatusURI().isPresent()) { - // If the participant does not support idempotency then it MUST be able to report its status - // by annotating one of the methods with the @Status annotation which should report the status - // in case we can't retrieve status from participant just retry n times - ParticipantStatus reportedClientStatus = retrieveStatus(lra, Compensating).orElse(null); - if (reportedClientStatus == Compensated) { - LOGGER.log(Level.INFO, "Participant reports it is compensated."); - status.set(Status.COMPENSATED); - return true; - } else if (reportedClientStatus == FailedToCompensate) { - LOGGER.log(Level.INFO, "Participant reports it failed to compensate."); - status.set(Status.FAILED_TO_COMPENSATE); - return true; - } else if (reportedClientStatus == Active) { - // last call didn't reach participant, try call again - } else if (reportedClientStatus == Completed && lra.isChild()) { - // completed participant can be compensated again in case of nested tx - } else if (reportedClientStatus == Compensating) { - LOGGER.log(Level.INFO, "Participant reports it is still compensating."); - status.set(Status.CLIENT_COMPENSATING); - return false; - } else if (remainingCloseAttempts.decrementAndGet() <= 0) { - LOGGER.log(Level.INFO, "Participant didnt report final status after {0} status call retries.", - new Object[] {RETRY_CNT}); - status.set(Status.FAILED_TO_COMPENSATE); - return true; - } else { - // Unknown status, lets try in next recovery cycle - LOGGER.log(Level.INFO, "Unknown status of " + lra.lraId()); - return false; - } - } - - response = webClient.put() - .uri(endpointURI.get()) - .headers(lra.headers()) - .submit(LRAStatus.Cancelled.name()); - - // When timeout occur we loose track of the participant status - // next retry will attempt to retrieve participant status if status uri is available - - switch (response.status().code()) { - // complete or compensated - case 200: - case 410: - LOGGER.log(Level.INFO, "Compensated participant of LRA {0} {1}", - new Object[] {lra.lraId(), this.getCompensateURI()}); - status.set(Status.COMPENSATED); - compensateCalled.set(CompensateStatus.SENT); - return true; - - // retryable - case 202: - // Still compensating, check with @Status later - this.status.set(Status.CLIENT_COMPENSATING); - return false; - case 409: - case 404: - case 503: - default: - throw new Exception(response.status().code() + " " + response.status().reasonPhrase()); - } - - } catch (Exception e) { - LOGGER.log(Level.WARNING, - () -> "Can't reach participant's compensate endpoint: " - + endpointURI.map(URI::toASCIIString).orElse("unknown"), e); - if (remainingCloseAttempts.decrementAndGet() <= 0) { - LOGGER.log(Level.WARNING, "Failed to compensate participant of LRA {0} {1} {2}", - new Object[] {lra.lraId(), this.getCompensateURI(), e.getMessage()}); - status.set(Status.FAILED_TO_COMPENSATE); - } else { - status.set(Status.COMPENSATING); - } - - } finally { - Optional.ofNullable(response).ifPresent(HttpClientResponse::close); - sendingStatus.set(SendingStatus.NOT_SENDING); - compensateCalled.compareAndSet(CompensateStatus.SENDING, CompensateStatus.NOT_SENT); - } - } - return false; - } - - boolean sendComplete(Lra lra) { - Optional endpointURI = getCompleteURI(); - for (AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < SYNCHRONOUS_RETRY_CNT;) { - if (!sendingStatus.compareAndSet(SendingStatus.NOT_SENDING, SendingStatus.SENDING)) return false; - LOGGER.log(Level.DEBUG, () -> "Sending complete, sync retry: " + i.get() - + ", status: " + status.get().name() - + " statusUri: " + getStatusURI().map(URI::toASCIIString).orElse(null)); - HttpClientResponse response = null; - try { - if (status.get().isFinal()) { - return true; - // call for client status only on retries and when status uri is known - } else if (!status.get().equals(Status.ACTIVE) && getStatusURI().isPresent()) { - // If the participant does not support idempotency then it MUST be able to report its status - // by annotating one of the methods with the @Status annotation which should report the status - // in case we can't retrieve status from participant just retry n times - ParticipantStatus reportedClientStatus = retrieveStatus(lra, Completing).orElse(null); - if (reportedClientStatus == Completed) { - LOGGER.log(Level.INFO, "Participant reports it is completed."); - status.set(Status.COMPLETED); - return true; - } else if (reportedClientStatus == FailedToComplete) { - LOGGER.log(Level.INFO, "Participant reports it failed to complete."); - status.set(Status.FAILED_TO_COMPLETE); - return true; - } else if (reportedClientStatus == Active) { - // last call didn't reach participant, try call again - } else if (reportedClientStatus == Completing) { - LOGGER.log(Level.INFO, "Participant reports it is still completing."); - status.set(Status.CLIENT_COMPLETING); - return false; - } else if (remainingCloseAttempts.decrementAndGet() <= 0) { - LOGGER.log(Level.INFO, "Participant didnt report final status after {0} status call retries.", - new Object[] {RETRY_CNT}); - status.set(Status.FAILED_TO_COMPLETE); - return true; - } else { - // Unknown status, lets try in next recovery cycle - return false; - } - } - response = webClient.put() - .uri(endpointURI.get()) - .headers(lra.headers()) - .submit(LRAStatus.Closed.name()); - // When timeout occur we loose track of the participant status - // next retry will attempt to retrieve participant status if status uri is available - - switch (response.status().code()) { - // complete or compensated - case 200: - case 410: - status.set(Status.COMPLETED); - return true; - - // retryable - case 202: - // Still completing, check with @Status later - this.status.set(Status.CLIENT_COMPLETING); - return false; - case 409: - case 404: - case 503: - default: - throw new Exception(response.status().code() + " " + response.status().reasonPhrase()); - } - - } catch (Exception e) { - LOGGER.log(Level.WARNING, - () -> "Can't reach participant's complete endpoint: " + endpointURI.map(URI::toASCIIString) - .orElse("unknown"), e); - if (remainingCloseAttempts.decrementAndGet() <= 0) { - LOGGER.log(Level.WARNING, "Failed to complete participant of LRA {0} {1} {2}", - new Object[]{lra.lraId(), - this.getCompleteURI(), e.getMessage()}); - status.set(Status.FAILED_TO_COMPLETE); - } else { - status.set(Status.COMPLETING); - } - } finally { - Optional.ofNullable(response).ifPresent(HttpClientResponse::close); - sendingStatus.set(SendingStatus.NOT_SENDING); - } - } - return false; - } - - boolean trySendAfterLRA(Lra lra) { - for (int i = 0; i < SYNCHRONOUS_RETRY_CNT; i++) { - // Participant in right state - if (!isInEndStateOrListenerOnly()) return false; - // LRA in right state - if (!(Set.of(LRAStatus.Closed, LRAStatus.Cancelled).contains(lra.status().get()))) return false; - - HttpClientResponse response = null; - try { - Optional afterURI = getAfterURI(); - if (afterURI.isPresent() && afterLRACalled.compareAndSet(AfterLraStatus.NOT_SENT, AfterLraStatus.SENDING)) { - response = webClient.put() - .uri(afterURI.get()) - .headers(lra.headers()) - .submit(lra.status().get().name()); - - if (response.status().code() == 200) { - afterLRACalled.set(AfterLraStatus.SENT); - } else if (remainingAfterLraAttempts.decrementAndGet() <= 0) { - afterLRACalled.set(AfterLraStatus.SENT); - } - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Error when sending after lra", e); - if (remainingAfterLraAttempts.decrementAndGet() <= 0) { - afterLRACalled.set(AfterLraStatus.SENT); - } else { - afterLRACalled.set(AfterLraStatus.NOT_SENT); - } - } finally { - Optional.ofNullable(response).ifPresent(HttpClientResponse::close); - } - if (afterLRACalled.get() == AfterLraStatus.SENT) return true; - } - return false; - } - - - Optional retrieveStatus(Lra lra, ParticipantStatus inProgressStatus) { - URI statusURI = this.getStatusURI().get(); - try (HttpClientResponse response = webClient.get() - .uri(statusURI) - .headers(h -> { - // Dont send parent! - h.add(LRA_HTTP_CONTEXT_HEADER_NAME, lra.lraContextId()); - h.add(LRA_HTTP_RECOVERY_HEADER_NAME, lra.lraContextId() + "/recovery"); - h.add(LRA_HTTP_ENDED_CONTEXT_HEADER_NAME, lra.lraContextId()); - }) - .request()) { - - int code = response.status().code(); - switch (code) { - case 202: - return Optional.of(inProgressStatus); - case 410: //GONE - //Completing -> FailedToComplete ... - return status.get().failedFinalStatus(); - case 503: - case 500: - throw new IllegalStateException(String.format("Client reports unexpected status %s %s, " - + "current participant state is %s, " - + "lra: %s " - + "status uri: %s", - code, - response.as(String.class), - status.get(), - lra.lraId(), - statusURI.toASCIIString())); - default: - ParticipantStatus reportedStatus = valueOf(response.as(String.class)); - Status currentStatus = status.get(); - if (currentStatus.validateNextStatus(reportedStatus)) { - return Optional.of(reportedStatus); - } else { - LOGGER.log(Level.WARNING, - "Client reports unexpected status {0} {1}, " - + "current participant state is {2}, " - + "lra: {3} " - + "status uri: {4}", - new Object[] {code, reportedStatus, currentStatus, lra.lraId(), statusURI.toASCIIString()}); - return Optional.empty(); - } - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Error when getting participant status. " + statusURI, e); - // skip dependent compensation call, another retry with status call might be luckier - throw e; - } - } - - boolean sendForget(Lra lra) { - if (!forgetCalled.compareAndSet(ForgetStatus.NOT_SENT, ForgetStatus.SENDING)) return false; - try ( - HttpClientResponse response = webClient.delete() - .uri(getForgetURI().get()) - .headers(lra.headers()) - .request()) { - - int responseStatus = response.status().code(); - if (responseStatus == 200 || responseStatus == 410) { - forgetCalled.set(ForgetStatus.SENT); - } else { - throw new Exception("Unexpected response from participant " + response.status().code()); - } - } catch (Throwable e) { - LOGGER.log(Level.WARNING, "Unable to send forget of lra {0} to {1}", - new Object[] {lra.lraId(), getForgetURI().get()}); - forgetCalled.set(ForgetStatus.NOT_SENT); - } - return forgetCalled.get() == ForgetStatus.SENT; - } - - boolean equalCompensatorUris(String compensatorUris) { - Set links = Arrays.stream(compensatorUris.split(",")) - .map(Link::valueOf) - .collect(Collectors.toSet()); - - for (Link link : links) { - Optional participantsLink = getCompensatorLink(link.rel()); - if (participantsLink.isEmpty()) { - continue; - } - - if (Objects.equals(participantsLink.get(), link.uri())) { - return true; - } - } - - return false; - } + Optional statusURI(); } diff --git a/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/ParticipantImpl.java b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/ParticipantImpl.java new file mode 100644 index 00000000000..abc405007a5 --- /dev/null +++ b/lra/coordinator/server/src/main/java/io/helidon/lra/coordinator/ParticipantImpl.java @@ -0,0 +1,592 @@ +/* + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.lra.coordinator; + +import java.lang.System.Logger.Level; +import java.net.URI; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.helidon.config.Config; +import io.helidon.webclient.api.HttpClientResponse; +import io.helidon.webclient.api.WebClient; + +import org.eclipse.microprofile.lra.annotation.LRAStatus; +import org.eclipse.microprofile.lra.annotation.ParticipantStatus; + +import static io.helidon.lra.coordinator.LraImpl.LRA_HTTP_CONTEXT_HEADER_NAME; +import static io.helidon.lra.coordinator.LraImpl.LRA_HTTP_ENDED_CONTEXT_HEADER_NAME; +import static io.helidon.lra.coordinator.LraImpl.LRA_HTTP_RECOVERY_HEADER_NAME; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Active; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Compensated; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Compensating; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Completed; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.Completing; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.FailedToCompensate; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.FailedToComplete; +import static org.eclipse.microprofile.lra.annotation.ParticipantStatus.valueOf; + +class ParticipantImpl implements Participant { + + private static final int RETRY_CNT = 60; + private static final int SYNCHRONOUS_RETRY_CNT = 5; + + private static final System.Logger LOGGER = System.getLogger(Participant.class.getName()); + private final AtomicReference compensateCalled = new AtomicReference<>(CompensateStatus.NOT_SENT); + private final AtomicReference forgetCalled = new AtomicReference<>(ForgetStatus.NOT_SENT); + private final AtomicReference afterLRACalled = new AtomicReference<>(AfterLraStatus.NOT_SENT); + private final AtomicReference sendingStatus = new AtomicReference<>(SendingStatus.NOT_SENDING); + private final AtomicInteger remainingCloseAttempts = new AtomicInteger(RETRY_CNT); + private final AtomicInteger remainingAfterLraAttempts = new AtomicInteger(RETRY_CNT); + + private final AtomicReference status = new AtomicReference<>(Status.ACTIVE); + private final Map compensatorLinks = new HashMap<>(); + private final long timeout; + private final WebClient webClient = WebClient.builder().build(); + + ParticipantImpl(Config config) { + timeout = config.get("timeout") + .asLong() + .orElse(500L); + } + + @Override + public Optional completeURI() { + return compensatorLink("complete"); + } + + @Override + public Optional compensateURI() { + return compensatorLink("compensate"); + } + + @Override + public Optional afterURI() { + return compensatorLink("after"); + } + + @Override + public Optional forgetURI() { + return compensatorLink("forget"); + } + + @Override + public Optional statusURI() { + return compensatorLink("status"); + } + + void parseCompensatorLinks(String compensatorLinks) { + Stream.of(compensatorLinks.split(",")) + .filter(s -> !s.isBlank()) + .map(Link::valueOf) + .forEach(link -> this.compensatorLinks.put(link.rel(), link.uri())); + } + + Optional compensatorLink(String rel) { + return Optional.ofNullable(compensatorLinks.get(rel)); + } + + void completeURI(URI completeURI) { + compensatorLinks.put("complete", completeURI); + } + + void compensateURI(URI compensateURI) { + compensatorLinks.put("compensate", compensateURI); + } + + void afterURI(URI afterURI) { + compensatorLinks.put("after", afterURI); + } + + void forgetURI(URI forgetURI) { + compensatorLinks.put("forget", forgetURI); + } + + void statusURI(URI statusURI) { + compensatorLinks.put("status", statusURI); + } + + CompensateStatus compensateStatus() { + return this.compensateCalled.get(); + } + + void compensateStatus(CompensateStatus compensateStatus) { + this.compensateCalled.set(compensateStatus); + } + + void status(Status status) { + this.status.set(status); + } + + ForgetStatus forgetStatus() { + return this.forgetCalled.get(); + } + + void forgetStatus(ForgetStatus forgetStatus) { + this.forgetCalled.set(forgetStatus); + } + + AfterLraStatus afterLraStatus() { + return this.afterLRACalled.get(); + } + + void afterLraStatus(AfterLraStatus afterLraStatus) { + this.afterLRACalled.set(afterLraStatus); + } + + SendingStatus sendingStatus() { + return this.sendingStatus.get(); + } + + void sendingStatus(SendingStatus sendingStatus) { + this.sendingStatus.set(sendingStatus); + } + + int remainingCloseAttempts() { + return this.remainingCloseAttempts.get(); + } + + void remainingCloseAttempts(int remainingCloseAttempts) { + this.remainingCloseAttempts.set(remainingCloseAttempts); + } + + int remainingAfterAttempts() { + return this.remainingAfterLraAttempts.get(); + } + + void remainingAfterAttempts(int remainingAfterAttempts) { + this.remainingAfterLraAttempts.set(remainingAfterAttempts); + } + + Status state() { + return status.get(); + } + + boolean isForgotten() { + return forgetCalled.get() == ForgetStatus.SENT; + } + + boolean isListenerOnly() { + return completeURI().isEmpty() && compensateURI().isEmpty(); + } + + boolean isInEndStateOrListenerOnly() { + return isListenerOnly() || status.get().isFinal(); + } + + boolean sendCancel(LraImpl lra) { + Optional endpointURI = compensateURI(); + for (AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < SYNCHRONOUS_RETRY_CNT;) { + if (!sendingStatus.compareAndSet(SendingStatus.NOT_SENDING, SendingStatus.SENDING)) { + return false; + } + if (!compensateCalled.compareAndSet(CompensateStatus.NOT_SENT, CompensateStatus.SENDING)) { + return false; + } + LOGGER.log(Level.DEBUG, () -> "Sending compensate, sync retry: " + i.get() + + ", status: " + status.get().name() + + " statusUri: " + statusURI().map(URI::toASCIIString).orElse(null)); + HttpClientResponse response = null; + try { + // call for client status only on retries and when status uri is known + if (!status.get().equals(Status.ACTIVE) && statusURI().isPresent()) { + // If the participant does not support idempotency then it MUST be able to report its status + // by annotating one of the methods with the @Status annotation which should report the status + // in case we can't retrieve status from participant just retry n times + ParticipantStatus reportedClientStatus = retrieveStatus(lra, Compensating).orElse(null); + if (reportedClientStatus == Compensated) { + LOGGER.log(Level.INFO, "Participant reports it is compensated."); + status.set(Status.COMPENSATED); + return true; + } else if (reportedClientStatus == FailedToCompensate) { + LOGGER.log(Level.INFO, "Participant reports it failed to compensate."); + status.set(Status.FAILED_TO_COMPENSATE); + return true; + } else if (reportedClientStatus == Active) { + // last call didn't reach participant, try call again + } else if (reportedClientStatus == Completed && lra.isChild()) { + // completed participant can be compensated again in case of nested tx + } else if (reportedClientStatus == Compensating) { + LOGGER.log(Level.INFO, "Participant reports it is still compensating."); + status.set(Status.CLIENT_COMPENSATING); + return false; + } else if (remainingCloseAttempts.decrementAndGet() <= 0) { + LOGGER.log(Level.INFO, "Participant didnt report final status after {0} status call retries.", + new Object[] {RETRY_CNT}); + status.set(Status.FAILED_TO_COMPENSATE); + return true; + } else { + // Unknown status, lets try in next recovery cycle + LOGGER.log(Level.INFO, "Unknown status of " + lra.lraId()); + return false; + } + } + + response = webClient.put() + .uri(endpointURI.get()) + .headers(lra.headers()) + .submit(LRAStatus.Cancelled.name()); + + // When timeout occur we loose track of the participant status + // next retry will attempt to retrieve participant status if status uri is available + + switch (response.status().code()) { + // complete or compensated + case 200: + case 410: + LOGGER.log(Level.INFO, "Compensated participant of LRA {0} {1}", + new Object[] {lra.lraId(), this.compensateURI()}); + status.set(Status.COMPENSATED); + compensateCalled.set(CompensateStatus.SENT); + return true; + + // retryable + case 202: + // Still compensating, check with @Status later + this.status.set(Status.CLIENT_COMPENSATING); + return false; + case 409: + case 404: + case 503: + default: + throw new Exception(response.status().code() + " " + response.status().reasonPhrase()); + } + + } catch (Exception e) { + LOGGER.log(Level.WARNING, + () -> "Can't reach participant's compensate endpoint: " + + endpointURI.map(URI::toASCIIString).orElse("unknown"), e); + if (remainingCloseAttempts.decrementAndGet() <= 0) { + LOGGER.log(Level.WARNING, "Failed to compensate participant of LRA {0} {1} {2}", + new Object[] {lra.lraId(), this.compensateURI(), e.getMessage()}); + status.set(Status.FAILED_TO_COMPENSATE); + } else { + status.set(Status.COMPENSATING); + } + + } finally { + Optional.ofNullable(response).ifPresent(HttpClientResponse::close); + sendingStatus.set(SendingStatus.NOT_SENDING); + compensateCalled.compareAndSet(CompensateStatus.SENDING, CompensateStatus.NOT_SENT); + } + } + return false; + } + + boolean sendComplete(LraImpl lra) { + Optional endpointURI = completeURI(); + for (AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < SYNCHRONOUS_RETRY_CNT;) { + if (!sendingStatus.compareAndSet(SendingStatus.NOT_SENDING, SendingStatus.SENDING)) { + return false; + } + LOGGER.log(Level.DEBUG, () -> "Sending complete, sync retry: " + i.get() + + ", status: " + status.get().name() + + " statusUri: " + statusURI().map(URI::toASCIIString).orElse(null)); + HttpClientResponse response = null; + try { + if (status.get().isFinal()) { + return true; + // call for client status only on retries and when status uri is known + } else if (!status.get().equals(Status.ACTIVE) && statusURI().isPresent()) { + // If the participant does not support idempotency then it MUST be able to report its status + // by annotating one of the methods with the @Status annotation which should report the status + // in case we can't retrieve status from participant just retry n times + ParticipantStatus reportedClientStatus = retrieveStatus(lra, Completing).orElse(null); + if (reportedClientStatus == Completed) { + LOGGER.log(Level.INFO, "Participant reports it is completed."); + status.set(Status.COMPLETED); + return true; + } else if (reportedClientStatus == FailedToComplete) { + LOGGER.log(Level.INFO, "Participant reports it failed to complete."); + status.set(Status.FAILED_TO_COMPLETE); + return true; + } else if (reportedClientStatus == Active) { + // last call didn't reach participant, try call again + } else if (reportedClientStatus == Completing) { + LOGGER.log(Level.INFO, "Participant reports it is still completing."); + status.set(Status.CLIENT_COMPLETING); + return false; + } else if (remainingCloseAttempts.decrementAndGet() <= 0) { + LOGGER.log(Level.INFO, "Participant didnt report final status after {0} status call retries.", + new Object[] {RETRY_CNT}); + status.set(Status.FAILED_TO_COMPLETE); + return true; + } else { + // Unknown status, lets try in next recovery cycle + return false; + } + } + response = webClient.put() + .uri(endpointURI.get()) + .headers(lra.headers()) + .submit(LRAStatus.Closed.name()); + // When timeout occur we loose track of the participant status + // next retry will attempt to retrieve participant status if status uri is available + + switch (response.status().code()) { + // complete or compensated + case 200: + case 410: + status.set(Status.COMPLETED); + return true; + + // retryable + case 202: + // Still completing, check with @Status later + this.status.set(Status.CLIENT_COMPLETING); + return false; + case 409: + case 404: + case 503: + default: + throw new Exception(response.status().code() + " " + response.status().reasonPhrase()); + } + + } catch (Exception e) { + LOGGER.log(Level.WARNING, + () -> "Can't reach participant's complete endpoint: " + endpointURI.map(URI::toASCIIString) + .orElse("unknown"), e); + if (remainingCloseAttempts.decrementAndGet() <= 0) { + LOGGER.log(Level.WARNING, "Failed to complete participant of LRA {0} {1} {2}", + new Object[] {lra.lraId(), + this.completeURI(), e.getMessage()}); + status.set(Status.FAILED_TO_COMPLETE); + } else { + status.set(Status.COMPLETING); + } + } finally { + Optional.ofNullable(response).ifPresent(HttpClientResponse::close); + sendingStatus.set(SendingStatus.NOT_SENDING); + } + } + return false; + } + + boolean trySendAfterLRA(LraImpl lra) { + for (int i = 0; i < SYNCHRONOUS_RETRY_CNT; i++) { + // Participant in right state + if (!isInEndStateOrListenerOnly()) { + return false; + } + // LRA in right state + if (!(Set.of(LRAStatus.Closed, LRAStatus.Cancelled).contains(lra.lraStatus().get()))) { + return false; + } + + HttpClientResponse response = null; + try { + Optional afterURI = afterURI(); + if (afterURI.isPresent() && afterLRACalled.compareAndSet(AfterLraStatus.NOT_SENT, AfterLraStatus.SENDING)) { + response = webClient.put() + .uri(afterURI.get()) + .headers(lra.headers()) + .submit(lra.lraStatus().get().name()); + + if (response.status().code() == 200) { + afterLRACalled.set(AfterLraStatus.SENT); + } else if (remainingAfterLraAttempts.decrementAndGet() <= 0) { + afterLRACalled.set(AfterLraStatus.SENT); + } + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Error when sending after lra", e); + if (remainingAfterLraAttempts.decrementAndGet() <= 0) { + afterLRACalled.set(AfterLraStatus.SENT); + } else { + afterLRACalled.set(AfterLraStatus.NOT_SENT); + } + } finally { + Optional.ofNullable(response).ifPresent(HttpClientResponse::close); + } + if (afterLRACalled.get() == AfterLraStatus.SENT) { + return true; + } + } + return false; + } + + Optional retrieveStatus(LraImpl lra, ParticipantStatus inProgressStatus) { + URI statusURI = this.statusURI().get(); + try (HttpClientResponse response = webClient.get() + .uri(statusURI) + .headers(h -> { + // Dont send parent! + h.add(LRA_HTTP_CONTEXT_HEADER_NAME, lra.lraContextId()); + h.add(LRA_HTTP_RECOVERY_HEADER_NAME, lra.lraContextId() + "/recovery"); + h.add(LRA_HTTP_ENDED_CONTEXT_HEADER_NAME, lra.lraContextId()); + }) + .request()) { + + int code = response.status().code(); + switch (code) { + case 202: + return Optional.of(inProgressStatus); + case 410: //GONE + //Completing -> FailedToComplete ... + return status.get().failedFinalStatus(); + case 503: + case 500: + throw new IllegalStateException(String.format("Client reports unexpected status %s %s, " + + "current participant state is %s, " + + "lra: %s " + + "status uri: %s", + code, + response.as(String.class), + status.get(), + lra.lraId(), + statusURI.toASCIIString())); + default: + ParticipantStatus reportedStatus = valueOf(response.as(String.class)); + Status currentStatus = status.get(); + if (currentStatus.validateNextStatus(reportedStatus)) { + return Optional.of(reportedStatus); + } else { + LOGGER.log(Level.WARNING, + "Client reports unexpected status {0} {1}, " + + "current participant state is {2}, " + + "lra: {3} " + + "status uri: {4}", + new Object[] {code, reportedStatus, currentStatus, lra.lraId(), statusURI.toASCIIString()}); + return Optional.empty(); + } + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Error when getting participant status. " + statusURI, e); + // skip dependent compensation call, another retry with status call might be luckier + throw e; + } + } + + boolean sendForget(LraImpl lra) { + if (!forgetCalled.compareAndSet(ForgetStatus.NOT_SENT, ForgetStatus.SENDING)) { + return false; + } + try ( + HttpClientResponse response = webClient.delete() + .uri(forgetURI().get()) + .headers(lra.headers()) + .request()) { + + int responseStatus = response.status().code(); + if (responseStatus == 200 || responseStatus == 410) { + forgetCalled.set(ForgetStatus.SENT); + } else { + throw new Exception("Unexpected response from participant " + response.status().code()); + } + } catch (Throwable e) { + LOGGER.log(Level.WARNING, "Unable to send forget of lra {0} to {1}", + new Object[] {lra.lraId(), forgetURI().get()}); + forgetCalled.set(ForgetStatus.NOT_SENT); + } + return forgetCalled.get() == ForgetStatus.SENT; + } + + boolean equalCompensatorUris(String compensatorUris) { + Set links = Arrays.stream(compensatorUris.split(",")) + .map(Link::valueOf) + .collect(Collectors.toSet()); + + for (Link link : links) { + Optional participantsLink = compensatorLink(link.rel()); + if (participantsLink.isEmpty()) { + continue; + } + + if (Objects.equals(participantsLink.get(), link.uri())) { + return true; + } + } + + return false; + } + + enum Status { + ACTIVE(Active, null, null, false, Set.of(Completing, Compensating)), + + COMPENSATED(Compensated, null, null, true, Set.of(Compensated)), + COMPLETED(Completed, null, null, true, Set.of(Completed)), + FAILED_TO_COMPENSATE(FailedToCompensate, null, null, true, Set.of()), + FAILED_TO_COMPLETE(FailedToComplete, null, null, true, Set.of()), + + CLIENT_COMPENSATING(Compensating, COMPENSATED, FAILED_TO_COMPENSATE, false, + Set.of(Active, Compensated, FailedToCompensate)), + CLIENT_COMPLETING(Completing, COMPLETED, FAILED_TO_COMPLETE, false, Set.of(Active, Completed, FailedToComplete)), + COMPENSATING(Compensating, COMPENSATED, FAILED_TO_COMPENSATE, false, Set.of(Active, Compensated, FailedToCompensate)), + COMPLETING(Completing, COMPLETED, FAILED_TO_COMPLETE, false, Set.of(Active, Completed, FailedToComplete)); + + private final ParticipantStatus participantStatus; + private final Status successFinalStatus; + private final Status failedFinalStatus; + private final boolean finalState; + private final Set validNextStates; + + Status(ParticipantStatus participantStatus, + Status successFinalStatus, + Status failedFinalStatus, + boolean finalState, + Set validNextStates) { + this.participantStatus = participantStatus; + this.successFinalStatus = successFinalStatus; + this.failedFinalStatus = failedFinalStatus; + this.finalState = finalState; + this.validNextStates = validNextStates; + } + + ParticipantStatus participantStatus() { + return participantStatus; + } + + boolean isFinal() { + return finalState; + } + + boolean validateNextStatus(ParticipantStatus participantStatus) { + return validNextStates.contains(participantStatus); + } + + Optional successFinalStatus() { + return Optional.ofNullable(successFinalStatus.participantStatus()); + } + + Optional failedFinalStatus() { + return Optional.ofNullable(failedFinalStatus.participantStatus); + } + } + + enum SendingStatus { + SENDING, NOT_SENDING; + } + + enum AfterLraStatus { + NOT_SENT, SENDING, SENT; + } + + enum ForgetStatus { + NOT_SENT, SENDING, SENT; + } + + enum CompensateStatus { + NOT_SENT, SENDING, SENT; + } +} diff --git a/lra/coordinator/server/src/main/java/module-info.java b/lra/coordinator/server/src/main/java/module-info.java index 41fb4c00841..e97b6dafb7c 100644 --- a/lra/coordinator/server/src/main/java/module-info.java +++ b/lra/coordinator/server/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ * Helidon LRA coordinator. */ module io.helidon.lra.coordinator { + exports io.helidon.lra.coordinator; requires io.helidon.dbclient.jdbc; requires io.helidon.dbclient; diff --git a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraCdiExtension.java b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraCdiExtension.java index 26f4e2d8ef8..6fcea662735 100644 --- a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraCdiExtension.java +++ b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraCdiExtension.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,8 @@ import java.util.stream.Stream; import io.helidon.common.Reflected; +import io.helidon.config.Config; +import io.helidon.config.mp.MpConfig; import io.helidon.microprofile.server.ServerCdiExtension; import io.helidon.webserver.http.HttpService; @@ -60,6 +62,7 @@ import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; import jakarta.ws.rs.core.Application; +import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.lra.annotation.AfterLRA; import org.eclipse.microprofile.lra.annotation.Compensate; import org.eclipse.microprofile.lra.annotation.Complete; @@ -84,6 +87,7 @@ public class LraCdiExtension implements Extension { private static final System.Logger LOGGER = System.getLogger(LraCdiExtension.class.getName()); + private static final String CONFIG_PREFIX = "helidon.lra.participant"; private static final Set> EXPECTED_ANNOTATIONS = Set.of( AfterLRA.class, @@ -98,6 +102,7 @@ public class LraCdiExtension implements Extension { private final Map, Bean> lraCdiBeanReferences = new HashMap<>(); private final Indexer indexer; private final ClassLoader classLoader; + private final Config config; private IndexView index; @@ -105,6 +110,7 @@ public class LraCdiExtension implements Extension { * Initialize MicroProfile Long Running Actions CDI extension. */ public LraCdiExtension() { + config = MpConfig.toHelidonConfig(ConfigProvider.getConfig()).get(CONFIG_PREFIX); indexer = new Indexer(); classLoader = Thread.currentThread().getContextClassLoader(); // Needs to be always indexed @@ -117,17 +123,9 @@ public LraCdiExtension() { Application.class, NonJaxRsResource.class).forEach(c -> runtimeIndex(DotName.createSimple(c.getName()))); - List indexFiles; - try { - indexFiles = findIndexFiles("META-INF/jandex.idx"); - if (!indexFiles.isEmpty()) { - index = CompositeIndex.create(indexer.complete(), existingIndexFileReader(indexFiles)); - } else { - index = null; - } - } catch (IOException e) { - LOGGER.log(Level.WARNING, "Error when locating Jandex index, fall-back to runtime computed index.", e); - index = null; + Boolean useBuildTimeIndex = config.get("use-build-time-index").asBoolean().orElse(Boolean.TRUE); + if (useBuildTimeIndex) { + resolveBuildTimeIndex(); } } @@ -139,7 +137,7 @@ private void index( LRA.class, AfterLRA.class, Compensate.class, Complete.class, Forget.class, Status.class }) ProcessAnnotatedType pat) { - // compile time bilt index + // compile time built index if (index != null) return; // create runtime index when pre-built index is not available runtimeIndex(DotName.createSimple(pat.getAnnotatedType().getJavaClass().getName())); @@ -278,6 +276,21 @@ void runtimeIndex(DotName fqdn) { } } + private void resolveBuildTimeIndex() { + List indexFiles; + try { + indexFiles = findIndexFiles(config.get("index-resource").asString().orElse("META-INF/jandex.idx")); + if (!indexFiles.isEmpty()) { + index = CompositeIndex.create(indexer.complete(), existingIndexFileReader(indexFiles)); + } else { + index = null; + } + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Error when locating Jandex index, fall-back to runtime computed index.", e); + index = null; + } + } + private IndexView existingIndexFileReader(List indexUrls) throws IOException { List indices = new ArrayList<>(); for (URL indexURL : indexUrls) { diff --git a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/ParticipantImpl.java b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/ParticipantImpl.java index 923e1a9b418..cc2b0355093 100644 --- a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/ParticipantImpl.java +++ b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/ParticipantImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import io.helidon.lra.coordinator.client.Participant; @@ -150,9 +149,9 @@ public Optional status() { static Optional getLRAAnnotation(Method m) { List found = Arrays.stream(m.getDeclaredAnnotations()) .filter(a -> LRA_ANNOTATIONS.contains(a.annotationType())) - .collect(Collectors.toList()); + .toList(); - if (found.size() == 0) { + if (found.isEmpty()) { // LRA can be inherited from class or its predecessors var clazz = m.getDeclaringClass(); do { diff --git a/microprofile/lra/jax-rs/src/main/java/module-info.java b/microprofile/lra/jax-rs/src/main/java/module-info.java index 1d778858091..df670692c40 100644 --- a/microprofile/lra/jax-rs/src/main/java/module-info.java +++ b/microprofile/lra/jax-rs/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2023 Oracle and/or its affiliates. + * Copyright (c) 2021, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ ) @SuppressWarnings({ "requires-automatic", "requires-transitive-automatic" }) module io.helidon.microprofile.lra { + exports io.helidon.microprofile.lra; requires io.helidon.common.reactive; requires io.helidon.config; @@ -46,6 +47,7 @@ requires jakarta.cdi; requires transitive jersey.common; + requires io.helidon.config.mp; uses io.helidon.lra.coordinator.client.CoordinatorClient; diff --git a/microprofile/lra/pom.xml b/microprofile/lra/pom.xml index cb7dcf3792e..8ff245a2f44 100644 --- a/microprofile/lra/pom.xml +++ b/microprofile/lra/pom.xml @@ -1,7 +1,7 @@ + + + 4.0.0 + + io.helidon.microprofile.lra + helidon-microprofile-lra-project + 4.2.0-SNAPSHOT + ../pom.xml + + + helidon-microprofile-lra-testing + Helidon Microprofile LRA Testing + + + LRA test support + + + + + io.helidon.microprofile.server + helidon-microprofile-server + + + io.helidon.microprofile.lra + helidon-microprofile-lra + + + io.helidon.lra + helidon-lra-coordinator-narayana-client + + + io.helidon.lra + helidon-lra-coordinator-server + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + io.helidon.logging + helidon-logging-jul + test + + + io.helidon.microprofile.testing + helidon-microprofile-testing-junit5 + test + + + + diff --git a/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/TestLraCoordinator.java b/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/TestLraCoordinator.java new file mode 100644 index 00000000000..e3c6bf3a4da --- /dev/null +++ b/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/TestLraCoordinator.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import io.helidon.config.Config; +import io.helidon.lra.coordinator.CoordinatorService; +import io.helidon.lra.coordinator.Lra; +import io.helidon.microprofile.lra.CoordinatorLocatorService; +import io.helidon.microprofile.server.RoutingName; +import io.helidon.microprofile.server.RoutingPath; +import io.helidon.microprofile.server.ServerCdiExtension; +import io.helidon.webserver.http.HttpService; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Produces; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.inject.Inject; + +import static jakarta.interceptor.Interceptor.Priority.PLATFORM_AFTER; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Enables LRA coordinator on another socket of this server with random port. + */ +@ApplicationScoped +public class TestLraCoordinator { + + static final String ROUTING_NAME = "test-lra-coordinator"; + static final String CONTEXT_PATH = "/lra-coordinator"; + private final CompletableFuture port = new CompletableFuture<>(); + private final ServerCdiExtension serverCdiExtension; + private final CoordinatorService coordinatorService; + + @Inject + TestLraCoordinator(Config config, + ServerCdiExtension serverCdiExtension, + CoordinatorLocatorService coordinatorLocator) { + this.serverCdiExtension = serverCdiExtension; + this.coordinatorService = CoordinatorService.builder() + .url(this::coordinatorUri) + .config(config.get(CoordinatorService.CONFIG_PREFIX)) + .build(); + coordinatorLocator.overrideCoordinatorUriSupplier(this::coordinatorUri); + } + + @Produces + @ApplicationScoped + @RoutingName(value = ROUTING_NAME, required = true) + @RoutingPath(CONTEXT_PATH) + HttpService coordinatorService() { + return coordinatorService; + } + + /** + * Return test LRA coordinator URL. + * + * @return coordinator url + */ + public URI coordinatorUri() { + return URI.create("http://localhost:" + awaitPort() + CONTEXT_PATH); + } + + /** + * Get LRA by LraId. + * + * @param lraId with or without coordinator url prefix. + * @return lra when registered by text coordinator + */ + public Lra lra(String lraId) { + if (lraId == null) { + return null; + } + if (lraId.startsWith(coordinatorUri() + "/")) { + return coordinatorService.lra(lraId.substring(coordinatorUri().toString().length() + 1)); + } + return coordinatorService.lra(lraId); + } + + private void ready(@Observes @Priority(PLATFORM_AFTER + 101) @Initialized(ApplicationScoped.class) Object e, BeanManager b) { + port.complete(serverCdiExtension.port(ROUTING_NAME)); + } + + private int awaitPort() { + try { + return port.get(20, SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } +} diff --git a/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/TestLraCoordinatorConfigSource.java b/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/TestLraCoordinatorConfigSource.java new file mode 100644 index 00000000000..ad1a1e89f7e --- /dev/null +++ b/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/TestLraCoordinatorConfigSource.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import java.util.Map; +import java.util.Set; + +import org.eclipse.microprofile.config.spi.ConfigSource; + +/** + * Configuration for {@code @HelidonTest} with LRA coordinator running on random port. + * Any of the properties can be overridden with {@code @AddConfig}. + * Example of running test coordinator on port 8070: + *
{@code
+ * @HelidonTest
+ * @AddConfig(key = "server.sockets.500.port", value = "8070")
+ * @AddBean(TestLraCoordinator.class)
+ * }
+ */ +public class TestLraCoordinatorConfigSource implements ConfigSource { + + private static final String PORT_IDX = System.getProperty("helidon.lra.coordinator.test-socket.index", "500"); + private static final Map CONFIG = Map.of( + // Extra socket for coordinator on random port + "server.sockets." + PORT_IDX + ".name", TestLraCoordinator.ROUTING_NAME, + "server.sockets." + PORT_IDX + ".port", "0", + "server.sockets." + PORT_IDX + ".bind-address", "localhost", + // Avoid using persistent tx log in test LRA coordinator + "helidon.lra.coordinator.persistence", "false", + // Avoid using build time Jandex index + "helidon.lra.participant.use-build-time-index", "false"); + + /** + * Initialized by service locator. + */ + public TestLraCoordinatorConfigSource() { + } + + @Override + public Set getPropertyNames() { + return CONFIG.keySet(); + } + + @Override + public int getOrdinal() { + return 5000; + } + + @Override + public String getValue(String propertyName) { + return CONFIG.get(propertyName); + } + + @Override + public String getName() { + return TestLraCoordinator.ROUTING_NAME; + } +} diff --git a/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/package-info.java b/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/package-info.java new file mode 100644 index 00000000000..a1737087e36 --- /dev/null +++ b/microprofile/lra/testing/src/main/java/io/helidon/microprofile/testing/lra/package-info.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Test LRA coordinator. + * Allows simplified testing of LRA enabled JAX-RS resources. + * + *
{@code
+ * @HelidonTest
+ * @AddBean(TestLraCoordinator.class)
+ * @Path("/test")
+ * public class LraTest {
+ *
+ *     private final WebTarget target;
+ *     private final Set completedLras;
+ *     private final Set cancelledLras;
+ *     private final TestLraCoordinator coordinator;
+ *
+ *     @Inject
+ *     public LraTest(WebTarget target,
+ *                    TestLraCoordinator coordinator) {
+ *         this.target = target;
+ *         this.coordinator = coordinator;
+ *         this.completedLras = new CopyOnWriteArraySet<>();
+ *         this.cancelledLras = new CopyOnWriteArraySet<>();
+ *     }
+ *
+ *     @PUT
+ *     @Path("/withdraw")
+ *     @LRA(LRA.Type.REQUIRES_NEW)
+ *     public Response withdraw(@HeaderParam(LRA.LRA_HTTP_CONTEXT_HEADER) Optional lraId, String content) {
+ *         ...
+ *         return Response.ok().build();
+ *     }
+ *
+ *     @Complete
+ *     public void complete(URI lraId) {
+ *         completedLras.add(lraId.toString());
+ *     }
+ *
+ *     @Compensate
+ *     public void rollback(URI lraId) {
+ *         cancelledLras.add(lraId.toString());
+ *     }
+ *
+ *     @Test
+ *     public void testLra() {
+ *         try (Response res = target
+ *                 .path("/test/withdraw")
+ *                 .request()
+ *                 .put(Entity.entity("test", MediaType.TEXT_PLAIN_TYPE))) {
+ *             assertThat(res.getStatus(), is(200));
+ *             String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER);
+ *             Lra lra = coordinator.lra(lraId);
+ *             assertThat(lra.status(), is(LRAStatus.Closed));
+ *             assertThat(completedLras, contains(lraId));
+ *         }
+ *     }
+ * }
+ * }
+ */ +package io.helidon.microprofile.testing.lra; diff --git a/microprofile/lra/testing/src/main/java/module-info.java b/microprofile/lra/testing/src/main/java/module-info.java new file mode 100644 index 00000000000..d6c8c402868 --- /dev/null +++ b/microprofile/lra/testing/src/main/java/module-info.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.eclipse.microprofile.config.spi.ConfigSource; + +/** + * Test LRA coordinator. + */ +module io.helidon.microprofile.testing.lra { + + requires io.helidon.config.mp; + requires io.helidon.config.yaml.mp; + requires io.helidon.microprofile.cdi; + requires jakarta.inject; + + requires transitive jakarta.cdi; + requires transitive jakarta.ws.rs; + + requires static io.helidon.microprofile.server; + requires io.helidon.lra.coordinator; + requires io.helidon.microprofile.lra; + + exports io.helidon.microprofile.testing.lra; + + provides org.eclipse.microprofile.config.spi.ConfigSource + with io.helidon.microprofile.testing.lra.TestLraCoordinatorConfigSource; + +} diff --git a/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraDisabledDiscoveryResourceTest.java b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraDisabledDiscoveryResourceTest.java new file mode 100644 index 00000000000..77607fc7005 --- /dev/null +++ b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraDisabledDiscoveryResourceTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import java.net.URI; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import io.helidon.lra.coordinator.Lra; +import io.helidon.microprofile.config.ConfigCdiExtension; +import io.helidon.microprofile.lra.LraCdiExtension; +import io.helidon.microprofile.testing.junit5.AddBean; +import io.helidon.microprofile.testing.junit5.AddExtension; +import io.helidon.microprofile.testing.junit5.AddJaxRs; +import io.helidon.microprofile.testing.junit5.DisableDiscovery; +import io.helidon.microprofile.testing.junit5.HelidonTest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.HeaderParam; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import org.eclipse.microprofile.lra.annotation.Compensate; +import org.eclipse.microprofile.lra.annotation.Complete; +import org.eclipse.microprofile.lra.annotation.LRAStatus; +import org.eclipse.microprofile.lra.annotation.ws.rs.LRA; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; + +@HelidonTest +@DisableDiscovery +@AddJaxRs +@AddBean(TestLraCoordinator.class) +@AddExtension(LraCdiExtension.class) +@AddExtension(ConfigCdiExtension.class) +@Path("/test/internal") +public class LraDisabledDiscoveryResourceTest { + + private final WebTarget target; + private final Set completedLras; + private final Set cancelledLras; + private final TestLraCoordinator coordinator; + + @Inject + public LraDisabledDiscoveryResourceTest(WebTarget target, + TestLraCoordinator coordinator) { + this.target = target; + this.coordinator = coordinator; + this.completedLras = new CopyOnWriteArraySet<>(); + this.cancelledLras = new CopyOnWriteArraySet<>(); + } + + @PUT + @Path("/withdraw") + @LRA(LRA.Type.REQUIRES_NEW) + public Response withdraw(@HeaderParam(LRA.LRA_HTTP_CONTEXT_HEADER) Optional lraId, String content) { + if ("BOOM".equals(content)) { + throw new IllegalArgumentException("BOOM"); + } + return Response.ok().build(); + } + + @Complete + public void complete(URI lraId) { + completedLras.add(lraId.toString()); + } + + @Compensate + public void rollback(URI lraId) { + cancelledLras.add(lraId.toString()); + } + + @Test + public void testLraComplete() { + try (Response res = target + .path("/test/internal/withdraw") + .request() + .put(Entity.entity("test", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(200)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); + assertThat(lra.status(), is(LRAStatus.Closed)); + assertThat(completedLras, contains(lraId)); + } + } + + @Test + public void testLraCompensate() { + try (Response res = target + .path("/test/internal/withdraw") + .request() + .put(Entity.entity("BOOM", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(500)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); + assertThat(lra.status(), is(LRAStatus.Cancelled)); + assertThat(cancelledLras, contains(lraId)); + } + } +} diff --git a/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraExternalResourceTest.java b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraExternalResourceTest.java new file mode 100644 index 00000000000..b2cf90842d6 --- /dev/null +++ b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraExternalResourceTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import io.helidon.lra.coordinator.Lra; +import io.helidon.microprofile.testing.junit5.AddBean; +import io.helidon.microprofile.testing.junit5.HelidonTest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import org.eclipse.microprofile.lra.annotation.LRAStatus; +import org.eclipse.microprofile.lra.annotation.ws.rs.LRA; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; + +@HelidonTest +@AddBean(WithdrawTestResource.class) +@AddBean(TestLraCoordinator.class) +public class LraExternalResourceTest { + + private final WithdrawTestResource withdrawTestResource; + private final TestLraCoordinator coordinator; + private final WebTarget target; + + @Inject + public LraExternalResourceTest(WithdrawTestResource withdrawTestResource, TestLraCoordinator coordinator, WebTarget target) { + this.withdrawTestResource = withdrawTestResource; + this.coordinator = coordinator; + this.target = target; + } + + @Test + public void testLraComplete() { + try (Response res = target + .path("/test/external/withdraw") + .request() + .put(Entity.entity("test", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(200)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); + assertThat(lra.status(), is(LRAStatus.Closed)); + assertThat(withdrawTestResource.getCompletedLras(), contains(lraId)); + } + } + + @Test + public void testLraCompensate() { + try (Response res = target + .path("/test/external/withdraw") + .request() + .put(Entity.entity("BOOM", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(500)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); + assertThat(lra.status(), is(LRAStatus.Cancelled)); + assertThat(withdrawTestResource.getCancelledLras(), contains(lraId)); + } + } +} diff --git a/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraMultiPortTest.java b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraMultiPortTest.java new file mode 100644 index 00000000000..a0487365e1a --- /dev/null +++ b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/LraMultiPortTest.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import java.net.URI; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import io.helidon.lra.coordinator.Lra; +import io.helidon.microprofile.server.RoutingName; +import io.helidon.microprofile.server.RoutingPath; +import io.helidon.microprofile.testing.junit5.AddBean; +import io.helidon.microprofile.testing.junit5.AddConfig; +import io.helidon.microprofile.testing.junit5.HelidonTest; +import io.helidon.microprofile.testing.junit5.Socket; +import io.helidon.webserver.http.HttpService; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import jakarta.ws.rs.HeaderParam; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import org.eclipse.microprofile.lra.annotation.Compensate; +import org.eclipse.microprofile.lra.annotation.Complete; +import org.eclipse.microprofile.lra.annotation.LRAStatus; +import org.eclipse.microprofile.lra.annotation.ws.rs.LRA; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; + +@HelidonTest +@AddConfig(key = "server.sockets.0.name", value = "test-route") +@AddConfig(key = "server.sockets.0.port", value = "0") +@AddConfig(key = "server.sockets.0.bind-address", value = "localhost") +@AddBean(TestLraCoordinator.class) +@Path("/test/multi-port") +public class LraMultiPortTest { + + private final WebTarget target; + private final WebTarget otherTarget; + private final Set completedLras; + private final Set cancelledLras; + private final TestLraCoordinator coordinator; + + @Inject + public LraMultiPortTest(WebTarget target, + TestLraCoordinator coordinator, + @Socket("test-route") WebTarget otherTarget) { + this.target = target; + this.coordinator = coordinator; + this.otherTarget = otherTarget; + this.completedLras = new CopyOnWriteArraySet<>(); + this.cancelledLras = new CopyOnWriteArraySet<>(); + } + + @Produces + @ApplicationScoped + @RoutingName("test-route") + @RoutingPath("/test/route") + public HttpService anotherRoute() { + return r -> r.any((req, res) -> res.send("Hello from test route!")); + } + + @PUT + @Path("/outer") + @LRA(LRA.Type.REQUIRES_NEW) + public Response withdraw(@HeaderParam(LRA.LRA_HTTP_CONTEXT_HEADER) Optional lraId, String content) { + try (Response res = target.path("/test/multi-port/inner") + .request() + .put(Entity.entity(content, MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(Response.Status.OK.getStatusCode())); + } + return Response.ok().build(); + } + + @PUT + @Path("/inner") + @LRA(LRA.Type.REQUIRED) + public Response deposit(@HeaderParam(LRA.LRA_HTTP_CONTEXT_HEADER) Optional lraId, String content) { + if ("BOOM".equals(content)) { + throw new IllegalArgumentException("BOOM"); + } + return Response.ok().build(); + } + + @Complete + public void complete(URI lraId) { + completedLras.add(lraId.toString()); + } + + @Compensate + public void rollback(URI lraId) { + cancelledLras.add(lraId.toString()); + } + + @Test + public void testLra() { + try (Response res = target + .path("/test/multi-port/outer") + .request() + .put(Entity.entity("test", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(200)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); + assertThat(lra.status(), is(LRAStatus.Closed)); + assertThat(completedLras, contains(lraId)); + } + } + + @Test + public void testCompensatedLra() { + try (Response res = target + .path("/test/multi-port/outer") + .request() + .put(Entity.entity("BOOM", MediaType.TEXT_PLAIN_TYPE))) { + assertThat(res.getStatus(), is(500)); + String lraId = res.getHeaderString(LRA.LRA_HTTP_CONTEXT_HEADER); + Lra lra = coordinator.lra(lraId); + assertThat(lra.status(), is(LRAStatus.Cancelled)); + assertThat(cancelledLras, contains(lraId)); + } + } + + @Test + public void testOtherRoute() { + try (Response res = otherTarget + .path("/test/route") + .request() + .get()) { + assertThat(res.getStatus(), is(200)); + assertThat(res.readEntity(String.class), is("Hello from test route!")); + } + } + +} diff --git a/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/NonLraResourceTest.java b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/NonLraResourceTest.java new file mode 100644 index 00000000000..774541b117c --- /dev/null +++ b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/NonLraResourceTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import io.helidon.microprofile.testing.junit5.HelidonTest; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.Response; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@HelidonTest +@Path("/test/non-lra") +public class NonLraResourceTest { + + private final WebTarget target; + + @Inject + public NonLraResourceTest(WebTarget target) { + this.target = target; + } + + @GET + @Path("/say-hi") + public String sayHi() { + return "Hi!"; + } + + @Test + public void testNonLraResource() { + try (Response res = target + .path("/test/non-lra/say-hi") + .request() + .get()) { + assertThat(res.getStatus(), is(200)); + assertThat(res.readEntity(String.class), is("Hi!")); + } + } +} diff --git a/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/WithdrawTestResource.java b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/WithdrawTestResource.java new file mode 100644 index 00000000000..9487bc54b50 --- /dev/null +++ b/microprofile/lra/testing/src/test/java/io/helidon/microprofile/testing/lra/WithdrawTestResource.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.testing.lra; + +import java.net.URI; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.HeaderParam; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; +import org.eclipse.microprofile.lra.annotation.Compensate; +import org.eclipse.microprofile.lra.annotation.Complete; +import org.eclipse.microprofile.lra.annotation.ws.rs.LRA; + +@ApplicationScoped +@Path("/test/external") +public class WithdrawTestResource { + + private final Set completedLras; + private final Set cancelledLras; + + public WithdrawTestResource() { + this.completedLras = new CopyOnWriteArraySet<>(); + this.cancelledLras = new CopyOnWriteArraySet<>(); + } + + @PUT + @Path("/withdraw") + @LRA(LRA.Type.REQUIRES_NEW) + public Response withdraw(@HeaderParam(LRA.LRA_HTTP_CONTEXT_HEADER) Optional lraId, String content) { + if ("BOOM".equals(content)) { + throw new IllegalArgumentException("BOOM"); + } + return Response.ok().build(); + } + + @Complete + public void complete(URI lraId) { + completedLras.add(lraId.toString()); + } + + @Compensate + public void rollback(URI lraId) { + cancelledLras.add(lraId.toString()); + } + + Set getCompletedLras() { + return completedLras; + } + + Set getCancelledLras() { + return cancelledLras; + } +} diff --git a/microprofile/lra/testing/src/test/resources/logging-test.properties b/microprofile/lra/testing/src/test/resources/logging-test.properties new file mode 100644 index 00000000000..a61f39ea0cb --- /dev/null +++ b/microprofile/lra/testing/src/test/resources/logging-test.properties @@ -0,0 +1,25 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +handlers=io.helidon.logging.jul.HelidonConsoleHandler + +# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n + +# Global logging level. Can be overridden by specific loggers +.level=WARNING + +io.helidon.level=INFO