From f2a06c65df2cb1c44ebf774b56f4c329d4363685 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 28 Aug 2025 09:44:40 +0200 Subject: [PATCH 1/5] [Fix #744] Suspend/resume status and associated event Workflow suspend and resume event should be sent regardless a task was in fact suspended or not. Same applies to workflow status. Signed-off-by: Francisco Javier Tirado Sarti --- .../impl/WorkflowMutableInstance.java | 12 ++++++------ .../impl/test/LifeCycleEventsTest.java | 13 +++++++++++-- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 4c3abd18..796d2e68 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -176,6 +176,10 @@ public boolean suspend() { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get())) { suspended = new CompletableFuture(); + workflowContext.instance().status(WorkflowStatus.SUSPENDED); + publishEvent( + workflowContext, + l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext))); return true; } else { return false; @@ -197,11 +201,11 @@ public boolean resume() { publishEvent( workflowContext, l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask))); - publishEvent( - workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); } else { suspended = null; } + publishEvent( + workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); return true; } else { return false; @@ -216,12 +220,8 @@ public CompletableFuture completedChecks(TaskContext t) { statusLock.lock(); if (suspended != null) { suspendedTask = t; - workflowContext.instance().status(WorkflowStatus.SUSPENDED); publishEvent( workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t))); - publishEvent( - workflowContext, - l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext))); return suspended; } if (cancelled != null) { diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index d6088a2a..c4223aea 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -115,9 +115,18 @@ void testSuspendResumeNotWait() .instance(Map.of()); CompletableFuture future = instance.start(); instance.suspend(); + assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); instance.resume(); assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) .isEqualTo(Map.of("name", "Javierito")); + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + WorkflowSuspendedCEData workflowSuspendedEvent = + assertPojoInCE( + "io.serverlessworkflow.workflow.suspended.v1", WorkflowSuspendedCEData.class); + WorkflowResumedCEData workflowResumedEvent = + assertPojoInCE("io.serverlessworkflow.workflow.resumed.v1", WorkflowResumedCEData.class); + assertThat(workflowSuspendedEvent.suspendedAt()) + .isBeforeOrEqualTo(workflowResumedEvent.resumedAt()); } @Test @@ -127,10 +136,10 @@ void testSuspendResumeWait() appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("wait-set.yaml")) .instance(Map.of()); CompletableFuture future = instance.start(); - instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); - Thread.sleep(550); + instance.suspend(); assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED); + Thread.sleep(550); instance.resume(); assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow()) .isEqualTo(Map.of("name", "Javierito")); From d361b0271da5a57eddf24360937b4bb052ba3782 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 28 Aug 2025 09:58:46 +0200 Subject: [PATCH 2/5] [Fix #744] Add api dependency to test and example Although already included because jackson-impl depends on it, it is a good practise to explicitly add API as dependency because this dependency is an implementation detail and the usage of the reader API by the exampple is not. Signed-off-by: Francisco Javier Tirado Sarti --- examples/events/pom.xml | 4 ++++ examples/simpleGet/pom.xml | 8 ++++++-- impl/test/pom.xml | 4 ++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/examples/events/pom.xml b/examples/events/pom.xml index e1461497..15678a1a 100644 --- a/examples/events/pom.xml +++ b/examples/events/pom.xml @@ -15,6 +15,10 @@ + + io.serverlessworkflow + serverlessworkflow-api + io.serverlessworkflow serverlessworkflow-impl-jackson diff --git a/examples/simpleGet/pom.xml b/examples/simpleGet/pom.xml index f8d853b3..656dacec 100644 --- a/examples/simpleGet/pom.xml +++ b/examples/simpleGet/pom.xml @@ -14,6 +14,10 @@ + + io.serverlessworkflow + serverlessworkflow-api + io.serverlessworkflow serverlessworkflow-impl-jackson @@ -23,8 +27,8 @@ serverlessworkflow-impl-http - org.glassfish.jersey.media - jersey-media-json-jackson + org.glassfish.jersey.media + jersey-media-json-jackson org.glassfish.jersey.core diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 4a6a82b3..33a81db5 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -12,6 +12,10 @@ io.serverlessworkflow serverlessworkflow-impl-jackson + + io.serverlessworkflow + serverlessworkflow-api + io.serverlessworkflow serverlessworkflow-impl-http From 6d23850c79011f7687638a43f77c7072736c64b6 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 28 Aug 2025 10:27:30 +0200 Subject: [PATCH 3/5] [Fix #744] Disabling event life cycle publishing In some scenarios, users wont need the cloud events to be published. This allows user to disable publishing at application level. Signed-off-by: Francisco Javier Tirado Sarti --- .../impl/WorkflowApplication.java | 12 ++ .../ce/AbstractLifeCyclePublisher.java | 152 ++++++++++++------ .../events/JacksonLifeCyclePublisher.java | 73 +-------- .../impl/test/EventDefinitionTest.java | 2 +- 4 files changed, 113 insertions(+), 126 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index e1cb5838..e87871b4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -59,6 +59,7 @@ public class WorkflowApplication implements AutoCloseable { private final RuntimeDescriptorFactory runtimeDescriptorFactory; private final EventConsumer eventConsumer; private final EventPublisher eventPublisher; + private final boolean lifeCycleCEPublishingEnabled; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -73,6 +74,7 @@ private WorkflowApplication(Builder builder) { this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; this.eventPublisher = builder.eventPublisher; + this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; } public TaskExecutorFactory taskFactory() { @@ -145,6 +147,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private EventPublisher eventPublisher; private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); + private boolean lifeCycleCEPublishingEnabled = true; private Builder() {} @@ -168,6 +171,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { return this; } + public Builder disableLifeCycleCEPublishing() { + this.lifeCycleCEPublishingEnabled = false; + return this; + } + public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) { this.executorFactory = executorFactory; return this; @@ -278,4 +286,8 @@ public EventConsumer eventConsumer() { public ExecutorService executorService() { return executorFactory.get(); } + + public boolean isLifeCycleCEPublishingEnabled() { + return lifeCycleCEPublishingEnabled; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index fee07613..2686b9d5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -18,13 +18,14 @@ import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref; import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowErrorCEData.error; +import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.core.data.PojoCloudEventData.ToBytes; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.events.CloudEventUtils; -import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskEvent; @@ -41,13 +42,15 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.OffsetDateTime; +import java.util.function.Function; public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener { @Override - public void onTaskStarted(TaskStartedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskStarted(TaskStartedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -58,9 +61,10 @@ public void onTaskStarted(TaskStartedEvent ev) { } @Override - public void onTaskCompleted(TaskCompletedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskCompleted(TaskCompletedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -72,9 +76,10 @@ public void onTaskCompleted(TaskCompletedEvent ev) { } @Override - public void onTaskSuspended(TaskSuspendedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskSuspended(TaskSuspendedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -85,9 +90,10 @@ public void onTaskSuspended(TaskSuspendedEvent ev) { } @Override - public void onTaskResumed(TaskResumedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskResumed(TaskResumedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -98,9 +104,10 @@ public void onTaskResumed(TaskResumedEvent ev) { } @Override - public void onTaskCancelled(TaskCancelledEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskCancelled(TaskCancelledEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -111,9 +118,10 @@ public void onTaskCancelled(TaskCancelledEvent ev) { } @Override - public void onTaskFailed(TaskFailedEvent ev) { - eventPublisher(ev) - .publish( + public void onTaskFailed(TaskFailedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -124,9 +132,10 @@ public void onTaskFailed(TaskFailedEvent ev) { } @Override - public void onWorkflowStarted(WorkflowStartedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowStarted(WorkflowStartedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -136,9 +145,10 @@ public void onWorkflowStarted(WorkflowStartedEvent ev) { } @Override - public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowSuspended(WorkflowSuspendedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -149,9 +159,10 @@ public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { } @Override - public void onWorkflowCancelled(WorkflowCancelledEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowCancelled(WorkflowCancelledEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -162,9 +173,10 @@ public void onWorkflowCancelled(WorkflowCancelledEvent ev) { } @Override - public void onWorkflowResumed(WorkflowResumedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowResumed(WorkflowResumedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -174,9 +186,10 @@ public void onWorkflowResumed(WorkflowResumedEvent ev) { } @Override - public void onWorkflowCompleted(WorkflowCompletedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowCompleted(WorkflowCompletedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -187,9 +200,10 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) { } @Override - public void onWorkflowFailed(WorkflowFailedEvent ev) { - eventPublisher(ev) - .publish( + public void onWorkflowFailed(WorkflowFailedEvent event) { + publish( + event, + ev -> builder() .withData( cloudEventData( @@ -199,29 +213,65 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) { .build()); } - protected abstract byte[] convert(WorkflowStartedCEData data); + protected byte[] convert(WorkflowStartedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowSuspendedCEData data); + protected byte[] convert(WorkflowCompletedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowResumedCEData data); + protected byte[] convert(TaskStartedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowCancelledCEData data); + protected byte[] convert(TaskCompletedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(WorkflowCompletedCEData data); + protected byte[] convert(TaskFailedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskStartedCEData data); + protected byte[] convert(WorkflowFailedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskCompletedCEData data); + protected byte[] convert(WorkflowSuspendedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskFailedCEData data); + protected byte[] convert(WorkflowResumedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskSuspendedCEData data); + protected byte[] convert(WorkflowCancelledCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskCancelledCEData data); + protected byte[] convert(TaskSuspendedCEData data) { + return convertToBytes(data); + } + + protected byte[] convert(TaskCancelledCEData data) { + return convertToBytes(data); + } + + protected byte[] convert(TaskResumedCEData data) { + return convertToBytes(data); + } - protected abstract byte[] convert(TaskResumedCEData data); + protected abstract byte[] convertToBytes(T data); - protected abstract byte[] convert(WorkflowFailedCEData data); + /* By default, generated cloud events are published, if user has not disabled them at application level, + * using application event publisher. That might be changed if needed by children. + */ + protected void publish(T ev, Function ce) { + WorkflowApplication application = ev.workflowContext().definition().application(); + if (application.isLifeCycleCEPublishingEnabled()) { + application.eventPublisher().publish(ce.apply(ev)); + } + } private static CloudEventData cloudEventData(T data, ToBytes toBytes) { return PojoCloudEventData.wrap(data, toBytes); @@ -246,10 +296,6 @@ private static Object output(WorkflowEvent ev) { return from(ev.workflowContext().instanceData().output()); } - private static EventPublisher eventPublisher(WorkflowEvent ev) { - return ev.workflowContext().definition().application().eventPublisher(); - } - private static Object output(TaskEvent ev) { return from(ev.taskContext().output()); } diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java index c186c24f..66a7618a 100644 --- a/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java @@ -18,83 +18,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; -import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; import java.io.UncheckedIOException; public class JacksonLifeCyclePublisher extends AbstractLifeCyclePublisher { @Override - protected byte[] convert(WorkflowStartedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowCompletedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskStartedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskCompletedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskFailedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowFailedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowSuspendedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowResumedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(WorkflowCancelledCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskSuspendedCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskCancelledCEData data) { - return genericConvert(data); - } - - @Override - protected byte[] convert(TaskResumedCEData data) { - return genericConvert(data); - } - - protected byte[] genericConvert(T data) { + protected byte[] convertToBytes(T data) { try { return JsonUtils.mapper().writeValueAsBytes(data); } catch (JsonProcessingException e) { diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java index 548ba61c..d2ed4407 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java @@ -46,7 +46,7 @@ public class EventDefinitionTest { @BeforeAll static void init() { - appl = WorkflowApplication.builder().build(); + appl = WorkflowApplication.builder().disableLifeCycleCEPublishing().build(); } @ParameterizedTest From 9a1ea45aeb170b1c65abca1dcfb3712955f62913 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 28 Aug 2025 11:19:48 +0200 Subject: [PATCH 4/5] [Fix #744] Improving EventPublisher/EventConsumer There might be more that one EventPublisher EventPublishers are either added programatically or through service loader. There will be only one consumer. If user want to handler more than one consuming event broker, he should provide an specific implementation that deals with them. The one consumer is added programatically (priortiy) or default to service loader. If not eventConsumer was provides, InMemoryEvents is used as event consumer and publisher. ChatBotIT and LifeCycleEventTest has been changed accordingly. Signed-off-by: Francisco Javier Tirado Sarti --- .../fluent/agentic/ChatBotIT.java | 34 ++++----- .../impl/WorkflowApplication.java | 42 +++++++---- .../impl/events/InMemoryEvents.java | 8 ++- .../impl/executors/EmitExecutor.java | 17 +++-- .../ce/AbstractLifeCyclePublisher.java | 72 ++++++++++++++----- .../impl/test/LifeCycleEventsTest.java | 22 +++--- 6 files changed, 124 insertions(+), 71 deletions(-) diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java index 397f2183..8f696ef1 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -28,13 +28,12 @@ import dev.langchain4j.memory.chat.MessageWindowChatMemory; import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventBuilder; -import io.serverlessworkflow.api.types.EventFilter; -import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.InMemoryEvents; import java.net.URI; import java.time.OffsetDateTime; import java.util.Map; @@ -103,25 +102,15 @@ void chat_bot() { .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished")))) .build(); - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - app.eventConsumer() - .register( - app.eventConsumer() - .listen( - new EventFilter() - .withWith(new EventProperties().withType("org.acme.chatbot.reply")), - app), - ce -> replyEvents.add((CloudEvent) ce)); - - app.eventConsumer() - .register( - app.eventConsumer() - .listen( - new EventFilter() - .withWith(new EventProperties().withType("org.acme.chatbot.finished")), - app), - ce -> finishedEvents.add((CloudEvent) ce)); + InMemoryEvents eventBroker = new InMemoryEvents(); + eventBroker.register("org.acme.chatbot.reply", ce -> replyEvents.add((CloudEvent) ce)); + eventBroker.register("org.acme.chatbot.finished", ce -> finishedEvents.add((CloudEvent) ce)); + try (WorkflowApplication app = + WorkflowApplication.builder() + .withEventConsumer(eventBroker) + .withEventPublisher(eventBroker) + .build()) { final WorkflowInstance waitingInstance = app.workflowDefinition(listenWorkflow).instance(Map.of()); final CompletableFuture runningModel = waitingInstance.start(); @@ -130,12 +119,12 @@ void chat_bot() { assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); // Publish the events - app.eventPublisher().publish(newMessageEvent("Hello World!")); + eventBroker.publish(newMessageEvent("Hello World!")); CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS); assertNotNull(reply); // Empty message completes the workflow - app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize")); + eventBroker.publish(newMessageEvent("", "org.acme.chatbot.finalize")); CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS); assertNotNull(finished); assertThat(finishedEvents).isEmpty(); @@ -145,6 +134,7 @@ void chat_bot() { } catch (InterruptedException e) { fail(e.getMessage()); + } finally { } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index e87871b4..bc70f8d2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -32,6 +32,7 @@ import io.serverlessworkflow.impl.resources.StaticResource; import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -58,7 +59,7 @@ public class WorkflowApplication implements AutoCloseable { private final ExecutorServiceFactory executorFactory; private final RuntimeDescriptorFactory runtimeDescriptorFactory; private final EventConsumer eventConsumer; - private final EventPublisher eventPublisher; + private final Collection eventPublishers; private final boolean lifeCycleCEPublishingEnabled; private WorkflowApplication(Builder builder) { @@ -73,7 +74,7 @@ private WorkflowApplication(Builder builder) { this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet(); this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; - this.eventPublisher = builder.eventPublisher; + this.eventPublishers = builder.eventPublishers; this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; } @@ -101,8 +102,8 @@ public Collection listeners() { return listeners; } - public EventPublisher eventPublisher() { - return eventPublisher; + public Collection eventPublishers() { + return eventPublishers; } public WorkflowIdFactory idFactory() { @@ -144,7 +145,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory(); private EventConsumer eventConsumer; - private EventPublisher eventPublisher; + private Collection eventPublishers = new ArrayList<>(); private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); private boolean lifeCycleCEPublishingEnabled = true; @@ -201,10 +202,13 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) { return this; } - public Builder withEventHandler( - EventPublisher eventPublisher, EventConsumer eventConsumer) { + public Builder withEventConsumer(EventConsumer eventConsumer) { this.eventConsumer = eventConsumer; - this.eventPublisher = eventPublisher; + return this; + } + + public Builder withEventPublisher(EventPublisher eventPublisher) { + this.eventPublishers.add(eventPublisher); return this; } @@ -227,10 +231,19 @@ public WorkflowApplication build() { .findFirst() .orElseGet(() -> DefaultTaskExecutorFactory.get()); } - if (eventConsumer == null && eventPublisher == null) { - InMemoryEvents inMemory = new InMemoryEvents(executorFactory); - eventPublisher = inMemory; - eventConsumer = inMemory; + ServiceLoader.load(EventPublisher.class).forEach(e -> eventPublishers.add(e)); + if (eventConsumer == null) { + eventConsumer = + ServiceLoader.load(EventConsumer.class) + .findFirst() + .orElseGet( + () -> { + InMemoryEvents inMemory = new InMemoryEvents(executorFactory); + if (eventPublishers.isEmpty()) { + eventPublishers.add(inMemory); + } + return inMemory; + }); } return new WorkflowApplication(this); } @@ -250,8 +263,11 @@ public WorkflowDefinition workflowDefinition(Workflow workflow) { @Override public void close() { safeClose(executorFactory); - safeClose(eventPublisher); + for (EventPublisher eventPublisher : eventPublishers) { + safeClose(eventPublisher); + } safeClose(eventConsumer); + for (WorkflowDefinition definition : definitions.values()) { safeClose(definition); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index 2bcf96ef..f2d6cba3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.DefaultExecutorServiceFactory; import io.serverlessworkflow.impl.ExecutorServiceFactory; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -29,6 +30,10 @@ */ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { + public InMemoryEvents() { + this(new DefaultExecutorServiceFactory()); + } + public InMemoryEvents(ExecutorServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; } @@ -40,7 +45,7 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) { private AtomicReference> allConsumerRef = new AtomicReference<>(); @Override - protected void register(String topicName, Consumer consumer) { + public void register(String topicName, Consumer consumer) { topicMap.put(topicName, consumer); } @@ -77,6 +82,7 @@ protected void unregisterFromAll() { @Override public void close() { + topicMap.clear(); serviceFactory.close(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java index 8e858335..52e081b0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -33,10 +33,13 @@ import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.events.CloudEventUtils; +import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.net.URI; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -74,11 +77,15 @@ private EmitExecutor(EmitExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - return workflow - .definition() - .application() - .eventPublisher() - .publish(buildCloudEvent(workflow, taskContext)) + Collection eventPublishers = + workflow.definition().application().eventPublishers(); + CloudEvent ce = buildCloudEvent(workflow, taskContext); + Collection> allCompletables = new ArrayList<>(); + for (EventPublisher eventPublisher : eventPublishers) { + allCompletables.add(eventPublisher.publish(ce)); + } + return CompletableFuture.allOf( + allCompletables.toArray(new CompletableFuture[allCompletables.size()])) .thenApply(v -> taskContext.input()); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index 2686b9d5..ade29db3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -42,10 +42,42 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.Set; import java.util.function.Function; public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener { + private static final String TASK_STARTED = "io.serverlessworkflow.task.started.v1"; + private static final String TASK_COMPLETED = "io.serverlessworkflow.task.completed.v1"; + private static final String TASK_SUSPENDED = "io.serverlessworkflow.task.suspended.v1"; + private static final String TASK_RESUMED = "io.serverlessworkflow.task.resumed.v1"; + private static final String TASK_FAULTED = "io.serverlessworkflow.task.faulted.v1"; + private static final String TASK_CANCELLED = "io.serverlessworkflow.task.cancelled.v1"; + + private static final String WORKFLOW_STARTED = "io.serverlessworkflow.workflow.started.v1"; + private static final String WORKFLOW_COMPLETED = "io.serverlessworkflow.workflow.completed.v1"; + private static final String WORKFLOW_SUSPENDED = "io.serverlessworkflow.workflow.suspended.v1"; + private static final String WORKFLOW_RESUMED = "io.serverlessworkflow.workflow.resumed.v1"; + private static final String WORKFLOW_FAULTED = "io.serverlessworkflow.workflow.faulted.v1"; + private static final String WORKFLOW_CANCELLED = "io.serverlessworkflow.workflow.cancelled.v1"; + + public static Collection getLifeCycleTypes() { + return Set.of( + TASK_STARTED, + TASK_COMPLETED, + TASK_SUSPENDED, + TASK_RESUMED, + TASK_FAULTED, + TASK_CANCELLED, + WORKFLOW_STARTED, + WORKFLOW_COMPLETED, + WORKFLOW_SUSPENDED, + WORKFLOW_RESUMED, + WORKFLOW_FAULTED, + WORKFLOW_CANCELLED); + } + @Override public void onTaskStarted(TaskStartedEvent event) { publish( @@ -56,7 +88,7 @@ public void onTaskStarted(TaskStartedEvent event) { cloudEventData( new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.started.v1") + .withType(TASK_STARTED) .build()); } @@ -71,7 +103,7 @@ public void onTaskCompleted(TaskCompletedEvent event) { new TaskCompletedCEData( id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)), this::convert)) - .withType("io.serverlessworkflow.task.completed.v1") + .withType(TASK_COMPLETED) .build()); } @@ -85,7 +117,7 @@ public void onTaskSuspended(TaskSuspendedEvent event) { cloudEventData( new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.suspended.v1") + .withType(TASK_SUSPENDED) .build()); } @@ -99,7 +131,7 @@ public void onTaskResumed(TaskResumedEvent event) { cloudEventData( new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.resumed.v1") + .withType(TASK_RESUMED) .build()); } @@ -113,7 +145,7 @@ public void onTaskCancelled(TaskCancelledEvent event) { cloudEventData( new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.task.cancelled.v1") + .withType(TASK_CANCELLED) .build()); } @@ -127,7 +159,7 @@ public void onTaskFailed(TaskFailedEvent event) { cloudEventData( new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)), this::convert)) - .withType("io.serverlessworkflow.task.faulted.v1") + .withType(TASK_FAULTED) .build()); } @@ -140,7 +172,7 @@ public void onWorkflowStarted(WorkflowStartedEvent event) { .withData( cloudEventData( new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.started.v1") + .withType(WORKFLOW_STARTED) .build()); } @@ -154,7 +186,7 @@ public void onWorkflowSuspended(WorkflowSuspendedEvent event) { cloudEventData( new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.suspended.v1") + .withType(WORKFLOW_SUSPENDED) .build()); } @@ -168,7 +200,7 @@ public void onWorkflowCancelled(WorkflowCancelledEvent event) { cloudEventData( new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.cancelled.v1") + .withType(WORKFLOW_CANCELLED) .build()); } @@ -181,7 +213,7 @@ public void onWorkflowResumed(WorkflowResumedEvent event) { .withData( cloudEventData( new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType("io.serverlessworkflow.workflow.resumed.v1") + .withType(WORKFLOW_RESUMED) .build()); } @@ -195,7 +227,7 @@ public void onWorkflowCompleted(WorkflowCompletedEvent event) { cloudEventData( new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)), this::convert)) - .withType("io.serverlessworkflow.workflow.completed.v1") + .withType(WORKFLOW_COMPLETED) .build()); } @@ -209,7 +241,7 @@ public void onWorkflowFailed(WorkflowFailedEvent event) { cloudEventData( new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)), this::convert)) - .withType("io.serverlessworkflow.workflow.faulted.v1") + .withType(WORKFLOW_FAULTED) .build()); } @@ -263,14 +295,18 @@ protected byte[] convert(TaskResumedCEData data) { protected abstract byte[] convertToBytes(T data); + protected void publish(T ev, Function ceFunction) { + WorkflowApplication appl = ev.workflowContext().definition().application(); + if (appl.isLifeCycleCEPublishingEnabled()) { + publish(appl, ceFunction.apply(ev)); + } + } + /* By default, generated cloud events are published, if user has not disabled them at application level, - * using application event publisher. That might be changed if needed by children. + * using application event publishers. That might be changed if needed by children by overriding this method */ - protected void publish(T ev, Function ce) { - WorkflowApplication application = ev.workflowContext().definition().application(); - if (application.isLifeCycleCEPublishingEnabled()) { - application.eventPublisher().publish(ce.apply(ev)); - } + protected void publish(WorkflowApplication application, CloudEvent ce) { + application.eventPublishers().forEach(p -> p.publish(ce)); } private static CloudEventData cloudEventData(T data, ToBytes toBytes) { diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index c4223aea..cc908b1f 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -26,8 +26,8 @@ import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.events.EventRegistration; -import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; @@ -41,7 +41,6 @@ import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -59,24 +58,23 @@ class LifeCycleEventsTest { private WorkflowApplication appl; private Collection publishedEvents; - private Collection registrations; @BeforeEach void setup() { publishedEvents = new CopyOnWriteArrayList<>(); - appl = WorkflowApplication.builder().build(); - registrations = new ArrayList<>(); - Collection builders = appl.eventConsumer().listenToAll(appl); - - for (EventRegistrationBuilder builder : builders) { - registrations.add( - appl.eventConsumer().register(builder, ce -> publishedEvents.add((CloudEvent) ce))); + InMemoryEvents eventBroker = new InMemoryEvents(); + for (String type : AbstractLifeCyclePublisher.getLifeCycleTypes()) { + eventBroker.register(type, ce -> publishedEvents.add(ce)); } + appl = + WorkflowApplication.builder() + .withEventConsumer(eventBroker) + .withEventPublisher(eventBroker) + .build(); } @AfterEach void close() { - registrations.forEach(r -> appl.eventConsumer().unregister(r)); appl.close(); } From 14657830c2f5b765e045d2d6f841a06160bf12f5 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 28 Aug 2025 11:44:37 +0200 Subject: [PATCH 5/5] [Fix #744] EmitExecutor shorter version Signed-off-by: Francisco Javier Tirado Sarti --- .../serverlessworkflow/impl/executors/EmitExecutor.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java index 52e081b0..5c9d10ba 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -38,7 +38,6 @@ import io.serverlessworkflow.impl.resources.ResourceLoader; import java.net.URI; import java.time.OffsetDateTime; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -80,12 +79,10 @@ protected CompletableFuture internalExecute( Collection eventPublishers = workflow.definition().application().eventPublishers(); CloudEvent ce = buildCloudEvent(workflow, taskContext); - Collection> allCompletables = new ArrayList<>(); - for (EventPublisher eventPublisher : eventPublishers) { - allCompletables.add(eventPublisher.publish(ce)); - } return CompletableFuture.allOf( - allCompletables.toArray(new CompletableFuture[allCompletables.size()])) + eventPublishers.stream() + .map(eventPublisher -> eventPublisher.publish(ce)) + .toArray(size -> new CompletableFuture[size])) .thenApply(v -> taskContext.input()); }