diff --git a/cagent-schema.json b/cagent-schema.json index fd977411..82946cde 100644 --- a/cagent-schema.json +++ b/cagent-schema.json @@ -38,6 +38,13 @@ "metadata": { "$ref": "#/definitions/Metadata", "description": "Configuration metadata" + }, + "workflow": { + "type": "array", + "description": "Sequential workflow steps", + "items": { + "$ref": "#/definitions/WorkflowStep" + } } }, "additionalProperties": false, @@ -526,6 +533,44 @@ "cmd" ], "additionalProperties": false + }, + "WorkflowStep": { + "type": "object", + "description": "A single step in a workflow", + "properties": { + "type": { + "type": "string", + "description": "Type of workflow step", + "enum": ["agent", "parallel"] + }, + "name": { + "type": "string", + "description": "Name of the agent to run (for type 'agent')" + }, + "steps": { + "type": "array", + "description": "List of agent names to run in parallel (for type 'parallel')", + "items": { + "type": "string" + } + } + }, + "required": ["type"], + "additionalProperties": false, + "oneOf": [ + { + "properties": { + "type": { "const": "agent" } + }, + "required": ["name"] + }, + { + "properties": { + "type": { "const": "parallel" } + }, + "required": ["steps"] + } + ] } } } \ No newline at end of file diff --git a/cmd/root/run.go b/cmd/root/run.go index 7d2fc27c..4f9f7f6d 100644 --- a/cmd/root/run.go +++ b/cmd/root/run.go @@ -23,6 +23,7 @@ import ( "github.com/docker/cagent/pkg/app" "github.com/docker/cagent/pkg/chat" "github.com/docker/cagent/pkg/config" + v2 "github.com/docker/cagent/pkg/config/v2" "github.com/docker/cagent/pkg/content" "github.com/docker/cagent/pkg/evaluation" "github.com/docker/cagent/pkg/remote" @@ -32,6 +33,7 @@ import ( "github.com/docker/cagent/pkg/teamloader" "github.com/docker/cagent/pkg/telemetry" "github.com/docker/cagent/pkg/tui" + "github.com/docker/cagent/pkg/workflow" ) var ( @@ -205,6 +207,26 @@ func doRunCommand(ctx context.Context, args []string, exec bool) error { slog.Error("Failed to stop tool sets", "error", err) } }() + + // Check if this is a workflow by loading the config + dir := filepath.Dir(agentFilename) + fs, err := os.OpenRoot(dir) + if err == nil { + defer fs.Close() + cfg, err := config.LoadConfig(filepath.Base(agentFilename), fs) + if err == nil && len(cfg.Workflow) > 0 { + // This is a workflow - check if TUI should be used + slog.Info("Detected workflow configuration", "steps", len(cfg.Workflow)) + + // For exec mode or --tui=false, run without TUI + if exec || !useTUI { + return runWorkflowCLI(ctx, cfg, agents) + } + + // Default: use TUI for workflow + return runWorkflowTUI(ctx, agentFilename, cfg, agents) + } + } } else { // For remote runtime, just store the original agent filename // The remote server will handle agent loading @@ -882,3 +904,170 @@ func printAvailableCommands(agentName string, cmds map[string]string) { fmt.Printf(" - %s: %s\n", n, cmds[n]) } } + +// runWorkflowCLI executes a sequential workflow in CLI mode (--tui=false) +func runWorkflowCLI(ctx context.Context, cfg *v2.Config, agents *team.Team) error { + executor := workflow.New(cfg, agents) + + // Create an events channel + events := make(chan runtime.Event, 128) + + // Run the workflow in a goroutine + go func() { + defer close(events) + if err := executor.Execute(ctx, events); err != nil { + events <- runtime.Error(err.Error()) + } + }() + + var lastErr error + currentAgent := "" + llmIsTyping := false + + for event := range events { + switch e := event.(type) { + case *runtime.WorkflowStepStartedEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + currentAgent = e.AgentName + fmt.Printf("\n%s\n", bold(fmt.Sprintf("⏳ Step %d: Running agent '%s'...", e.StepIndex+1, e.AgentName))) + + case *runtime.WorkflowStepCompletedEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + fmt.Printf("%s\n", bold(fmt.Sprintf("βœ… Step %d completed", e.StepIndex+1))) + + case *runtime.WorkflowStepFailedEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + fmt.Printf("%s\n", red("❌ Step %d failed: %s", e.StepIndex+1, e.Error)) + lastErr = fmt.Errorf("workflow step %d failed: %s", e.StepIndex+1, e.Error) + + case *runtime.WorkflowCompletedEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + fmt.Printf("\n%s\n", bold("πŸŽ‰ Workflow completed successfully!")) + fmt.Printf("\n%s\n", bold("Final Output:")) + fmt.Printf("%s\n", e.FinalOutput) + + case *runtime.AgentChoiceEvent: + if e.AgentName != currentAgent { + if llmIsTyping { + fmt.Println() + } + currentAgent = e.AgentName + printAgentName(e.AgentName) + llmIsTyping = false + } + if !llmIsTyping { + fmt.Println() + llmIsTyping = true + } + fmt.Print(e.Content) + + case *runtime.ToolCallEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + printToolCall(e.ToolCall) + + case *runtime.ToolCallResponseEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + printToolCallResponse(e.ToolCall, e.Response) + + case *runtime.ErrorEvent: + if llmIsTyping { + fmt.Println() + llmIsTyping = false + } + lastErr = fmt.Errorf("%s", e.Error) + printError(lastErr) + } + } + + if llmIsTyping { + fmt.Println() + } + + if lastErr != nil { + return lastErr + } + + return nil +} + +// runWorkflowTUI executes a sequential workflow in TUI mode (default) +func runWorkflowTUI(ctx context.Context, agentFilename string, cfg *v2.Config, agents *team.Team) error { + // Create a dummy session for workflow tracking + sess := session.New() + sess.Title = "Workflow: " + filepath.Base(agentFilename) + + // Get the first workflow step's agent for runtime (just for app compatibility) + var firstAgentName string + if len(cfg.Workflow) > 0 { + firstAgentName = cfg.Workflow[0].Name + } else { + return fmt.Errorf("no workflow steps defined") + } + + // Create a minimal runtime (we won't actually use it for execution, just for app compatibility) + rt, err := runtime.New(agents, + runtime.WithCurrentAgent(firstAgentName), + runtime.WithSessionCompaction(false), + ) + if err != nil { + return fmt.Errorf("failed to create runtime: %w", err) + } + + // Create the app which will handle event forwarding to TUI + a := app.New("cagent", agentFilename, rt, agents, sess, nil) + m := tui.New(a) + + progOpts := []tea.ProgramOption{ + tea.WithAltScreen(), + tea.WithContext(ctx), + tea.WithFilter(tui.MouseEventFilter), + tea.WithMouseCellMotion(), + tea.WithMouseAllMotion(), + } + + p := tea.NewProgram(m, progOpts...) + + // Start the event subscription in the background + go a.Subscribe(ctx, p) + + // Run the workflow executor and forward events to the app's event channel + go func() { + executor := workflow.New(cfg, agents) + events := make(chan runtime.Event, 128) + + // Execute workflow + go func() { + defer close(events) + if err := executor.Execute(ctx, events); err != nil { + events <- runtime.Error(err.Error()) + } + }() + + // Forward all workflow events to the app's event channel (which TUI will receive) + for event := range events { + a.SendEvent(event) + } + }() + + // Run the TUI + _, err = p.Run() + return err +} diff --git a/examples/README.md b/examples/README.md index c385a72b..cb143daf 100644 --- a/examples/README.md +++ b/examples/README.md @@ -44,6 +44,27 @@ These are more advanced examples, most of them involve some sort of MCP server t | [couchbase_agent.yaml](couchbase_agent.yaml) | Run Database commands using MCP tools | | | | | | docker,[couchbase](https://hub.docker.com/mcp/server/couchbase/overview) | | | [notion-expert.yaml](notion-expert.yaml) | Notion documentation expert using OAuth | | | | | | [notion](https://mcp.notion.com) (uses OAuth) | | +## **Workflow Configurations** + +These examples demonstrate workflow execution where multiple agents are chained together in sequential or parallel patterns. Workflows support both sequential execution (one agent after another) and parallel execution (multiple agents running concurrently). Workflows don't require a root agent - they execute agents in the order defined in the workflow section. + +See [WORKFLOW_README.md](WORKFLOW_README.md) for detailed documentation. + +| Name | Description/Purpose | Steps | Execution Pattern | Models Used | +|------------------------------------------------------------------|-------------------------------------------|-------|-------------------|-------------| +| [story_workflow.yaml](story_workflow.yaml) | Creative writing workflow | 3 | Sequential | OpenAI GPT-4o | +| [product_description_workflow.yaml](product_description_workflow.yaml) | Marketing content generation | 3 | Sequential | OpenAI GPT-4o | +| [joke_workflow.yaml](joke_workflow.yaml) | Joke creation, translation, and formatting | 3 | Sequential | OpenAI GPT-4o | +| [parallel_translation_workflow.yaml](parallel_translation_workflow.yaml) | Multi-language translation in parallel | 3 (1+3+1) | Mixed (Sequential + Parallel) | OpenAI GPT-4o | +| [parallel_sorting_workflow.yaml](parallel_sorting_workflow.yaml) | Compare sorting algorithms concurrently | 3 (1+4+1) | Mixed (Sequential + Parallel) | OpenAI GPT-4o | + +**How workflows work:** +- **Sequential**: Agents execute one after another in order defined +- **Parallel**: Multiple agents run concurrently, processing the same input +- Output from each step becomes input for the next step +- No root agent required +- Run with: `./bin/cagent run examples/story_workflow.yaml` + ## **Multi-Agent Configurations** These examples are groups of agents working together. Each of them is specialized for a given task, and usually has some tools assigned to fulfill these tasks. diff --git a/examples/WORKFLOW_README.md b/examples/WORKFLOW_README.md new file mode 100644 index 00000000..248afefc --- /dev/null +++ b/examples/WORKFLOW_README.md @@ -0,0 +1,212 @@ +# Workflow Execution + +This directory contains examples of workflow execution in cagent. Workflows allow you to chain multiple agents together in sequential or parallel execution patterns, where agents process and transform data through a defined pipeline. + +## Examples + +### Story Generation Workflow + +The `story_workflow.yaml` file demonstrates a creative writing workflow with three agents: + +1. **story_starter** - Writes the opening paragraph of a story about a robot learning to cook +2. **add_dialogue** - Continues the story by adding dialogue between the robot and a chef +3. **add_ending** - Completes the story with a satisfying conclusion + +```bash +./bin/cagent run examples/story_workflow.yaml +``` + +### Product Description Workflow + +The `product_description_workflow.yaml` file shows a marketing content workflow: + +1. **draft_writer** - Creates an initial product description for a smart water bottle +2. **make_exciting** - Rewrites the description with more engaging language +3. **add_cta** - Adds a compelling call-to-action + +```bash +./bin/cagent run examples/product_description_workflow.yaml +``` + +### Joke Workflow + +The `joke_workflow.yaml` demonstrates a simple two-step comedy workflow: + +1. **joke_writer** - Creates an original joke +2. **joke_improver** - Enhances the joke with better timing or punchline + +```bash +./bin/cagent run examples/joke_workflow.yaml +``` + +### Parallel Translation Workflow + +The `parallel_translation_workflow.yaml` demonstrates parallel execution where multiple agents process the same input concurrently: + +1. **source_text** - Generates a technical explanation of Docker containers +2. **Parallel Step** - Three translation agents run simultaneously: + - **translate_spanish** - Translates to Spanish + - **translate_french** - Translates to French + - **translate_japanese** - Translates to Japanese +3. **formatter** - Combines all translations into a formatted output + +```bash +./bin/cagent run examples/parallel_translation_workflow.yaml +``` + +### Parallel Sorting Workflow + +The `parallel_sorting_workflow.yaml` shows parallel processing with compute-intensive tasks: + +1. **generate_array** - Creates a random array of 100 integers +2. **Parallel Step** - Four sorting agents run concurrently: + - **bubble_sort** - Sorts using Bubble Sort + - **insertion_sort** - Sorts using Insertion Sort + - **merge_sort** - Sorts using Merge Sort + - **quicksort** - Sorts using QuickSort +3. **analyzer** - Compares and analyzes all sorting results + +```bash +./bin/cagent run examples/parallel_sorting_workflow.yaml +``` + +## How It Works + +The `run` command automatically detects workflows by checking if the configuration file contains a `workflow` section. No special command is needed! + +### Execution Patterns + +Workflows support two execution patterns: + +1. **Sequential (`type: agent`)** - Agents run one after another, each receiving the previous agent's output +2. **Parallel (`type: parallel`)** - Multiple agents run concurrently, all receiving the same input, with outputs combined for the next step + +## Workflow Configuration + +### Basic Structure + +#### Sequential Workflow + +```yaml +version: "2" + +agents: + agent_name: + model: openai/gpt-4o + instruction: | + Your agent instructions here + +workflow: + - type: agent + name: agent_name + - type: agent + name: next_agent +``` + +#### Parallel Workflow + +```yaml +version: "2" + +agents: + generator: + model: openai/gpt-4o + instruction: Generate initial data + + processor1: + model: openai/gpt-4o + instruction: Process data using method 1 + + processor2: + model: openai/gpt-4o + instruction: Process data using method 2 + + combiner: + model: openai/gpt-4o + instruction: Combine and analyze results + +workflow: + - type: agent + name: generator + - type: parallel + steps: + - processor1 + - processor2 + - type: agent + name: combiner +``` + +### Key Features + +1. **Sequential Execution**: Agents run in the order defined in the workflow +2. **Parallel Execution**: Multiple agents process the same input concurrently +3. **Data Piping**: The output of each step becomes the input for the next step +4. **Automatic Context**: The first agent receives instructions to generate initial content, subsequent agents receive the previous output as input +5. **Output Combination**: Parallel step outputs are concatenated in the order specified and passed to the next step +6. **No Root Agent Required**: Workflows don't need a "root" agent - just define the agents used in your workflow steps + +### Example Flows + +#### Sequential Flow + +``` +Step 1: story_starter +β†’ Output: "RoboChef-42 had never encountered a kitchen before..." + +Step 2: add_dialogue (receives previous output) +β†’ Output: "...Chef Lucia approached with curiosity..." + +Step 3: add_ending (receives previous output) +β†’ Output: "...a bright future in the culinary world." +``` + +#### Parallel Flow + +``` +Step 1: source_text +β†’ Output: "Docker containers are lightweight..." + +Step 2: Parallel execution (all receive same input) +β”œβ”€ translate_spanish β†’ "Los contenedores Docker son ligeros..." +β”œβ”€ translate_french β†’ "Les conteneurs Docker sont lΓ©gers..." +└─ translate_japanese β†’ "Dockerγ‚³γƒ³γƒ†γƒŠγ―θ»½ι‡γ§γ™..." + +Step 3: formatter (receives all parallel outputs) +β†’ Combined output with all three translations formatted +``` + +## Command Options + +Workflows support the same runtime configuration flags as regular agent runs: + +### Running without TUI (CLI mode) + +```bash +./bin/cagent run examples/story_workflow.yaml --tui=false +``` + +### Model Overrides + +Override specific agent models: + +```bash +./bin/cagent run examples/story_workflow.yaml \ + --model story_starter=anthropic/claude-sonnet-4-0 \ + --model add_dialogue=openai/gpt-4o +``` + +### Debug Mode + +```bash +./bin/cagent run examples/story_workflow.yaml --debug +``` + +## Notes + +- Each agent's output is passed as text to the next step +- For parallel steps, outputs are concatenated in the order specified in the `steps` array +- The workflow stops immediately if any agent fails +- Model overrides can be specified per agent using `--model agent_name=provider/model` +- Supports both `type: agent` (sequential) and `type: parallel` (concurrent) workflow steps +- The final output of the workflow is the output from the last step in the sequence +- Parallel execution provides true concurrency - agents run simultaneously, not sequentially diff --git a/examples/joke_workflow.yaml b/examples/joke_workflow.yaml new file mode 100644 index 00000000..b8fff13a --- /dev/null +++ b/examples/joke_workflow.yaml @@ -0,0 +1,31 @@ +#!/usr/bin/env cagent run +version: "2" + +agents: + root: + model: openai/gpt-4o + description: Joke workflow coordinator (not used in workflow execution) + instruction: This agent is not used. The workflow executes agents in sequence. + + joke_writer: + model: openai/gpt-4o + description: Writes short dad jokes + instruction: Write a short dad joke (2-3 sentences max) + + translator: + model: openai/gpt-4o + description: Translates jokes to French + instruction: Translate the joke to French, preserving the humor + + formatter: + model: openai/gpt-4o + description: Formats content as tweets + instruction: Format as a tweet with emojis (max 280 chars) + +workflow: + - type: agent + name: joke_writer + - type: agent + name: translator + - type: agent + name: formatter diff --git a/examples/parallel_sorting_workflow.yaml b/examples/parallel_sorting_workflow.yaml new file mode 100644 index 00000000..b803041b --- /dev/null +++ b/examples/parallel_sorting_workflow.yaml @@ -0,0 +1,67 @@ +version: "2" + +agents: + generate_array: + model: openai/gpt-4o + instruction: | + Generate a random array of 100 integers between 1 and 1000. + Output format: just the numbers separated by commas. + Example: 42, 789, 123, 456, ... + + bubble_sort: + model: openai/gpt-4o + instruction: | + Sort the given array using Bubble Sort algorithm. + Explain your step-by-step process as you sort. + Write out intermediate states every 10 swaps. + Output the final sorted array and count how many comparisons you made. + This should take significant work - be thorough and detailed. + + insertion_sort: + model: openai/gpt-4o + instruction: | + Sort the given array using Insertion Sort algorithm. + Explain your step-by-step process. + Show intermediate states every 10 insertions. + Output the final sorted array and count how many comparisons you made. + + merge_sort: + model: openai/gpt-4o + instruction: | + Sort the given array using Merge Sort algorithm. + Briefly explain the divide-and-conquer process. + Show the merge tree structure (how you split and merged). + Output the final sorted array. + + quicksort: + model: openai/gpt-4o + instruction: | + Sort the given array using QuickSort algorithm. + Briefly explain your pivot selection and partitioning. + Output the final sorted array. + + analyzer: + model: openai/gpt-4o + instruction: | + You received the same array sorted by 4 different algorithms: + Bubble Sort, Insertion Sort, Merge Sort, and QuickSort. + + Create a summary report: + 1. Verify all 4 produced the same sorted result + 2. Compare the complexity of explanations (which algorithm explained more steps) + 3. Note which algorithms mentioned their time complexity + 4. Provide a final "winner" based on explanation clarity + + Format as a brief analysis report. + +workflow: + - type: agent + name: generate_array + - type: parallel + steps: + - bubble_sort + - insertion_sort + - merge_sort + - quicksort + - type: agent + name: analyzer diff --git a/examples/parallel_translation_workflow.yaml b/examples/parallel_translation_workflow.yaml new file mode 100644 index 00000000..938aeeda --- /dev/null +++ b/examples/parallel_translation_workflow.yaml @@ -0,0 +1,45 @@ +version: "2" + +agents: + source_text: + model: openai/gpt-4o + instruction: | + Write a 2-paragraph technical explanation of how Docker containers work. + Use simple language but be technically accurate. About 80-100 words total. + + translate_spanish: + model: openai/gpt-4o + instruction: | + Translate the text to Spanish. + Maintain technical accuracy and natural flow. + + translate_french: + model: openai/gpt-4o + instruction: | + Translate the text to French. + Maintain technical accuracy and natural flow. + + translate_japanese: + model: openai/gpt-4o + instruction: | + Translate the text to Japanese. + Maintain technical accuracy and natural flow. + Use appropriate technical terminology in Japanese. + + formatter: + model: openai/gpt-4o + instruction: | + You received the same text translated into Spanish, French, and Japanese. + Create a formatted output showing all three translations with language labels. + Format: "πŸ‡ͺπŸ‡Έ Spanish:\n[text]\n\nπŸ‡«πŸ‡· French:\n[text]\n\nπŸ‡―πŸ‡΅ Japanese:\n[text]" + +workflow: + - type: agent + name: source_text + - type: parallel + steps: + - translate_spanish + - translate_french + - translate_japanese + - type: agent + name: formatter diff --git a/examples/product_description_workflow.yaml b/examples/product_description_workflow.yaml new file mode 100644 index 00000000..b3fd6093 --- /dev/null +++ b/examples/product_description_workflow.yaml @@ -0,0 +1,28 @@ +version: "2" + +agents: + draft_writer: + model: openai/gpt-4o + instruction: | + Write a product description for a smart water bottle that tracks hydration. + Keep it to 3-4 sentences. Be informative but casual. + + make_exciting: + model: openai/gpt-4o + instruction: | + Rewrite the product description to be more exciting and engaging. + Add enthusiastic language and emphasize benefits. Keep the same length. + + add_cta: + model: openai/gpt-4o + instruction: | + Add a compelling call-to-action (1-2 sentences) at the end. + Make it action-oriented and create urgency. + +workflow: + - type: agent + name: draft_writer + - type: agent + name: make_exciting + - type: agent + name: add_cta diff --git a/examples/story_workflow.yaml b/examples/story_workflow.yaml new file mode 100644 index 00000000..cf1cdd65 --- /dev/null +++ b/examples/story_workflow.yaml @@ -0,0 +1,22 @@ +version: "2" + +agents: + story_starter: + model: openai/gpt-4o + instruction: Write the first paragraph of a short story about a robot learning to cook (3-4 sentences) + + add_dialogue: + model: openai/gpt-4o + instruction: Continue the story by adding a dialogue between the robot and a chef (3-4 sentences) + + add_ending: + model: openai/gpt-4o + instruction: Complete the story with a satisfying ending (2-3 sentences) + +workflow: + - type: agent + name: story_starter + - type: agent + name: add_dialogue + - type: agent + name: add_ending diff --git a/pkg/app/app.go b/pkg/app/app.go index f9bae3c8..fa6b193f 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -50,6 +50,12 @@ func (a *App) Title() string { return a.title } +// SendEvent sends an event to the app's event channel +// This is used by workflows to forward events to the TUI +func (a *App) SendEvent(event tea.Msg) { + a.events <- event +} + // Run one agent loop func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string) { a.cancel = cancel diff --git a/pkg/config/config.go b/pkg/config/config.go index 165c0ed4..30ddb439 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -136,6 +136,27 @@ func validateConfig(cfg *latest.Config) error { } } + // Validate workflow steps + for i, step := range cfg.Workflow { + switch step.Type { + case "agent": + if _, exists := cfg.Agents[step.Name]; !exists { + return fmt.Errorf("workflow step %d: references non-existent agent '%s'", i, step.Name) + } + case "parallel": + if len(step.Steps) == 0 { + return fmt.Errorf("workflow step %d: parallel step must specify at least one agent in 'steps'", i) + } + for _, agentName := range step.Steps { + if _, exists := cfg.Agents[agentName]; !exists { + return fmt.Errorf("workflow step %d: parallel step references non-existent agent '%s'", i, agentName) + } + } + default: + return fmt.Errorf("workflow step %d: unsupported type '%s' (supported types: 'agent', 'parallel')", i, step.Type) + } + } + return nil } diff --git a/pkg/config/examples_test.go b/pkg/config/examples_test.go index f81859c0..e4ded9cc 100644 --- a/pkg/config/examples_test.go +++ b/pkg/config/examples_test.go @@ -42,8 +42,29 @@ func TestParseExamples(t *testing.T) { require.NoError(t, err) require.Equal(t, "2", cfg.Version, "Version should be 2 in %s", file) - require.NotEmpty(t, cfg.Agents["root"].Description, "Description should not be empty in %s", file) - require.NotEmpty(t, cfg.Agents["root"].Instruction, "Instruction should not be empty in %s", file) + + // For workflow examples, verify workflow structure instead of root agent + if len(cfg.Workflow) > 0 { + require.NotEmpty(t, cfg.Workflow, "Workflow should not be empty in %s", file) + // Verify all workflow agents exist and have instructions + for _, step := range cfg.Workflow { + switch step.Type { + case "agent": + require.Contains(t, cfg.Agents, step.Name, "Workflow agent '%s' not found in %s", step.Name, file) + require.NotEmpty(t, cfg.Agents[step.Name].Instruction, "Instruction should not be empty for agent '%s' in %s", step.Name, file) + case "parallel": + require.NotEmpty(t, step.Steps, "Parallel step should have at least one agent in %s", file) + for _, agentName := range step.Steps { + require.Contains(t, cfg.Agents, agentName, "Parallel workflow agent '%s' not found in %s", agentName, file) + require.NotEmpty(t, cfg.Agents[agentName].Instruction, "Instruction should not be empty for agent '%s' in %s", agentName, file) + } + } + } + } else { + // For non-workflow examples, verify root agent + require.NotEmpty(t, cfg.Agents["root"].Description, "Description should not be empty in %s", file) + require.NotEmpty(t, cfg.Agents["root"].Instruction, "Instruction should not be empty in %s", file) + } }) } } diff --git a/pkg/config/v1/types.go b/pkg/config/v1/types.go index 939a449e..af691009 100644 --- a/pkg/config/v1/types.go +++ b/pkg/config/v1/types.go @@ -138,6 +138,7 @@ type Config struct { Models map[string]ModelConfig `json:"models,omitempty" yaml:"models,omitempty"` Env map[string]string `json:"env,omitempty" yaml:"env,omitempty"` Metadata Metadata `json:"metadata,omitempty" yaml:"metadata,omitempty"` + Workflow []WorkflowStep `json:"workflow,omitempty" yaml:"workflow,omitempty"` } type Metadata struct { @@ -145,3 +146,10 @@ type Metadata struct { License string `json:"license,omitempty" yaml:"license,omitempty"` Readme string `json:"readme,omitempty" yaml:"readme,omitempty"` } + +// WorkflowStep represents a step in a workflow +type WorkflowStep struct { + Type string `json:"type" yaml:"type"` // "agent" or "parallel" + Name string `json:"name,omitempty" yaml:"name,omitempty"` // Name of the agent to run (for type "agent") + Steps []string `json:"steps,omitempty" yaml:"steps,omitempty"` // List of agent names to run in parallel (for type "parallel") +} diff --git a/pkg/config/v2/types.go b/pkg/config/v2/types.go index 6028e921..09b448e3 100644 --- a/pkg/config/v2/types.go +++ b/pkg/config/v2/types.go @@ -10,6 +10,7 @@ type Config struct { Agents map[string]AgentConfig `json:"agents,omitempty"` Models map[string]ModelConfig `json:"models,omitempty"` Metadata Metadata `json:"metadata,omitempty"` + Workflow []WorkflowStep `json:"workflow,omitempty"` } // AgentConfig represents a single agent configuration @@ -181,3 +182,10 @@ type StructuredOutput struct { // Strict enables strict schema adherence (OpenAI only) Strict bool `json:"strict,omitempty"` } + +// WorkflowStep represents a step in a workflow +type WorkflowStep struct { + Type string `json:"type"` // "agent" or "parallel" + Name string `json:"name,omitempty"` // Name of the agent to run (for type "agent") + Steps []string `json:"steps,omitempty"` // List of agent names to run in parallel (for type "parallel") +} diff --git a/pkg/config/v2/upgrade.go b/pkg/config/v2/upgrade.go index 5c349865..cd956bfa 100644 --- a/pkg/config/v2/upgrade.go +++ b/pkg/config/v2/upgrade.go @@ -20,11 +20,10 @@ func UpgradeFrom(old v1.Config) (Config, error) { } } - for _, agent := range old.Agents { + for agentName := range old.Agents { + agent := old.Agents[agentName] for i := range agent.Toolsets { - toolSet := agent.Toolsets[i] - - if len(toolSet.Envfiles) > 0 { + if len(agent.Toolsets[i].Envfiles) > 0 { return Config{}, errors.New("toolset Envfiles is not supported anymore") } } diff --git a/pkg/runtime/event.go b/pkg/runtime/event.go index 1670c40e..1b3a88f5 100644 --- a/pkg/runtime/event.go +++ b/pkg/runtime/event.go @@ -307,3 +307,109 @@ func MaxIterationsReached(maxIterations int) Event { func (e *MaxIterationsReachedEvent) GetAgentName() string { return e.AgentName } + +// WorkflowStepStartedEvent is sent when a workflow step starts +type WorkflowStepStartedEvent struct { + Type string `json:"type"` + StepIndex int `json:"step_index"` + AgentName string `json:"agent_name"` +} + +func (e *WorkflowStepStartedEvent) GetAgentName() string { return e.AgentName } + +func WorkflowStepStarted(stepIndex int, agentName string) Event { + return &WorkflowStepStartedEvent{ + Type: "workflow_step_started", + StepIndex: stepIndex, + AgentName: agentName, + } +} + +// WorkflowStepCompletedEvent is sent when a workflow step completes successfully +type WorkflowStepCompletedEvent struct { + Type string `json:"type"` + StepIndex int `json:"step_index"` + AgentName string `json:"agent_name"` + Output string `json:"output"` +} + +func (e *WorkflowStepCompletedEvent) GetAgentName() string { return e.AgentName } + +func WorkflowStepCompleted(stepIndex int, agentName, output string) Event { + return &WorkflowStepCompletedEvent{ + Type: "workflow_step_completed", + StepIndex: stepIndex, + AgentName: agentName, + Output: output, + } +} + +// WorkflowStepFailedEvent is sent when a workflow step fails +type WorkflowStepFailedEvent struct { + Type string `json:"type"` + StepIndex int `json:"step_index"` + AgentName string `json:"agent_name"` + Error string `json:"error"` +} + +func (e *WorkflowStepFailedEvent) GetAgentName() string { return e.AgentName } + +func WorkflowStepFailed(stepIndex int, agentName, errorMsg string) Event { + return &WorkflowStepFailedEvent{ + Type: "workflow_step_failed", + StepIndex: stepIndex, + AgentName: agentName, + Error: errorMsg, + } +} + +// WorkflowCompletedEvent is sent when the entire workflow completes successfully +type WorkflowCompletedEvent struct { + Type string `json:"type"` + FinalOutput string `json:"final_output"` +} + +func (e *WorkflowCompletedEvent) GetAgentName() string { return "" } + +func WorkflowCompleted(finalOutput string) Event { + return &WorkflowCompletedEvent{ + Type: "workflow_completed", + FinalOutput: finalOutput, + } +} + +// WorkflowParallelStartedEvent is sent when a parallel workflow step starts +type WorkflowParallelStartedEvent struct { + Type string `json:"type"` + StepIndex int `json:"step_index"` + AgentNames []string `json:"agent_names"` +} + +func (e *WorkflowParallelStartedEvent) GetAgentName() string { return "" } + +func WorkflowParallelStarted(stepIndex int, agentNames []string) Event { + return &WorkflowParallelStartedEvent{ + Type: "workflow_parallel_started", + StepIndex: stepIndex, + AgentNames: agentNames, + } +} + +// WorkflowParallelCompletedEvent is sent when a parallel workflow step completes +type WorkflowParallelCompletedEvent struct { + Type string `json:"type"` + StepIndex int `json:"step_index"` + AgentNames []string `json:"agent_names"` + CombinedOutput string `json:"combined_output"` +} + +func (e *WorkflowParallelCompletedEvent) GetAgentName() string { return "" } + +func WorkflowParallelCompleted(stepIndex int, agentNames []string, combinedOutput string) Event { + return &WorkflowParallelCompletedEvent{ + Type: "workflow_parallel_completed", + StepIndex: stepIndex, + AgentNames: agentNames, + CombinedOutput: combinedOutput, + } +} diff --git a/pkg/tui/page/chat/chat.go b/pkg/tui/page/chat/chat.go index 6cba571b..cc6e2bed 100644 --- a/pkg/tui/page/chat/chat.go +++ b/pkg/tui/page/chat/chat.go @@ -268,6 +268,38 @@ func (p *chatPage) Update(msg tea.Msg) (tea.Model, tea.Cmd) { }) return p, tea.Batch(spinnerCmd, dialogCmd) + case *runtime.WorkflowStepStartedEvent: + cmd := p.messages.AddSystemMessage("Starting workflow step: " + msg.AgentName) + return p, tea.Batch(cmd, p.messages.ScrollToBottom()) + case *runtime.WorkflowStepCompletedEvent: + cmd := p.messages.AddSystemMessage("Completed workflow step: " + msg.AgentName) + return p, tea.Batch(cmd, p.messages.ScrollToBottom()) + case *runtime.WorkflowStepFailedEvent: + cmd := p.messages.AddSystemMessage("Failed workflow step: " + msg.AgentName + " - " + msg.Error) + return p, tea.Batch(cmd, p.messages.ScrollToBottom()) + case *runtime.WorkflowParallelStartedEvent: + agentList := "" + for i, name := range msg.AgentNames { + if i > 0 { + agentList += ", " + } + agentList += name + } + cmd := p.messages.AddSystemMessage("Running agents in parallel: " + agentList) + return p, tea.Batch(cmd, p.messages.ScrollToBottom()) + case *runtime.WorkflowParallelCompletedEvent: + agentList := "" + for i, name := range msg.AgentNames { + if i > 0 { + agentList += ", " + } + agentList += name + } + cmd := p.messages.AddSystemMessage("Completed parallel execution: " + agentList) + return p, tea.Batch(cmd, p.messages.ScrollToBottom()) + case *runtime.WorkflowCompletedEvent: + cmd := p.messages.AddSystemMessage("Workflow completed successfully") + return p, tea.Batch(cmd, p.messages.ScrollToBottom()) } diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go new file mode 100644 index 00000000..8cab8820 --- /dev/null +++ b/pkg/workflow/workflow.go @@ -0,0 +1,240 @@ +package workflow + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/docker/cagent/pkg/config/v2" + "github.com/docker/cagent/pkg/runtime" + "github.com/docker/cagent/pkg/session" + "github.com/docker/cagent/pkg/team" +) + +// Executor manages the execution of sequential workflows +type Executor struct { + config *v2.Config + team *team.Team +} + +// New creates a new workflow executor +func New(config *v2.Config, agents *team.Team) *Executor { + return &Executor{ + config: config, + team: agents, + } +} + +// Execute runs the workflow, supporting both sequential and parallel execution +func (e *Executor) Execute(ctx context.Context, events chan<- runtime.Event) error { + if len(e.config.Workflow) == 0 { + return fmt.Errorf("no workflow steps defined") + } + + slog.Info("Starting workflow execution", "steps", len(e.config.Workflow)) + + var previousOutput string + + for i, step := range e.config.Workflow { + switch step.Type { + case "agent": + output, err := e.executeAgent(ctx, events, i, step.Name, previousOutput, i == 0) + if err != nil { + return err + } + previousOutput = output + + case "parallel": + output, err := e.executeParallel(ctx, events, i, step.Steps, previousOutput) + if err != nil { + return err + } + previousOutput = output + + default: + return fmt.Errorf("step %d: unsupported workflow step type '%s'", i, step.Type) + } + } + + events <- runtime.WorkflowCompleted(previousOutput) + slog.Info("Workflow execution completed successfully") + return nil +} + +// executeAgent runs a single agent +func (e *Executor) executeAgent(ctx context.Context, events chan<- runtime.Event, stepIndex int, agentName, input string, isFirst bool) (string, error) { + agent := e.team.Agent(agentName) + if agent == nil { + return "", fmt.Errorf("step %d: agent '%s' not found", stepIndex, agentName) + } + + slog.Info("Executing workflow step", "step", stepIndex+1, "agent", agentName) + events <- runtime.WorkflowStepStarted(stepIndex, agentName) + + // Create a new session for this agent + var sess *session.Session + if isFirst { + // First agent - use its instruction as system message + sess = session.New( + session.WithSystemMessage(agent.Instruction()), + session.WithImplicitUserMessage("", "Generate the initial data as specified in your instructions."), + ) + } else { + // Subsequent agents - pass previous output as user message + userPrompt := fmt.Sprintf("Process the following input according to your instructions:\n\n%s", input) + sess = session.New( + session.WithSystemMessage(agent.Instruction()), + session.WithImplicitUserMessage("", userPrompt), + ) + } + sess.SendUserMessage = false + + // Create runtime for this agent + rt, err := runtime.New(e.team, + runtime.WithCurrentAgent(agentName), + runtime.WithSessionCompaction(false), + ) + if err != nil { + events <- runtime.WorkflowStepFailed(stepIndex, agentName, err.Error()) + return "", fmt.Errorf("step %d: failed to create runtime: %w", stepIndex, err) + } + + // Run the agent and collect events + for event := range rt.RunStream(ctx, sess) { + // Forward events to the caller + events <- event + + // Check for errors + if errEvent, ok := event.(*runtime.ErrorEvent); ok { + events <- runtime.WorkflowStepFailed(stepIndex, agentName, errEvent.Error) + return "", fmt.Errorf("step %d: agent '%s' failed: %s", stepIndex, agentName, errEvent.Error) + } + } + + // Get the output from the last assistant message + output := sess.GetLastAssistantMessageContent() + if output == "" { + err := fmt.Errorf("step %d: agent '%s' produced no output", stepIndex, agentName) + events <- runtime.WorkflowStepFailed(stepIndex, agentName, err.Error()) + return "", err + } + + events <- runtime.WorkflowStepCompleted(stepIndex, agentName, output) + slog.Info("Workflow step completed", "step", stepIndex+1, "agent", agentName) + return output, nil +} + +// executeParallel runs multiple agents in parallel and combines their outputs +func (e *Executor) executeParallel(ctx context.Context, events chan<- runtime.Event, stepIndex int, agentNames []string, input string) (string, error) { + if len(agentNames) == 0 { + return "", fmt.Errorf("step %d: no agents specified for parallel execution", stepIndex) + } + + slog.Info("Executing parallel workflow step", "step", stepIndex+1, "agents", agentNames) + events <- runtime.WorkflowParallelStarted(stepIndex, agentNames) + + // Create channels for collecting results + type result struct { + agentName string + output string + err error + } + results := make(chan result, len(agentNames)) + + // Create a wait group to track all parallel executions + var wg sync.WaitGroup + + // Launch all agents in parallel + for _, agentName := range agentNames { + wg.Add(1) + go func(name string) { + defer wg.Done() + + agent := e.team.Agent(name) + if agent == nil { + results <- result{agentName: name, err: fmt.Errorf("agent '%s' not found", name)} + return + } + + slog.Info("Starting parallel agent", "agent", name) + + // Create a new session for this agent + userPrompt := fmt.Sprintf("Process the following input according to your instructions:\n\n%s", input) + sess := session.New( + session.WithSystemMessage(agent.Instruction()), + session.WithImplicitUserMessage("", userPrompt), + ) + sess.SendUserMessage = false + + // Create runtime for this agent + rt, err := runtime.New(e.team, + runtime.WithCurrentAgent(name), + runtime.WithSessionCompaction(false), + ) + if err != nil { + results <- result{agentName: name, err: fmt.Errorf("failed to create runtime: %w", err)} + return + } + + // Run the agent and collect events + for event := range rt.RunStream(ctx, sess) { + // Forward events to the caller + events <- event + + // Check for errors + if errEvent, ok := event.(*runtime.ErrorEvent); ok { + results <- result{agentName: name, err: fmt.Errorf("agent failed: %s", errEvent.Error)} + return + } + } + + // Get the output from the last assistant message + output := sess.GetLastAssistantMessageContent() + if output == "" { + results <- result{agentName: name, err: fmt.Errorf("agent produced no output")} + return + } + + slog.Info("Parallel agent completed", "agent", name) + results <- result{agentName: name, output: output} + }(agentName) + } + + // Wait for all goroutines to complete + go func() { + wg.Wait() + close(results) + }() + + // Collect all results maintaining order + outputs := make(map[string]string) + var errors []string + + for res := range results { + if res.err != nil { + errors = append(errors, fmt.Sprintf("%s: %v", res.agentName, res.err)) + events <- runtime.WorkflowStepFailed(stepIndex, res.agentName, res.err.Error()) + } else { + outputs[res.agentName] = res.output + events <- runtime.WorkflowStepCompleted(stepIndex, res.agentName, res.output) + } + } + + // Check if any agents failed + if len(errors) > 0 { + return "", fmt.Errorf("step %d: parallel execution failed: %v", stepIndex, errors) + } + + // Combine outputs in the order they were specified + var combinedOutput string + for _, agentName := range agentNames { + if output, ok := outputs[agentName]; ok { + combinedOutput += output + "\n\n" + } + } + + events <- runtime.WorkflowParallelCompleted(stepIndex, agentNames, combinedOutput) + slog.Info("Parallel workflow step completed", "step", stepIndex+1) + return combinedOutput, nil +}