diff --git a/internal/step/dummy/provider.go b/internal/step/dummy/provider.go index 7bbd888e..cc99f333 100644 --- a/internal/step/dummy/provider.go +++ b/internal/step/dummy/provider.go @@ -24,6 +24,9 @@ func (p *dummyProvider) Kind() string { type dummyProvider struct { } +func (p *dummyProvider) Register(_ step.Registry) { +} + func (p *dummyProvider) ProviderSchema() map[string]*schema.PropertySchema { // We don't need any steps to set up the provider. return map[string]*schema.PropertySchema{} @@ -64,7 +67,7 @@ func (p *dummyProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { } } -func (p *dummyProvider) LoadSchema(_ map[string]any) (step.RunnableStep, error) { +func (p *dummyProvider) LoadSchema(_ map[string]any, _ map[string][]byte) (step.RunnableStep, error) { return &runnableStep{}, nil } diff --git a/internal/step/dummy/provider_test.go b/internal/step/dummy/provider_test.go index 1b22c0e3..6ac333c2 100644 --- a/internal/step/dummy/provider_test.go +++ b/internal/step/dummy/provider_test.go @@ -42,7 +42,7 @@ func (s *stageChangeHandler) OnStepComplete( func TestProvider(t *testing.T) { provider := dummy.New() assert.Equals(t, provider.Kind(), "dummy") - runnable, err := provider.LoadSchema(map[string]any{}) + runnable, err := provider.LoadSchema(map[string]any{}, map[string][]byte{}) assert.NoError(t, err) handler := &stageChangeHandler{ diff --git a/internal/step/foreach/README.md b/internal/step/foreach/README.md new file mode 100644 index 00000000..9b2dc1e1 --- /dev/null +++ b/internal/step/foreach/README.md @@ -0,0 +1,26 @@ +# Foreach step provider + +This provider allows you to loop over a list of inputs and execute (potentially parallel) workflows for each item. The subworkflows must only have one possible output named "success" and this output will be collected into a list as a result. + +## Usage + +```yaml +steps: + your_step: + kind: foreach + workflow: some_workflow_file.yaml # This must be in the workflow directory + items: !expr $.input.some_list_of_items + parallelism: 5 # How many workflows to run in parallel +output: + result: !expr $.steps.your_step.outputs.success.data # This will be a list of result objects +``` + +### Handling errors + +In case one or more subworkflows exit with an error, you can also recover. + +```yaml +output: + result: !expr $.steps.your_step.failed.error.data # This will be a map of int keys to provide the subworkflows with a successful execution. + errors: !expr $.steps.your_step.failed.error.messages # This will be a map of int to error messages for the subworkflows that failed. +``` \ No newline at end of file diff --git a/internal/step/foreach/doc.go b/internal/step/foreach/doc.go new file mode 100644 index 00000000..43705120 --- /dev/null +++ b/internal/step/foreach/doc.go @@ -0,0 +1,2 @@ +// Package foreach provides the ability to loop over items. +package foreach diff --git a/internal/step/foreach/provider.go b/internal/step/foreach/provider.go new file mode 100644 index 00000000..e06b7c31 --- /dev/null +++ b/internal/step/foreach/provider.go @@ -0,0 +1,519 @@ +package foreach + +import ( + "context" + "fmt" + "reflect" + "sync" + + "go.arcalot.io/log/v2" + "go.flow.arcalot.io/engine/config" + "go.flow.arcalot.io/engine/internal/step" + "go.flow.arcalot.io/engine/workflow" + "go.flow.arcalot.io/pluginsdk/schema" +) + +// New creates a new loop provider. 
+func New( + logger log.Logger, + config *config.Config, +) (step.Provider, error) { + return &forEachProvider{ + logger: logger, + config: config, + }, nil +} + +// StageID contains the identifiers for the stages. +type StageID string + +const ( + // StageIDExecute is executing the subworkflow. + StageIDExecute StageID = "execute" + // StageIDOutputs is providing the output data of the subworkflow. + StageIDOutputs StageID = "outputs" + // StageIDFailed is providing the error reason from the subworkflow. + StageIDFailed StageID = "failed" +) + +var executeLifecycleStage = step.LifecycleStage{ + ID: string(StageIDExecute), + WaitingName: "waiting for execution", + RunningName: "executing", + FinishedName: "finished", + InputFields: map[string]struct{}{ + "items": {}, + }, + NextStages: []string{ + string(StageIDOutputs), + string(StageIDFailed), + }, + Fatal: false, +} +var outputLifecycleStage = step.LifecycleStage{ + ID: string(StageIDOutputs), + WaitingName: "waiting for output", + RunningName: "output", + FinishedName: "output", + InputFields: map[string]struct{}{}, + NextStages: []string{}, + Fatal: false, +} +var errorLifecycleStage = step.LifecycleStage{ + ID: string(StageIDFailed), + WaitingName: "processing error", + RunningName: "processing error", + FinishedName: "error", + InputFields: map[string]struct{}{}, + NextStages: []string{}, + Fatal: true, +} + +type forEachProvider struct { + logger log.Logger + registry step.Registry + config *config.Config +} + +func (l *forEachProvider) Register(registry step.Registry) { + l.registry = registry +} + +func (l *forEachProvider) Kind() string { + return "foreach" +} + +func (l *forEachProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { + return step.Lifecycle[step.LifecycleStage]{ + InitialStage: "execute", + Stages: []step.LifecycleStage{ + executeLifecycleStage, + outputLifecycleStage, + errorLifecycleStage, + }, + } +} + +func (l *forEachProvider) ProviderSchema() map[string]*schema.PropertySchema { + return map[string]*schema.PropertySchema{ + "workflow": schema.NewPropertySchema( + schema.NewStringSchema( + schema.PointerTo[int64](1), + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Workflow file"), + schema.PointerTo("Workflow file within the workflow context (directory) to loop over."), + nil, + ), + true, + nil, + nil, + nil, + nil, + []string{"\"subworkflow.yaml\""}, + ), + "parallelism": schema.NewPropertySchema( + schema.NewIntSchema( + schema.PointerTo[int64](1), + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Parallelism"), + schema.PointerTo("How many subworkflows to run in parallel."), + nil, + ), + false, + nil, + nil, + nil, + schema.PointerTo("1"), + nil, + ), + } +} + +func (l *forEachProvider) RunProperties() map[string]struct{} { + return map[string]struct{}{} +} + +func (l *forEachProvider) LoadSchema(inputs map[string]any, workflowContext map[string][]byte) (step.RunnableStep, error) { + workflowFileName := inputs["workflow"] + workflowContents, ok := workflowContext[workflowFileName.(string)] + if !ok { + return nil, fmt.Errorf( + "workflow file %s not found in current workflow context (make sure the subworkflow is in the same directory as the main workflow)", + workflowFileName.(string), + ) + } + + yamlConverter := workflow.NewYAMLConverter(l.registry) + wf, err := yamlConverter.FromYAML(workflowContents) + if err != nil { + return nil, err + } + + executor, err := workflow.NewExecutor(l.logger, l.config, l.registry) + if err != nil { + return nil, err + } + + 
preparedWorkflow, err := executor.Prepare(wf, workflowContext) + if err != nil { + return nil, err + } + + outputSchema := preparedWorkflow.OutputSchema() + if _, ok := outputSchema["success"]; !ok { + return nil, fmt.Errorf("the referenced workflow must contain an output named 'success'") + } + if len(outputSchema) > 1 { + return nil, fmt.Errorf("the referenced workflow may only contain a 'success' output") + } + + return &runnableStep{ + workflow: preparedWorkflow, + parallelism: inputs["parallelism"].(int64), + logger: l.logger, + }, nil +} + +type runnableStep struct { + workflow workflow.ExecutableWorkflow + parallelism int64 + logger log.Logger +} + +func (r *runnableStep) Lifecycle(_ map[string]any) (step.Lifecycle[step.LifecycleStageWithSchema], error) { + workflowOutput := r.workflow.OutputSchema() + + return step.Lifecycle[step.LifecycleStageWithSchema]{ + InitialStage: "execute", + Stages: []step.LifecycleStageWithSchema{ + { + LifecycleStage: executeLifecycleStage, + InputSchema: map[string]*schema.PropertySchema{ + "items": schema.NewPropertySchema( + schema.NewListSchema( + r.workflow.Input(), + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Items"), + schema.PointerTo("Items to loop over."), + nil, + ), + true, + nil, + nil, + nil, + nil, + nil, + ), + }, + }, + { + LifecycleStage: outputLifecycleStage, + Outputs: map[string]*schema.StepOutputSchema{ + "success": schema.NewStepOutputSchema( + schema.NewScopeSchema( + schema.NewObjectSchema( + "data", + map[string]*schema.PropertySchema{ + "data": schema.NewPropertySchema( + schema.NewListSchema( + workflowOutput["success"].SchemaValue, + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Data"), + schema.PointerTo("Data returned from the subworkflows."), + nil, + ), + true, + nil, + nil, + nil, + nil, + nil, + ), + }, + ), + ), + schema.NewDisplayValue( + schema.PointerTo("Success"), + schema.PointerTo("Successful results from subworkflows."), + nil, + ), + false, + ), + }, + }, + { + LifecycleStage: errorLifecycleStage, + Outputs: map[string]*schema.StepOutputSchema{ + "error": schema.NewStepOutputSchema( + schema.NewScopeSchema( + schema.NewObjectSchema( + "error", + map[string]*schema.PropertySchema{ + "data": schema.NewPropertySchema( + schema.NewMapSchema( + schema.NewIntSchema(nil, nil, nil), + workflowOutput["success"].SchemaValue, + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Data"), + schema.PointerTo("Data returned from the subworkflows."), + nil, + ), + true, + nil, + nil, + nil, + nil, + nil, + ), + "errors": schema.NewPropertySchema( + schema.NewMapSchema( + schema.NewIntSchema(nil, nil, nil), + schema.NewStringSchema(nil, nil, nil), + nil, + nil, + ), + schema.NewDisplayValue( + schema.PointerTo("Message"), + schema.PointerTo("Error message detailing what caused the subworkflow to fail."), + nil, + ), + true, + nil, + nil, + nil, + nil, + nil, + ), + }, + ), + ), + schema.NewDisplayValue( + schema.PointerTo("Error"), + schema.PointerTo("Contains the error that happened while executing the subworkflow."), + nil, + ), + true, + ), + }, + }, + }, + }, nil +} + +func (r *runnableStep) RunSchema() map[string]*schema.PropertySchema { + return map[string]*schema.PropertySchema{} +} + +func (r *runnableStep) Start(_ map[string]any, stageChangeHandler step.StageChangeHandler) (step.RunningStep, error) { + ctx, cancel := context.WithCancel(context.Background()) + rs := &runningStep{ + ctx: ctx, + cancel: cancel, + lock: &sync.Mutex{}, + currentStage: StageIDExecute, + 
currentState: step.RunningStepStateStarting, + inputData: make(chan []any, 1), + workflow: r.workflow, + stageChangeHandler: stageChangeHandler, + parallelism: r.parallelism, + logger: r.logger, + } + go rs.run() + return rs, nil +} + +type runningStep struct { + workflow workflow.ExecutableWorkflow + currentStage StageID + lock *sync.Mutex + currentState step.RunningStepState + inputAvailable bool + inputData chan []any + ctx context.Context + cancel context.CancelFunc + stageChangeHandler step.StageChangeHandler + parallelism int64 + logger log.Logger +} + +func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error { + r.lock.Lock() + switch stage { + case string(StageIDExecute): + items := input["items"] + v := reflect.ValueOf(items) + input := make([]any, v.Len()) + for i := 0; i < v.Len(); i++ { + item := v.Index(i).Interface() + _, err := r.workflow.Input().Unserialize(item) + if err != nil { + r.lock.Unlock() + return fmt.Errorf("invalid input item %d for subworkflow (%w)", i, err) + } + input[i] = item + } + if r.inputAvailable { + r.lock.Unlock() + return fmt.Errorf("input for execute workflow provided twice") + } + if r.currentState == step.RunningStepStateWaitingForInput && r.currentStage == StageIDExecute { + r.currentState = step.RunningStepStateRunning + } + r.inputAvailable = true + r.lock.Unlock() + r.inputData <- input + return nil + case string(StageIDOutputs): + r.lock.Unlock() + return nil + case string(StageIDFailed): + r.lock.Unlock() + return nil + default: + r.lock.Unlock() + return fmt.Errorf("invalid stage: %s", stage) + } +} + +func (r *runningStep) CurrentStage() string { + r.lock.Lock() + defer r.lock.Unlock() + return string(r.currentStage) +} + +func (r *runningStep) State() step.RunningStepState { + r.lock.Lock() + defer r.lock.Unlock() + return r.currentState +} + +func (r *runningStep) Close() error { + r.cancel() + return nil +} + +func (r *runningStep) run() { + defer close(r.inputData) + waitingForInput := false + r.lock.Lock() + if !r.inputAvailable { + r.currentState = step.RunningStepStateWaitingForInput + waitingForInput = true + } else { + r.currentState = step.RunningStepStateRunning + } + r.lock.Unlock() + r.stageChangeHandler.OnStageChange( + r, + nil, + nil, + nil, + string(StageIDExecute), + waitingForInput, + ) + select { + case loopData, ok := <-r.inputData: + if !ok { + return + } + + itemOutputs := make([]any, len(loopData)) + itemErrors := make(map[int]string, len(loopData)) + + r.logger.Debugf("Executing subworkflow...") + wg := &sync.WaitGroup{} + wg.Add(len(loopData)) + errors := false + sem := make(chan struct{}, r.parallelism) + for i, input := range loopData { + i := i + input := input + go func() { + defer func() { + <-sem + wg.Done() + }() + r.logger.Debugf("Queuing item %d...", i) + select { + case sem <- struct{}{}: + case <-r.ctx.Done(): + r.logger.Debugf("Aborting item %d execution.", i) + return + } + + r.logger.Debugf("Executing item %d...", i) + // Ignore the output ID here because it can only be "success" + _, outputData, err := r.workflow.Execute(r.ctx, input) + r.lock.Lock() + if err != nil { + errors = true + itemErrors[i] = err.Error() + } else { + itemOutputs[i] = outputData + } + r.lock.Unlock() + r.logger.Debugf("Item %d complete.", i) + }() + } + wg.Wait() + r.logger.Debugf("Subworkflow complete.") + r.lock.Lock() + previousStage := string(r.currentStage) + r.currentState = step.RunningStepStateRunning + var outputID string + var outputData any + if errors { + r.currentStage = StageIDFailed + 
outputID = "error" + dataMap := make(map[int]any, len(loopData)) + for i, entry := range itemOutputs { + if entry != nil { + dataMap[i] = entry + } + } + outputData = map[string]any{ + "data": dataMap, + "messages": itemErrors, + } + } else { + r.currentStage = StageIDOutputs + outputID = "success" + outputData = map[string]any{ + "data": itemOutputs, + } + } + currentStage := r.currentStage + r.lock.Unlock() + r.stageChangeHandler.OnStageChange( + r, + &previousStage, + nil, + nil, + string(currentStage), + false, + ) + r.lock.Lock() + r.currentState = step.RunningStepStateFinished + previousStage = string(r.currentStage) + r.lock.Unlock() + r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData) + case <-r.ctx.Done(): + return + } + +} diff --git a/internal/step/foreach/provider_example_test.go b/internal/step/foreach/provider_example_test.go new file mode 100644 index 00000000..9ac7a961 --- /dev/null +++ b/internal/step/foreach/provider_example_test.go @@ -0,0 +1,107 @@ +package foreach_test + +import ( + "context" + "fmt" + + "go.arcalot.io/lang" + "go.arcalot.io/log/v2" + "go.flow.arcalot.io/engine/config" + "go.flow.arcalot.io/engine/internal/step/dummy" + "go.flow.arcalot.io/engine/internal/step/foreach" + "go.flow.arcalot.io/engine/internal/step/registry" + "go.flow.arcalot.io/engine/workflow" +) + +// mainWorkflow is the workflow calling the foreach step. +var mainWorkflow = ` +input: + root: names + objects: + names: + id: names + properties: + names: + type: + type_id: list + items: + type_id: object + id: name + properties: + name: + type: + type_id: string +steps: + greet: + kind: foreach + items: !expr $.input.names + workflow: subworkflow.yaml + parallelism: 5 +output: + messages: !expr $.steps.greet.outputs.success.data +` + +var subworkflow = ` +input: + root: name + objects: + name: + id: name + properties: + name: + type: + type_id: string +steps: + say_hi: + kind: dummy + name: !expr $.input.name +output: + message: !expr $.steps.say_hi.greet.success.message +` + +// ExampleNew provides an example for using the foreach provider to run subworkflows. +func ExampleNew() { + logConfig := log.Config{ + Level: log.LevelError, + Destination: log.DestinationStdout, + } + logger := log.New( + logConfig, + ) + cfg := &config.Config{ + Log: logConfig, + } + stepRegistry := lang.Must2(registry.New( + dummy.New(), + lang.Must2(foreach.New(logger, cfg)), + )) + executor := lang.Must2(workflow.NewExecutor( + logger, + cfg, + stepRegistry, + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(mainWorkflow))) + preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{ + "subworkflow.yaml": []byte(subworkflow), + })) + outputID, outputData := lang.Must3(preparedWorkflow.Execute(context.Background(), map[string]any{ + "names": []any{ + map[string]any{ + "name": "Arca", + }, + map[string]any{ + "name": "Lot", + }, + }, + }, + )) + if outputID != "success" { + panic(fmt.Errorf("workflow run failed")) + } + data := outputData.(map[any]any)["messages"].([]any) + for _, entry := range data { + fmt.Printf("%s ", entry.(map[any]any)["message"]) + } + fmt.Println() + // Output: Hello Arca! Hello Lot! 
+} diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index a7130066..abb97496 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -41,6 +41,9 @@ type pluginProvider struct { logger log.Logger } +func (p *pluginProvider) Register(_ step.Registry) { +} + func (p *pluginProvider) ProviderSchema() map[string]*schema.PropertySchema { return map[string]*schema.PropertySchema{ "plugin": schema.NewPropertySchema( @@ -138,7 +141,7 @@ func (p *pluginProvider) Lifecycle() step.Lifecycle[step.LifecycleStage] { } } -func (p *pluginProvider) LoadSchema(inputs map[string]any) (step.RunnableStep, error) { +func (p *pluginProvider) LoadSchema(inputs map[string]any, _ map[string][]byte) (step.RunnableStep, error) { image := inputs["plugin"].(string) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/step/provider.go b/internal/step/provider.go index 83bbf59a..1f1548ad 100644 --- a/internal/step/provider.go +++ b/internal/step/provider.go @@ -5,6 +5,11 @@ import "go.flow.arcalot.io/pluginsdk/schema" // Provider is the description of an item that fits in a workflow. Its implementation provide the // basis for workflow execution. type Provider interface { + + // Register notifies the provider of the step registry it belongs to. This function is called directly after + // creation. + Register(registry Registry) + // Kind returns the identifier that uniquely identifies this provider. // e.g. "plugin" Kind() string @@ -22,9 +27,7 @@ type Provider interface { // LoadSchema prompts this provider to load its schema and return a step that can actually be run. The provided // inputs are guaranteed to match the schema returned by ProviderSchema. - LoadSchema( - inputs map[string]any, - ) (RunnableStep, error) + LoadSchema(inputs map[string]any, workflowContext map[string][]byte) (RunnableStep, error) } // StageChangeHandler is a callback hook for reacting to changes in stages. The calling party will be notified of the diff --git a/internal/step/registry.go b/internal/step/registry.go index 7452bc9b..28e6f48f 100644 --- a/internal/step/registry.go +++ b/internal/step/registry.go @@ -2,7 +2,8 @@ package step import "go.flow.arcalot.io/pluginsdk/schema" -// Registry holds the providers for possible steps in workflows. +// Registry holds the providers for possible steps in workflows. The registry must call Register() on each Provider +// immediately after creation. type Registry interface { // Schema provides a generic schema for all steps. 
Schema() *schema.OneOfSchema[string] diff --git a/internal/step/registry/registry.go b/internal/step/registry/registry.go index 9c47dc1c..aacd8bee 100644 --- a/internal/step/registry/registry.go +++ b/internal/step/registry/registry.go @@ -91,10 +91,14 @@ func New(providers ...step.Provider) (step.Registry, error) { objects[kind] = schema.NewObjectSchema(provider.Kind(), object) } - return &stepRegistry{ + registry := &stepRegistry{ p, objects, - }, nil + } + for _, provider := range registry.providers { + provider.Register(registry) + } + return registry, nil } type stepRegistry struct { diff --git a/new.go b/new.go index 4070ba61..a4b0bebc 100644 --- a/new.go +++ b/new.go @@ -14,7 +14,7 @@ func New( stepRegistry, err := NewDefaultStepRegistry( logger, DefaultDeployerRegistry, - config.LocalDeployer, + config, ) if err != nil { return nil, err diff --git a/steps.go b/steps.go index 67b2e5a6..4b3e7c07 100644 --- a/steps.go +++ b/steps.go @@ -5,7 +5,9 @@ import ( "go.arcalot.io/log/v2" deployerRegistry "go.flow.arcalot.io/deployer/registry" + "go.flow.arcalot.io/engine/config" "go.flow.arcalot.io/engine/internal/step" + "go.flow.arcalot.io/engine/internal/step/foreach" "go.flow.arcalot.io/engine/internal/step/plugin" stepRegistry "go.flow.arcalot.io/engine/internal/step/registry" ) @@ -14,15 +16,21 @@ import ( func NewDefaultStepRegistry( logger log.Logger, deployerRegistry deployerRegistry.Registry, - localDeployerConfig any, + config *config.Config, ) (step.Registry, error) { - pluginProvider, err := plugin.New(logger, deployerRegistry, localDeployerConfig) + pluginProvider, err := plugin.New(logger, deployerRegistry, config.LocalDeployer) if err != nil { return nil, fmt.Errorf("failed to create plugin step provider (%w)", err) } + loopProvider, err := foreach.New(logger, config) + if err != nil { + return nil, fmt.Errorf("failed to create loop step provider (%w)", err) + } + stepR, err := stepRegistry.New( pluginProvider, + loopProvider, ) if err != nil { return nil, fmt.Errorf("failed to create step registry (%w)", err) diff --git a/workflow/executor.go b/workflow/executor.go index 05f7630b..4ebc3beb 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -108,7 +108,7 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte } // Then we process the steps. This involves several sub-steps, make sure to check the function. - runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag) + runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag, workflowContext) if err != nil { return nil, err } @@ -202,6 +202,7 @@ func (e *executor) processInput(workflow *Workflow) (schema.Scope, error) { func (e *executor) processSteps( workflow *Workflow, dag dgraph.DirectedGraph[*DAGItem], + workflowContext map[string][]byte, ) ( runnableSteps map[string]step.RunnableStep, stepOutputProperties map[string]*schema.PropertySchema, @@ -233,7 +234,7 @@ func (e *executor) processSteps( } // Stage 1: unserialize the data with only the provider properties known. 
- runnableStep, err := e.loadSchema(stepKind, stepID, stepDataMap) + runnableStep, err := e.loadSchema(stepKind, stepID, stepDataMap, workflowContext) if err != nil { return nil, nil, nil, nil, err } @@ -491,7 +492,7 @@ func (e *executor) getRunData(stepKind step.Provider, runnableStep step.Runnable return result, nil } -func (e *executor) loadSchema(stepKind step.Provider, stepID string, stepDataMap map[any]any) (step.RunnableStep, error) { +func (e *executor) loadSchema(stepKind step.Provider, stepID string, stepDataMap map[any]any, workflowContext map[string][]byte) (step.RunnableStep, error) { properties := stepKind.ProviderSchema() properties["kind"] = schema.NewPropertySchema( schema.NewStringEnumSchema( @@ -547,7 +548,7 @@ func (e *executor) loadSchema(stepKind step.Provider, stepID string, stepDataMap for field := range providerSchema { providerData[field] = unserializedStepData.(map[string]any)[field] } - runnableStep, err := stepKind.LoadSchema(providerData) + runnableStep, err := stepKind.LoadSchema(providerData, workflowContext) if err != nil { return nil, &ErrInvalidWorkflow{fmt.Errorf("failed to load schema for step %s (%w)", stepID, err)} }
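
Taken together, the changes in `internal/step/provider.go` and `workflow/executor.go` give every step provider two new touch points: `Register`, which the step registry calls immediately after construction, and a second `LoadSchema` parameter carrying the workflow context (all files loaded alongside the main workflow, keyed by file name). Below is a minimal sketch of a provider inside this repository adapting to the new signatures; the `myProvider` type and its `file` property are hypothetical, and only the method signatures come from this diff.

```go
package example

import (
	"fmt"

	"go.flow.arcalot.io/engine/internal/step"
)

// myProvider is a hypothetical provider used only to illustrate the new
// methods; the remaining Provider methods (Kind, ProviderSchema,
// RunProperties, Lifecycle) are unchanged by this diff and omitted here.
type myProvider struct {
	registry step.Registry
}

// Register receives the step registry right after the registry is created.
// Providers that need to resolve other step kinds (like foreach, which
// builds subworkflow executors) keep the reference; others can leave this
// a no-op, as the dummy and plugin providers do.
func (p *myProvider) Register(registry step.Registry) {
	p.registry = registry
}

// LoadSchema now also receives the workflow context. Providers that do not
// read sibling files can ignore the second parameter entirely.
func (p *myProvider) LoadSchema(inputs map[string]any, workflowContext map[string][]byte) (step.RunnableStep, error) {
	fileName, ok := inputs["file"].(string) // "file" is a made-up provider property
	if !ok {
		return nil, fmt.Errorf("missing 'file' input")
	}
	if _, found := workflowContext[fileName]; !found {
		return nil, fmt.Errorf("file %q not found in the workflow context", fileName)
	}
	// A real provider would construct and return its step.RunnableStep here.
	return nil, fmt.Errorf("sketch only: not implemented")
}
```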
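
The recovery snippet in the foreach README is easy to misread. Going by the error output assembled in `run()`, `data` maps an item index to the success output of each subworkflow that did complete, while `messages` maps an item index to the error text of each subworkflow that failed, and this output only resolves when the step ends in the failed stage. A hedged restatement in the embedded-YAML style of `provider_example_test.go` (the input schema and step name mirror that example; the workflow as a whole is illustrative, not part of the PR):

```go
// Illustrative only: a main workflow that recovers from partial failures.
var recoveringWorkflow = `
input:
  root: names
  objects:
    names:
      id: names
      properties:
        names:
          type:
            type_id: list
            items:
              type_id: object
              id: name
              properties:
                name:
                  type:
                    type_id: string
steps:
  greet:
    kind: foreach
    items: !expr $.input.names
    workflow: subworkflow.yaml
    parallelism: 5
output:
  # Map of item index -> success output of the subworkflows that completed.
  completed: !expr $.steps.greet.failed.error.data
  # Map of item index -> error message of the subworkflows that failed.
  errors: !expr $.steps.greet.failed.error.messages
`
```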