diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index f589938f..73fb74dc 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -30,6 +30,7 @@ import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; import io.serverlessworkflow.impl.resources.StaticResource; +import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler; import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; import java.util.ArrayList; @@ -59,6 +60,7 @@ public class WorkflowApplication implements AutoCloseable { private final Collection eventPublishers; private final boolean lifeCycleCEPublishingEnabled; private final WorkflowModelFactory modelFactory; + private final WorkflowScheduler scheduler; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -75,6 +77,7 @@ private WorkflowApplication(Builder builder) { this.eventPublishers = builder.eventPublishers; this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; this.modelFactory = builder.modelFactory; + this.scheduler = builder.scheduler; } public TaskExecutorFactory taskFactory() { @@ -142,6 +145,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private SchemaValidatorFactory schemaValidatorFactory; private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); private WorkflowInstanceIdFactory idFactory; + private WorkflowScheduler scheduler; private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory(); private EventConsumer eventConsumer; private Collection eventPublishers = new ArrayList<>(); @@ -167,6 +171,11 @@ public Builder withExpressionFactory(ExpressionFactory factory) { return this; } + public Builder withScheduler(WorkflowScheduler scheduler) { + this.scheduler = scheduler; + return this; + } + public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { this.resourceLoaderFactory = resourceLoader; return this; @@ -257,6 +266,9 @@ public WorkflowApplication build() { if (idFactory == null) { idFactory = new MonotonicUlidWorkflowInstanceIdFactory(); } + if (scheduler == null) { + scheduler = new DefaultWorkflowScheduler(); + } return new WorkflowApplication(this); } } @@ -313,4 +325,8 @@ public ExecutorService executorService() { public boolean isLifeCycleCEPublishingEnabled() { return lifeCycleCEPublishingEnabled; } + + public WorkflowScheduler scheduler() { + return scheduler; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index b79d86bb..01a61c94 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -16,13 +16,18 @@ package io.serverlessworkflow.impl; import static io.serverlessworkflow.impl.WorkflowUtils.*; +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; import io.serverlessworkflow.api.types.Input; +import io.serverlessworkflow.api.types.ListenTo; import io.serverlessworkflow.api.types.Output; +import io.serverlessworkflow.api.types.Schedule; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.resources.ResourceLoader; +import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.nio.file.Path; import java.util.HashMap; @@ -40,20 +45,23 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData private final TaskExecutor taskExecutor; private final ResourceLoader resourceLoader; private final Map> executors = new HashMap<>(); + private ScheduledEventConsumer scheculedConsumer; private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { this.workflow = workflow; this.application = application; this.resourceLoader = resourceLoader; - if (workflow.getInput() != null) { - Input input = workflow.getInput(); + + Input input = workflow.getInput(); + if (input != null) { this.inputSchemaValidator = getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema()); this.inputFilter = buildWorkflowFilter(application, input.getFrom()); } - if (workflow.getOutput() != null) { - Output output = workflow.getOutput(); + + Output output = workflow.getOutput(); + if (output != null) { this.outputSchemaValidator = getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema()); this.outputFilter = buildWorkflowFilter(application, output.getAs()); @@ -68,8 +76,23 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) } static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) { - return new WorkflowDefinition( - application, workflow, application.resourceLoaderFactory().getResourceLoader(path)); + WorkflowDefinition definition = + new WorkflowDefinition( + application, workflow, application.resourceLoaderFactory().getResourceLoader(path)); + Schedule schedule = workflow.getSchedule(); + if (schedule != null) { + ListenTo to = schedule.getOn(); + if (to != null) { + definition.scheculedConsumer = + application + .scheduler() + .eventConsumer( + definition, + application.modelFactory()::from, + EventRegistrationBuilderInfo.from(application, to, x -> null)); + } + } + return definition; } public WorkflowInstance instance(Object input) { @@ -121,5 +144,7 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor ta } @Override - public void close() {} + public void close() { + safeClose(scheculedConsumer); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java new file mode 100644 index 00000000..84f5b913 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer; +import java.util.Collection; +import java.util.function.Function; + +public interface WorkflowScheduler { + Collection scheduledInstances(WorkflowDefinition def); + + ScheduledEventConsumer eventConsumer( + WorkflowDefinition definition, + Function converter, + EventRegistrationBuilderInfo info); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java new file mode 100644 index 00000000..501fa7df --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import java.util.Collection; + +public record EventRegistrationBuilderCollection( + Collection registrations, boolean isAnd) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java new file mode 100644 index 00000000..77a41396 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java @@ -0,0 +1,106 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; +import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.ListenTo; +import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; +import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowPredicate; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +public record EventRegistrationBuilderInfo( + EventRegistrationBuilderCollection registrations, + EventRegistrationBuilderCollection untilRegistrations, + WorkflowPredicate until) { + + public static EventRegistrationBuilderInfo from( + WorkflowApplication application, + ListenTo to, + Function predBuilder) { + EventRegistrationBuilderCollection registrations; + EventRegistrationBuilderCollection untilRegistrations = null; + WorkflowPredicate until = null; + if (to.getAllEventConsumptionStrategy() != null) { + registrations = allEvents(to.getAllEventConsumptionStrategy(), application); + } else if (to.getAnyEventConsumptionStrategy() != null) { + AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy(); + registrations = anyEvents(any, application); + Until untilDesc = any.getUntil(); + if (untilDesc != null) { + until = predBuilder.apply(untilDesc); + if (until == null) { + if (untilDesc.getAnyEventUntilConsumed() != null) { + EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); + if (strategy.getAllEventConsumptionStrategy() != null) { + untilRegistrations = + allEvents(strategy.getAllEventConsumptionStrategy(), application); + } else if (strategy.getAnyEventConsumptionStrategy() != null) { + untilRegistrations = + anyEvents(strategy.getAnyEventConsumptionStrategy(), application); + } else if (strategy.getOneEventConsumptionStrategy() != null) { + untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy(), application); + } + } + } + } + } else { + registrations = oneEvent(to.getOneEventConsumptionStrategy(), application); + } + return new EventRegistrationBuilderInfo(registrations, untilRegistrations, until); + } + + private static EventRegistrationBuilderCollection allEvents( + AllEventConsumptionStrategy allStrategy, WorkflowApplication application) { + return new EventRegistrationBuilderCollection(from(allStrategy.getAll(), application), true); + } + + private static EventRegistrationBuilderCollection anyEvents( + AnyEventConsumptionStrategy anyStrategy, WorkflowApplication application) { + List eventFilters = anyStrategy.getAny(); + return new EventRegistrationBuilderCollection( + eventFilters.isEmpty() ? registerToAll(application) : from(eventFilters, application), + false); + } + + private static EventRegistrationBuilderCollection oneEvent( + OneEventConsumptionStrategy oneStrategy, WorkflowApplication application) { + return new EventRegistrationBuilderCollection( + List.of(from(oneStrategy.getOne(), application)), true); + } + + private static Collection registerToAll( + WorkflowApplication application) { + return application.eventConsumer().listenToAll(application); + } + + private static Collection from( + List filters, WorkflowApplication application) { + return filters.stream().map(filter -> from(filter, application)).collect(Collectors.toList()); + } + + private static EventRegistrationBuilder from( + EventFilter filter, WorkflowApplication application) { + return application.eventConsumer().listen(filter, application); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java new file mode 100644 index 00000000..665fb924 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +public record EventRegistrationInfo( + CompletableFuture completableFuture, Collection registrations) { + + public static final EventRegistrationInfo build( + EventRegistrationBuilderCollection builderInfo, + BiConsumer> consumer, + EventConsumer eventConsumer) { + Collection registrations = new ArrayList(); + CompletableFuture[] futures = + builderInfo.registrations().stream() + .map(reg -> toCompletable(reg, registrations, consumer, eventConsumer)) + .toArray(size -> new CompletableFuture[size]); + return new EventRegistrationInfo( + builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures), + registrations); + } + + private static final CompletableFuture toCompletable( + EventRegistrationBuilder regBuilder, + Collection registrations, + BiConsumer> ceConsumer, + EventConsumer eventConsumer) { + final CompletableFuture future = new CompletableFuture<>(); + registrations.add( + eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future))); + return future; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 5fb29901..8d242f31 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -16,15 +16,9 @@ package io.serverlessworkflow.impl.executors; import io.cloudevents.CloudEvent; -import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; -import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; -import io.serverlessworkflow.api.types.EventConsumptionStrategy; -import io.serverlessworkflow.api.types.EventFilter; import io.serverlessworkflow.api.types.ListenTask; import io.serverlessworkflow.api.types.ListenTaskConfiguration; import io.serverlessworkflow.api.types.ListenTaskConfiguration.ListenAndReadAs; -import io.serverlessworkflow.api.types.ListenTo; -import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; import io.serverlessworkflow.api.types.SubscriptionIterator; import io.serverlessworkflow.api.types.Until; import io.serverlessworkflow.impl.TaskContext; @@ -38,79 +32,34 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.events.EventConsumer; -import io.serverlessworkflow.impl.events.EventRegistration; -import io.serverlessworkflow.impl.events.EventRegistrationBuilder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderCollection; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import io.serverlessworkflow.impl.events.EventRegistrationInfo; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.stream.Collectors; public abstract class ListenExecutor extends RegularTaskExecutor { - protected final EventRegistrationBuilderCollection regBuilders; + protected final EventRegistrationBuilderInfo builderRegistrationInfo; protected final Optional> loop; protected final Function converter; protected final EventConsumer eventConsumer; - private static record EventRegistrationBuilderCollection( - Collection registrations, boolean isAnd) {} - public static class ListenExecutorBuilder extends RegularTaskExecutorBuilder { - private EventRegistrationBuilderCollection registrations; - private WorkflowPredicate until; - private EventRegistrationBuilderCollection untilRegistrations; + private EventRegistrationBuilderInfo registrationInfo; private TaskExecutor loop; private Function converter = ce -> application.modelFactory().from(ce.getData()); - private EventRegistrationBuilderCollection allEvents(AllEventConsumptionStrategy allStrategy) { - return new EventRegistrationBuilderCollection(from(allStrategy.getAll()), true); - } - - private EventRegistrationBuilderCollection anyEvents(AnyEventConsumptionStrategy anyStrategy) { - List eventFilters = anyStrategy.getAny(); - return new EventRegistrationBuilderCollection( - eventFilters.isEmpty() ? registerToAll() : from(eventFilters), false); - } - - private EventRegistrationBuilderCollection oneEvent(OneEventConsumptionStrategy oneStrategy) { - return new EventRegistrationBuilderCollection(List.of(from(oneStrategy.getOne())), true); - } - protected ListenExecutorBuilder( WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) { super(position, task, definition); ListenTaskConfiguration listen = task.getListen(); - ListenTo to = listen.getTo(); - if (to.getAllEventConsumptionStrategy() != null) { - registrations = allEvents(to.getAllEventConsumptionStrategy()); - } else if (to.getAnyEventConsumptionStrategy() != null) { - AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy(); - registrations = anyEvents(any); - Until untilDesc = any.getUntil(); - if (untilDesc != null) { - until = buildUntilPredicate(untilDesc); - if (until == null) { - if (untilDesc.getAnyEventUntilConsumed() != null) { - EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); - if (strategy.getAllEventConsumptionStrategy() != null) { - untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy()); - } else if (strategy.getAnyEventConsumptionStrategy() != null) { - untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy()); - } else if (strategy.getOneEventConsumptionStrategy() != null) { - untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy()); - } - } - } - } - } else if (to.getOneEventConsumptionStrategy() != null) { - registrations = oneEvent(to.getOneEventConsumptionStrategy()); - } + registrationInfo = + EventRegistrationBuilderInfo.from(application, listen.getTo(), this::buildUntilPredicate); SubscriptionIterator forEach = task.getForeach(); if (forEach != null) { loop = TaskExecutorHelper.createExecutorList(position, forEach.getDo(), definition); @@ -134,21 +83,11 @@ protected WorkflowPredicate buildUntilPredicate(Until until) { : null; } - private Collection registerToAll() { - return application.eventConsumer().listenToAll(application); - } - - private Collection from(List filters) { - return filters.stream().map(this::from).collect(Collectors.toList()); - } - - private EventRegistrationBuilder from(EventFilter filter) { - return application.eventConsumer().listen(filter, application); - } - @Override public ListenExecutor buildInstance() { - return registrations.isAnd() ? new AndListenExecutor(this) : new OrListenExecutor(this); + return registrationInfo.registrations().isAnd() + ? new AndListenExecutor(this) + : new OrListenExecutor(this); } } @@ -176,28 +115,27 @@ public static class OrListenExecutor extends ListenExecutor { public OrListenExecutor(ListenExecutorBuilder builder) { super(builder); - this.until = Optional.ofNullable(builder.until); - this.untilRegBuilders = builder.untilRegistrations; + this.until = Optional.ofNullable(builder.registrationInfo.until()); + this.untilRegBuilders = builder.registrationInfo.untilRegistrations(); } @Override - protected CompletableFuture buildFuture( - EventRegistrationBuilderCollection regCollection, - Collection registrations, + protected EventRegistrationInfo buildInfo( BiConsumer> consumer) { - CompletableFuture combinedFuture = - super.buildFuture(regCollection, registrations, consumer); + EventRegistrationInfo info = super.buildInfo(consumer); if (untilRegBuilders != null) { - Collection untilRegistrations = new ArrayList<>(); - CompletableFuture untilFuture = - combine(untilRegBuilders, untilRegistrations, (ce, f) -> f.complete(null)); - untilFuture.thenAccept( - v -> { - combinedFuture.complete(null); - untilRegistrations.forEach(reg -> eventConsumer.unregister(reg)); - }); + EventRegistrationInfo untilInfo = + EventRegistrationInfo.build( + untilRegBuilders, (ce, f) -> f.complete(null), eventConsumer); + untilInfo + .completableFuture() + .thenAccept( + v -> { + info.completableFuture().complete(null); + untilInfo.registrations().forEach(reg -> eventConsumer.unregister(reg)); + }); } - return combinedFuture; + return info; } protected void internalProcessCe( @@ -228,49 +166,24 @@ protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { WorkflowModelCollection output = workflow.definition().application().modelFactory().createCollection(); - Collection registrations = new ArrayList<>(); ((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING); - return buildFuture( - regBuilders, - registrations, + EventRegistrationInfo info = + buildInfo( (BiConsumer>) ((ce, future) -> - processCe(converter.apply(ce), output, workflow, taskContext, future))) + processCe(converter.apply(ce), output, workflow, taskContext, future))); + return info.completableFuture() .thenApply( v -> { - registrations.forEach(reg -> eventConsumer.unregister(reg)); + info.registrations().forEach(eventConsumer::unregister); return output; }); } - protected CompletableFuture buildFuture( - EventRegistrationBuilderCollection regCollection, - Collection registrations, + protected EventRegistrationInfo buildInfo( BiConsumer> consumer) { - return combine(regCollection, registrations, consumer); - } - - protected final CompletableFuture combine( - EventRegistrationBuilderCollection regCollection, - Collection registrations, - BiConsumer> consumer) { - CompletableFuture[] futures = - regCollection.registrations().stream() - .map(reg -> toCompletable(reg, registrations, consumer)) - .toArray(size -> new CompletableFuture[size]); - return regCollection.isAnd() - ? CompletableFuture.allOf(futures) - : CompletableFuture.anyOf(futures); - } - - private CompletableFuture toCompletable( - EventRegistrationBuilder regBuilder, - Collection registrations, - BiConsumer> ceConsumer) { - final CompletableFuture future = new CompletableFuture<>(); - registrations.add( - eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future))); - return future; + return EventRegistrationInfo.build( + builderRegistrationInfo.registrations(), consumer, eventConsumer); } private void processCe( @@ -299,7 +212,7 @@ private void processCe( protected ListenExecutor(ListenExecutorBuilder builder) { super(builder); this.eventConsumer = builder.application.eventConsumer(); - this.regBuilders = builder.registrations; + this.builderRegistrationInfo = builder.registrationInfo; this.loop = Optional.ofNullable(builder.loop); this.converter = builder.converter; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java new file mode 100644 index 00000000..5e3338e3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowScheduler; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +public class DefaultWorkflowScheduler implements WorkflowScheduler { + + private Map> instances = + new ConcurrentHashMap<>(); + + @Override + public Collection scheduledInstances(WorkflowDefinition definition) { + return Collections.unmodifiableCollection(theInstances(definition)); + } + + @Override + public ScheduledEventConsumer eventConsumer( + WorkflowDefinition definition, + Function converter, + EventRegistrationBuilderInfo builderInfo) { + return new ScheduledEventConsumer(definition, converter, builderInfo) { + @Override + protected void addScheduledInstance(WorkflowInstance instance) { + theInstances(definition).add(instance); + } + }; + } + + private Collection theInstances(WorkflowDefinition definition) { + return instances.computeIfAbsent(definition, def -> new ArrayList<>()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java new file mode 100644 index 00000000..34746355 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -0,0 +1,126 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.events.EventConsumer; +import io.serverlessworkflow.impl.events.EventRegistration; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public abstract class ScheduledEventConsumer implements AutoCloseable { + + private final Function converter; + private final WorkflowDefinition definition; + private final EventRegistrationBuilderInfo builderInfo; + private final EventConsumer eventConsumer; + private Map> correlatedEvents; + private Collection registrations = new ArrayList<>(); + + protected ScheduledEventConsumer( + WorkflowDefinition definition, + Function converter, + EventRegistrationBuilderInfo builderInfo) { + this.definition = definition; + this.converter = converter; + this.builderInfo = builderInfo; + this.eventConsumer = definition.application().eventConsumer(); + if (builderInfo.registrations().isAnd() + && builderInfo.registrations().registrations().size() > 1) { + this.correlatedEvents = new HashMap<>(); + builderInfo + .registrations() + .registrations() + .forEach( + reg -> { + correlatedEvents.put(reg, new ArrayList<>()); + registrations.add( + eventConsumer.register(reg, ce -> consumeEvent(reg, (CloudEvent) ce))); + }); + } else { + builderInfo + .registrations() + .registrations() + .forEach( + reg -> registrations.add(eventConsumer.register(reg, ce -> start((CloudEvent) ce)))); + } + } + + private void consumeEvent(EventRegistrationBuilder reg, CloudEvent ce) { + Collection> collections = new ArrayList<>(); + // to minimize the critical section, conversion is done later, here we are + // performing + // just collection, if any + synchronized (correlatedEvents) { + correlatedEvents.get(reg).add((CloudEvent) ce); + while (satisfyCondition()) { + Collection collection = new ArrayList<>(); + for (List values : correlatedEvents.values()) { + collection.add(values.remove(0)); + } + collections.add(collection); + } + } + // convert and start outside synchronized + collections.forEach(this::start); + } + + private boolean satisfyCondition() { + for (List values : correlatedEvents.values()) { + if (values.isEmpty()) { + return false; + } + } + return true; + } + + protected void start(CloudEvent ce) { + WorkflowModelCollection model = definition.application().modelFactory().createCollection(); + model.add(converter.apply(ce)); + start(model); + } + + protected void start(Collection ces) { + WorkflowModelCollection model = definition.application().modelFactory().createCollection(); + ces.forEach(ce -> model.add(converter.apply(ce))); + start(model); + } + + private void start(WorkflowModel model) { + WorkflowInstance instance = definition.instance(model); + addScheduledInstance(instance); + instance.start(); + } + + public void close() { + if (correlatedEvents != null) { + correlatedEvents.clear(); + } + registrations.forEach(eventConsumer::unregister); + } + + protected abstract void addScheduledInstance(WorkflowInstance instace); +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java new file mode 100644 index 00000000..a04e7184 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ScheduleEventConsumerTest { + + private WorkflowApplication appl; + + @BeforeEach + void init() throws IOException { + appl = WorkflowApplication.builder().build(); + } + + @AfterEach + void tearDown() throws IOException { + appl.close(); + } + + @Test + void testAllEvent() throws IOException, InterruptedException, ExecutionException { + + WorkflowDefinition definition = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml")); + Collection instances = appl.scheduler().scheduledInstances(definition); + appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito")))); + appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); + await() + .pollDelay(Duration.ofMillis(20)) + .atMost(Duration.ofMillis(500)) + .until( + () -> + instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() + == 1); + assertThat((Collection) assertThat(instances).singleElement().actual().output().asJavaObject()) + .containsExactlyInAnyOrder("Javierito", "Fulanito"); + } + + @Test + void testOneEvent() throws IOException, InterruptedException, ExecutionException { + WorkflowDefinition definition = + appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml")); + appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito")))); + appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); + Collection instances = appl.scheduler().scheduledInstances(definition); + await() + .pollDelay(Duration.ofMillis(20)) + .atMost(Duration.ofMillis(500)) + .until( + () -> + instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() + == 2); + List outputs = instances.stream().map(i -> i.output().asJavaObject()).toList(); + assertThat(outputs) + .containsExactlyInAnyOrder( + Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito")); + } + + @Test + void testTogether() throws IOException, InterruptedException, ExecutionException { + WorkflowDefinition oneDef = + appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml")); + WorkflowDefinition allDef = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml")); + appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito")))); + appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); + Collection oneDefInstances = appl.scheduler().scheduledInstances(oneDef); + Collection allDefInstances = appl.scheduler().scheduledInstances(allDef); + await() + .pollDelay(Duration.ofMillis(20)) + .atMost(Duration.ofMillis(500)) + .until( + () -> + oneDefInstances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() + == 2 + && allDefInstances.stream() + .filter(i -> i.status() == WorkflowStatus.COMPLETED) + .count() + == 1); + + List outputs = oneDefInstances.stream().map(i -> i.output().asJavaObject()).toList(); + assertThat(outputs) + .containsExactlyInAnyOrder( + Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito")); + assertThat( + (Collection) + assertThat(allDefInstances).singleElement().actual().output().asJavaObject()) + .containsExactlyInAnyOrder("Javierito", "Fulanito"); + } + + private static int idCounter; + + private static CloudEvent buildCloudEvent(Object data) { + return CloudEventBuilder.v1() + .withId(Integer.toString(++idCounter)) + .withType("com.example.hospital.events.patients.recover") + .withSource(URI.create("http://www.fakejavieritotest.com")) + .withData(JsonCloudEventData.wrap(JsonUtils.fromValue(data))) + .build(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java index 8a47c72c..40e24cfb 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java @@ -37,12 +37,12 @@ public class TraceExecutionListener implements WorkflowExecutionListener { private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class); public void onWorkflowStarted(WorkflowStartedEvent ev) { - logger.info( - "Workflow definition {} with id {} started at {}", + "Workflow definition {} with id {} started at {} with data {}", ev.workflowContext().definition().workflow().getDocument().getName(), ev.workflowContext().instanceData().id(), - ev.eventDate()); + ev.eventDate(), + ev.workflowContext().instanceData().input()); } public void onWorkflowResumed(WorkflowResumedEvent ev) { diff --git a/impl/test/src/test/resources/workflows-samples/listen-start-all.yaml b/impl/test/src/test/resources/workflows-samples/listen-start-all.yaml new file mode 100644 index 00000000..77a44548 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/listen-start-all.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.1' + namespace: test + name: event-driven-schedule-all + version: '0.1.0' +schedule: + on: + all: + - with: + type: com.example.hospital.events.patients.recover + data: ${.name == "Javierito"} + - with: + type: com.example.hospital.events.patients.recover + data: ${.name == "Fulanito"} +do: + - recovered: + set: ${[$workflow.input[]|.data.name]} + \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/listen-start.yaml b/impl/test/src/test/resources/workflows-samples/listen-start.yaml new file mode 100644 index 00000000..58df7536 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/listen-start.yaml @@ -0,0 +1,15 @@ +document: + dsl: '1.0.1' + namespace: test + name: event-driven-schedule + version: '0.1.0' +schedule: + on: + one: + with: + type: com.example.hospital.events.patients.recover +do: + - recovered: + set: + recovered: ${ $workflow.input[0].data.name} + \ No newline at end of file