-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlimited_workflow.exs
153 lines (126 loc) · 4.08 KB
/
limited_workflow.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
defmodule Examples.LimitedWorkflow do
  @moduledoc """
  Example of running AgentForge flows under execution limits.

  `run/0` walks through three scenarios:

    * processing a single handler with a `timeout_ms` limit,
    * processing a chain of handlers with `return_stats: true` and printing
      the statistics that come back,
    * processing a deliberately slow handler with a short `timeout_ms` and
      reporting whatever result is returned.
  """

  alias AgentForge.{Flow, Signal}

  @doc """
  Prints a banner, then runs the three scenarios one after another.
  """
  def run do
    IO.puts("=== Running Limited Workflow Example ===\n")

    run_with_timeout()
    run_with_statistics()
    run_with_timeout_error()
  end

  # Scenario 1: one handler, `timeout_ms: 5000`. The result is matched
  # assertively, so anything other than `{:ok, _, _}` raises a MatchError.
  defp run_with_timeout do
    IO.puts("\n--- Basic Example with Timeout ---")

    echo = fn incoming, acc ->
      IO.puts("Processing signal: #{incoming.type} -> #{inspect(incoming.data)}")
      # Stand-in for real work.
      Process.sleep(100)
      {{:emit, Signal.new(:processed, incoming.data)}, acc}
    end

    task = Signal.new(:task, "Sample data")
    limits = [timeout_ms: 5000]

    {:ok, outcome, _final_state} = Flow.process_with_limits([echo], task, %{}, limits)

    IO.puts("Result: #{outcome.type} -> #{inspect(outcome.data)}")
  end

  # Scenario 2: three handlers (validate, transform, finalize) processed with
  # `return_stats: true`; the fourth element of the result is printed.
  defp run_with_statistics do
    IO.puts("\n--- Example with Statistics Collection ---")

    pipeline = [
      stage("Validating data...", 50, :validated),
      stage("Transforming data...", 100, :transformed, fn data -> "#{data} (transformed)" end),
      stage("Finalizing...", 75, :completed)
    ]

    input = Signal.new(:input, "Test data")
    limits = [timeout_ms: 5000, return_stats: true]

    {:ok, outcome, _final_state, metrics} =
      Flow.process_with_limits(pipeline, input, %{}, limits)

    IO.puts("Result: #{outcome.type} -> #{inspect(outcome.data)}")
    IO.puts("\nExecution Statistics:")
    IO.puts("- Total steps: #{metrics.steps}")
    IO.puts("- Elapsed time: #{metrics.elapsed_ms}ms")
    IO.puts("- Completed: #{metrics.complete}")
  end

  # Scenario 3: the handler sleeps for 2000ms while the limit is 500ms. The
  # result is reported either way, then the last execution stats are looked up.
  defp run_with_timeout_error do
    IO.puts("\n--- Example with Timeout Error ---")

    sluggish = stage("Starting long process...", 2000, :done)
    task = Signal.new(:task, "Important data")

    [sluggish]
    |> Flow.process_with_limits(task, %{}, timeout_ms: 500)
    |> report_timeout_outcome()

    report_last_stats(Flow.get_last_execution_stats())
  end

  # Builds a two-argument handler that prints `announcement`, sleeps for
  # `pause_ms`, and emits a signal of `emitted_type` carrying
  # `reshape.(signal.data)`. The state is handed back unchanged. When
  # `reshape` is omitted the data is forwarded as-is.
  defp stage(announcement, pause_ms, emitted_type, reshape \\ fn data -> data end) do
    fn signal, state ->
      IO.puts(announcement)
      Process.sleep(pause_ms)
      {{:emit, Signal.new(emitted_type, reshape.(signal.data))}, state}
    end
  end

  # Prints the message of a three-element error tuple; any other shape is
  # printed via `inspect/1` as an unexpected result.
  defp report_timeout_outcome({:error, error_message, _state}) do
    IO.puts("Error handled gracefully: #{error_message}")
  end

  defp report_timeout_outcome(other) do
    IO.puts("Unexpected result: #{inspect(other)}")
  end

  # Prints the elapsed time and completion flag of `stats`, or a fallback
  # line when `stats` is falsy (`nil` or `false`).
  defp report_last_stats(missing) when missing in [nil, false] do
    IO.puts("\nNo statistics available")
  end

  defp report_last_stats(stats) do
    IO.puts("\nTimeout Statistics:")
    IO.puts("- Elapsed time: #{stats.elapsed_ms}ms")
    IO.puts("- Completed: #{stats.complete}")
  end
end
# Entry point: decide between printing a hint and running the example.
# `IEx.started?/0` is only called once `Code.ensure_loaded?/1` has confirmed
# that the IEx module is available.
interactive? = Code.ensure_loaded?(IEx) and IEx.started?()

if interactive? do
  # Inside an IEx session: leave it to the user to start the example.
  IO.puts("Run Examples.LimitedWorkflow.run() to execute the example")
else
  # Not inside IEx: run the example right away.
  Examples.LimitedWorkflow.run()
end