From be1f2142a9c3b749ad80c51ab2562f6544fc5531 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Tue, 29 Jul 2025 18:46:33 -0400 Subject: [PATCH] Add more unit tests and docs for fluent Signed-off-by: Ricardo Zanini --- experimental/lambda-fluent/pom.xml | 32 +++ experimental/lambda/pom.xml | 3 +- experimental/pom.xml | 3 +- experimental/types/pom.xml | 2 +- fluent/README.md | 204 ++++++++++++++++++ .../fluent/agentic/AgentDslWorkflowTest.java | 138 ++++++++++++ .../agentic/AgentWorkflowBuilderTest.java | 50 +++++ fluent/func/pom.xml | 4 - 8 files changed, 429 insertions(+), 7 deletions(-) create mode 100644 experimental/lambda-fluent/pom.xml create mode 100644 fluent/README.md create mode 100644 fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentDslWorkflowTest.java diff --git a/experimental/lambda-fluent/pom.xml b/experimental/lambda-fluent/pom.xml new file mode 100644 index 00000000..a537b36c --- /dev/null +++ b/experimental/lambda-fluent/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-experimental + 8.0.0-SNAPSHOT + + + serverlessworkflow-lambda-fluent + Serverless Workflow :: Experimental :: Lambda Fluent + pom + + + 17 + 17 + UTF-8 + + + + + io.serverlessworkflow + serverlessworkflow-fluent-func + + + io.serverlessworkflow + serverlessworkflow-experimental-lambda + + + \ No newline at end of file diff --git a/experimental/lambda/pom.xml b/experimental/lambda/pom.xml index 242917d1..0b57ab0a 100644 --- a/experimental/lambda/pom.xml +++ b/experimental/lambda/pom.xml @@ -7,7 +7,7 @@ 8.0.0-SNAPSHOT serverlessworkflow-experimental-lambda - ServelessWorkflow:: Experimental:: lambda + Serverless Workflow :: Experimental :: Lambda io.serverlessworkflow @@ -20,6 +20,7 @@ io.serverlessworkflow serverlessworkflow-fluent-func + test org.junit.jupiter diff --git a/experimental/pom.xml b/experimental/pom.xml index 3312207e..c4b61781 100644 --- a/experimental/pom.xml +++ b/experimental/pom.xml @@ -7,7 +7,7 @@ serverlessworkflow-experimental pom - ServerlessWorkflow:: Experimental + Serverless Workflow :: Experimental @@ -40,5 +40,6 @@ types lambda + lambda-fluent \ No newline at end of file diff --git a/experimental/types/pom.xml b/experimental/types/pom.xml index dea3931d..3165f28d 100644 --- a/experimental/types/pom.xml +++ b/experimental/types/pom.xml @@ -6,7 +6,7 @@ 8.0.0-SNAPSHOT serverlessworkflow-experimental-types - ServelessWorkflow:: Experimental:: Types + Serverless Workflow :: Experimental:: Types io.serverlessworkflow diff --git a/fluent/README.md b/fluent/README.md new file mode 100644 index 00000000..6e4d7bb0 --- /dev/null +++ b/fluent/README.md @@ -0,0 +1,204 @@ +# CNCF Serverless Workflow SDK Java — Fluent DSL + +> A programmatic, type‑safe Java API for building and running Serverless Workflows (and agentic workflows) without writing YAML. + +--- + +## 📦 Modules + +| Module | Purpose | +| -------------- | --------------------------------------------------------------------------------------------- | +| **spec** | Core DSL implementing the [Serverless Workflow Specification](https://github.com/serverlessworkflow/specification). Purely compliant fluent API. | +| **func** | Java‑centric “functional” DSL on top of **spec**: adds `Function<>`/`Predicate<>` support, `callFn` for Java method calls, and richer flow controls. | +| **agentic** | **Experimental** proof‑of‑concept DSL built on **func** for LangChain4j agentic workflows: `agent`, `sequence`, `loop`, `parallel`, etc. | + +--- + +## 🔧 Getting Started + +Add the modules you need to your Maven `pom.xml` (replace versions as appropriate): + +```xml + + + io.serverlessworkflow + serverlessworkflow-fluent-spec + ${version.io.serverlessworkflow} + + + io.serverlessworkflow + serverlessworkflow-fluent-func + ${version.io.serverlessworkflow} + + + io.serverlessworkflow + serverlessworkflow-fluent-agentic + ${version.io.serverlessworkflow} + +``` + +--- + +## 📖 Module Reference + +### 1. Spec Fluent + +Fully compliant with the CNCF Serverless Workflow spec.\ +Use it when you want a 1:1 mapping of the YAML DSL in Java. + +```java +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; + +Workflow wf = WorkflowBuilder + .workflow("flowDo") + .tasks(tasks -> + tasks + .set("initCtx", "$.foo = 'bar'") + .forEach("item", f -> f + .each("item") + .at("$.list") + ) + ) + .build(); +``` + +> [!NOTE] +> We rename reserved keywords (`for`, `do`, `if`, `while`, `switch`, `try`) to safe identifiers (`forEach`, `tasks`, `when`, etc.). + +--- + +### 2. Func Fluent + +A Java‑first DSL that builds on **spec**, adding: + +- `callFn`: invoke arbitrary Java `Function<>` handlers +- `Predicate<>` **guards** via `when(Predicate)` +- Built‑in `Function`/`Predicate` support instead of JQ expressions + +```java +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; + +Workflow wf = FuncWorkflowBuilder + .workflow("callJavaFlow") + .tasks(tasks -> + tasks.callFn("invokeHandler", call -> call + // e.g. call.className("com.acme.Handler") + // .method("handle") + // .arg("key", "value") + .function(ctx -> { + // your code here + }) + ) + ) + .build(); +``` + +> [!WARNING] +> The **func** DSL is *not* spec‑compliant. It adds Java‑specific tasks and control‑flow extensions for in‑JVM execution. + +--- + +### 3. Agentic Fluent *(Experimental)* + +Built on **func** for LangChain4j agentic workflows. Adds: + +- `agent(instance)`: invoke a LangChain4j agent +- `sequence(...)`: run agents in order +- `loop(cfg)`: retry or repeated agent calls +- `parallel(...)`: fork agent calls concurrently + +```java +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.agentic.AgentWorkflowBuilder; + +var scorer = AgentsUtils.newMovieExpert(); +var editor = AgentsUtils.newMovieExpert(); + +Workflow wf = AgentWorkflowBuilder + .workflow("retryFlow") + .tasks(tasks -> tasks.loop( + "reviewLoop", + loop -> loop + .maxIterations(5) + .exitCondition(c -> c.readState("score", 0).doubleValue() > 0.75) + .subAgents("reviewer", scorer, editor) + )) + .build(); +``` + +--- + +## 🚀 Real‑World Example: Order Fulfillment + +```java +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.agentic.AgentWorkflowBuilder; +import java.util.function.Predicate; + +public class OrderFulfillment { + + static class InventoryAgent { /* … */ } + static class NotificationAgent { /* … */ } + static class ShippingAgent { /* … */ } + + public Workflow buildWorkflow() { + + Predicate inventoryOk = state -> + Boolean.TRUE.equals(((java.util.Map) state).get("inventoryAvailable")); + + return AgentWorkflowBuilder + .workflow("OrderFulfillment") + .tasks(tasks -> tasks + + // 1. initialize state + .set("init", s -> s.expr("$.orderId = '.input.oriderId'")) + + // 2. check inventory + .agent("checkInventory", new InventoryAgent()) + + // 3. pull result into a flag + .set("inventoryAvailable", s -> s.expr("$.checkInventory.available")) + + // 4. retry until in stock (max 3 attempts) + .loop("retryIfOutOfStock", loop -> loop + .maxIterations(3) + .exitCondition(inventoryOk) + .subAgents("inventoryChecker", new InventoryAgent()) + ) + + // 5. notify systems in parallel + .parallel("notifyAll", + new NotificationAgent(), + new ShippingAgent() + ) + + // 6. mark order complete + .set("complete", s -> s.expr("$.status = 'COMPLETED'")) + ) + .build(); + } +} +``` + +--- + +## 🛠️ Next Steps & Roadmap + +- **Error handling**: retries, back‑off, `onError` handlers +- **Timers & delays**: `wait`, per‑task `timeout` +- **Sub‑workflows** & composition: call one workflow from another +- **Event tasks**: `onEvent`, `sendEvent` +- **Human‑in‑the‑Loop**: approval/notification steps + +Contributions welcome! Check out our [CONTRIBUTING.md](../CONTRIBUTING.md) and join the CNCF Slack channel for **Serverless Workflow**. + +--- + +## 📜 License + +Apache 2.0 © Serverless Workflow Authors diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentDslWorkflowTest.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentDslWorkflowTest.java new file mode 100644 index 00000000..84a348b6 --- /dev/null +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentDslWorkflowTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.api.types.func.CallTaskJava; +import io.serverlessworkflow.api.types.func.ForTaskFunction; +import java.util.List; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class AgentDslWorkflowTest { + + @Test + @DisplayName("Sequential agents via DSL.sequence(...)") + void dslSequentialAgents() { + var a1 = AgentsUtils.newMovieExpert(); + var a2 = AgentsUtils.newMovieExpert(); + var a3 = AgentsUtils.newMovieExpert(); + + Workflow wf = + AgentWorkflowBuilder.workflow("seqFlow") + .tasks(tasks -> tasks.sequence("process", a1, a2, a3)) + .build(); + + List items = wf.getDo(); + assertThat(items).hasSize(3); + // names should be process-0, process-1, process-2 + assertThat(items.get(0).getName()).isEqualTo("process-0"); + assertThat(items.get(1).getName()).isEqualTo("process-1"); + assertThat(items.get(2).getName()).isEqualTo("process-2"); + // each is a CallTaskJava under the hood + items.forEach(it -> assertThat(it.getTask().getCallTask()).isInstanceOf(CallTaskJava.class)); + } + + @Test + @DisplayName("Bare Java‑bean call via DSL.callFn(...)") + void dslCallFnBare() { + Workflow wf = + AgentWorkflowBuilder.workflow("beanCall") + .tasks(tasks -> tasks.callFn("plainCall", fn -> fn.function(ctx -> "pong"))) + .build(); + + List items = wf.getDo(); + assertThat(items).hasSize(1); + TaskItem ti = items.get(0); + assertThat(ti.getName()).isEqualTo("plainCall"); + assertThat(ti.getTask().getCallTask()).isInstanceOf(CallTaskJava.class); + } + + @Test + void dslLoopAgents() { + var scorer = AgentsUtils.newMovieExpert(); // re‑using MovieExpert as a stand‑in for scoring + var editor = AgentsUtils.newMovieExpert(); // likewise + + Workflow wf = + AgentWorkflowBuilder.workflow("retryFlow") + .tasks( + tasks -> + tasks.loop( + "reviewLoop", + loop -> + loop.maxIterations(5) + .exitCondition(c -> c.readState("score", 0).doubleValue() > 0.75) + .subAgents("reviewer", scorer, editor))) + .build(); + + List items = wf.getDo(); + assertThat(items).hasSize(1); + + var fn = (ForTaskFunction) items.get(0).getTask().getForTask(); + assertThat(fn.getDo()).isNotNull(); + assertThat(fn.getDo()).hasSize(2); + fn.getDo() + .forEach(si -> assertThat(si.getTask().getCallTask()).isInstanceOf(CallTaskJava.class)); + } + + @Test + void dslParallelAgents() { + var a1 = AgentsUtils.newMovieExpert(); + var a2 = AgentsUtils.newMovieExpert(); + + Workflow wf = + AgentWorkflowBuilder.workflow("forkFlow") + .tasks(tasks -> tasks.parallel("fanout", a1, a2)) + .build(); + + List items = wf.getDo(); + assertThat(items).hasSize(1); + + var fork = items.get(0).getTask().getForkTask(); + // two branches created + assertThat(fork.getFork().getBranches()).hasSize(2); + // branch names follow "branch-{index}-{name}" + assertThat(fork.getFork().getBranches().get(0).getName()).isEqualTo("branch-0-fanout"); + assertThat(fork.getFork().getBranches().get(1).getName()).isEqualTo("branch-1-fanout"); + } + + @Test + void dslMixSpecAndAgent() { + var agent = AgentsUtils.newMovieExpert(); + + Workflow wf = + AgentWorkflowBuilder.workflow("mixedFlow") + .tasks( + tasks -> + tasks + .set("init", s -> s.expr("$.initialized = true")) + .agent("callAgent", agent) + .set("done", "$.status = 'OK'")) + .build(); + + List items = wf.getDo(); + assertThat(items).hasSize(3); + // init is a SetTask + assertThat(items.get(0).getTask().getSetTask()).isNotNull(); + // agent call becomes a CallTaskJava + assertThat(items.get(1).getTask().getCallTask()).isInstanceOf(CallTaskJava.class); + // done is another SetTask + assertThat(items.get(2).getTask().getSetTask()).isNotNull(); + } +} diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentWorkflowBuilderTest.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentWorkflowBuilderTest.java index e34652c5..20a91c26 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentWorkflowBuilderTest.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/AgentWorkflowBuilderTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.spy; import dev.langchain4j.agentic.AgentServices; @@ -170,4 +171,53 @@ void parallelAgents() { assertThat(branch.getTask().getCallTask().get()).isInstanceOf(CallJava.class); }); } + + @Test + @DisplayName("workflow callFn(name,cfg) produces CallJava with no guard") + void testWorkflowCallFnBare() { + Workflow wf = + AgentWorkflowBuilder.workflow() + .tasks(d -> d.callFn("myCall", fn -> fn.function(ctx -> "hello"))) + .build(); + + assertThat(wf.getDo()).hasSize(1); + TaskItem ti = wf.getDo().get(0); + + assertInstanceOf(CallJava.class, ti.getTask().getCallTask().get()); + } + + @Test + @DisplayName("workflow callFn with Java DSL guard attaches predicate") + void testWorkflowCallFnWithPredicate() { + Predicate guard = cog -> true; + + Workflow wf = + AgentWorkflowBuilder.workflow() + .tasks(d -> d.callFn("guarded", fn -> fn.function(ctx -> "x").when(guard))) + .build(); + + TaskItem ti = wf.getDo().get(0); + assertInstanceOf(CallJava.class, ti.getTask().getCallTask().get()); + } + + @Test + @DisplayName("workflow loop with maxIterations only generates collection and no predicate") + void testWorkflowLoopMaxIterationsOnly() { + Agents.MovieExpert expert = AgentsUtils.newMovieExpert(); + + Workflow wf = + AgentWorkflowBuilder.workflow("maxFlow") + .tasks(d -> d.loop("limit", l -> l.maxIterations(2).subAgents("sub", expert))) + .build(); + + TaskItem ti = wf.getDo().get(0); + ForTaskFunction fn = (ForTaskFunction) ti.getTask().getForTask(); + + // synthetic collection is created + assertThat(fn.getCollection()).isNotNull(); + // no exitCondition → no whilePredicate set + assertNull(fn.getWhilePredicate(), "No while predicate when only maxIterations"); + // inner subAgents block still generates exactly one call branch + assertThat(fn.getDo()).hasSize(1); + } } diff --git a/fluent/func/pom.xml b/fluent/func/pom.xml index d1230b34..9f67dcce 100644 --- a/fluent/func/pom.xml +++ b/fluent/func/pom.xml @@ -23,10 +23,6 @@ io.serverlessworkflow serverlessworkflow-experimental-types - - io.serverlessworkflow - serverlessworkflow-types - io.serverlessworkflow serverlessworkflow-fluent-spec