Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dependency system: Part 1 #193

Merged
merged 8 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
go.arcalot.io/assert v1.8.0
go.arcalot.io/dgraph v1.3.0
go.arcalot.io/dgraph v1.4.1
go.arcalot.io/lang v1.1.0
go.arcalot.io/log/v2 v2.1.0
go.flow.arcalot.io/deployer v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.arcalot.io/assert v1.8.0 h1:hGcHMPncQXwQvjj7MbyOu2gg8VIBB00crUJZpeQOjxs=
go.arcalot.io/assert v1.8.0/go.mod h1:nNmWPoNUHFyrPkNrD2aASm5yPuAfiWdB/4X7Lw3ykHk=
go.arcalot.io/dgraph v1.3.0 h1:CDj6bhskomgC02roL0TxOgSb8SbkFgQ7hbTH+vJTCDw=
go.arcalot.io/dgraph v1.3.0/go.mod h1:+Kxc81utiihMSmC1/ttSPGLDlWPpvgOpNxSFmIDPxFM=
go.arcalot.io/dgraph v1.4.1 h1:y/lhJ68WzNUDR2BYSk7tZAZhVokts92svcrJLbK4Ebo=
go.arcalot.io/dgraph v1.4.1/go.mod h1:+Kxc81utiihMSmC1/ttSPGLDlWPpvgOpNxSFmIDPxFM=
go.arcalot.io/exex v0.2.0 h1:u44pjwPwcH57TF8knhaqVZP/1V/KbnRe//pKzMwDpLw=
go.arcalot.io/exex v0.2.0/go.mod h1:5zlFr+7vOQNZKYCNOEDdsad+z/dlvXKs2v4kG+v+bQo=
go.arcalot.io/lang v1.1.0 h1:ugglRKpd3qIMkdghAjKJxsziIgHm8QpxrzZPSXoa08I=
Expand Down
159 changes: 59 additions & 100 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ type executableWorkflow struct {
stepRunData map[string]map[string]any
workflowContext map[string][]byte
internalDataModel *schema.ScopeSchema
runnableSteps map[string]step.RunnableStep
lifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema]
outputSchema map[string]*schema.StepOutputSchema
// All of these fields have the step ID as the key.
runnableSteps map[string]step.RunnableStep
lifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema]
outputSchema map[string]*schema.StepOutputSchema
}

func (e *executableWorkflow) OutputSchema() map[string]*schema.StepOutputSchema {
Expand Down Expand Up @@ -82,7 +83,6 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) (
},
callableFunctions: e.callableFunctions,
dag: e.dag.Clone(),
inputsNotified: make(map[string]struct{}, len(e.dag.ListNodes())),
runningSteps: make(map[string]step.RunningStep, len(e.dag.ListNodes())),
outputDataChannel: make(chan outputDataType, 1),
outputDone: false,
Expand Down Expand Up @@ -168,8 +168,11 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) (
if err != nil {
return "", nil, fmt.Errorf("bug: cannot obtain input node (%w)", err)
}
if err := inputNode.Remove(); err != nil {
return "", nil, fmt.Errorf("failed to remove input node from DAG (%w)", err)
if err := l.dag.PushStartingNodes(); err != nil {
return "", nil, fmt.Errorf("failed to setup starting nodes in DAG (%w)", err)
}
if err := inputNode.ResolveNode(dgraph.Resolved); err != nil {
return "", nil, fmt.Errorf("failed to resolve input node in DAG (%w)", err)
}
webbnh marked this conversation as resolved.
Show resolved Hide resolved

func() {
Expand Down Expand Up @@ -229,7 +232,6 @@ type loopState struct {
data map[string]any
dag dgraph.DirectedGraph[*DAGItem]
callableFunctions map[string]schema.CallableFunction
inputsNotified map[string]struct{}
runningSteps map[string]step.RunningStep
outputDataChannel chan outputDataType
outputDone bool
Expand Down Expand Up @@ -280,10 +282,10 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo
l.cancel()
return
}
l.logger.Debugf("Removed node '%s' from the DAG", stageNode.ID())
if err := stageNode.Remove(); err != nil {
l.logger.Errorf("Failed to remove stage node ID %s (%w)", stageNode.ID(), err)
l.recentErrors <- fmt.Errorf("failed to remove stage node ID %s (%w)", stageNode.ID(), err)
l.logger.Debugf("Resolving node %q in the DAG", stageNode.ID())
if err := stageNode.ResolveNode(dgraph.Resolved); err != nil {
l.logger.Errorf("Failed to resolve stage node ID %s (%w)", stageNode.ID(), err)
l.recentErrors <- fmt.Errorf("failed to resolve stage node ID %s (%w)", stageNode.ID(), err)
l.cancel()
return
}
Expand All @@ -295,11 +297,12 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo
l.cancel()
return
}
// Removes the node from the DAG. This results in the nodes not having inbound connections, allowing them to be processed.
l.logger.Debugf("Removed node '%s' from the DAG", outputNode.ID())
if err := outputNode.Remove(); err != nil {
l.logger.Errorf("Failed to remove output node ID %s (%w)", outputNode.ID(), err)
l.recentErrors <- fmt.Errorf("failed to remove output node ID %s (%w)", outputNode.ID(), err)
// Resolves the node in the DAG. This allows us to know which nodes are
mfleader marked this conversation as resolved.
Show resolved Hide resolved
// ready for processing due to all dependencies being resolved.
l.logger.Debugf("Resolving node %q in the DAG", outputNode.ID())
if err := outputNode.ResolveNode(dgraph.Resolved); err != nil {
l.logger.Errorf("Failed to resolve output node ID %s (%w)", outputNode.ID(), err)
l.recentErrors <- fmt.Errorf("failed to resolve output node ID %s (%w)", outputNode.ID(), err)
l.cancel()
return
}
Expand All @@ -322,32 +325,29 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo
l.notifySteps()
}

// notifySteps is a function we can call to go through all DAG nodes that have no inbound connections and
// provide step inputs based on expressions.
// notifySteps is a function we can call to go through all DAG nodes that are marked
// ready and provide step inputs based on expressions.
// The lock should be acquired by the caller before this is called.
func (l *loopState) notifySteps() { //nolint:gocognit
// This function goes through the DAG and feeds the input to all steps that have no further inbound
// dependencies.
//
// This function could be further optimized if there was a DAG that contained not only the steps, but the
// concrete values that needed to be updated. This would make it possible to completely forego the need to
// iterate through the input.

nodesWithoutInbound := l.dag.ListNodesWithoutInboundConnections()
l.logger.Debugf("Currently %d DAG nodes have no inbound connection. Now processing them.", len(nodesWithoutInbound))

// nodesWithoutInbound have all dependencies resolved. No inbound connection.
// Also includes nodes that are not for running, like an input.
for nodeID, node := range nodesWithoutInbound {
if _, ok := l.inputsNotified[nodeID]; ok {
readyNodes := l.dag.PopReadyNodes()
l.logger.Debugf("Currently %d DAG nodes are ready. Now processing them.", len(readyNodes))

// Can include runnable nodes, nodes that cannot be resolved, and nodes that are not for running, like an input.
for nodeID, resolutionStatus := range readyNodes {
if resolutionStatus == dgraph.Unresolvable {
l.logger.Debugf("Disregarding node %q with resolution %q", nodeID, resolutionStatus)
continue
}
l.logger.Debugf("Processing step node %s", nodeID)
l.inputsNotified[nodeID] = struct{}{}
node, err := l.dag.GetNodeByID(nodeID)
if err != nil {
panic(fmt.Errorf("failed to get node %s (%w)", nodeID, err))
}
nodeItem := node.Item()
// The data structure that the particular node requires. One or more fields. May or may not contain expressions.
inputData := node.Item().Data
inputData := nodeItem.Data
if inputData == nil {
// No input data is needed.
// No input data is needed. This is often the case for input nodes.
continue
}
// Resolve any expressions in the input data.
Expand All @@ -358,24 +358,24 @@ func (l *loopState) notifySteps() { //nolint:gocognit
}

// This switch checks to see if it's a node that needs to be run.
switch node.Item().Kind {
switch nodeItem.Kind {
case DAGItemKindStepStage:
if node.Item().DataSchema == nil {
if nodeItem.DataSchema == nil {
// This should only happen if the stage doesn't have any input fields.
// This may not even get called. That should be checked.
break
}
// We have a stage we can proceed with. Let's provide it with input.
// Tries to match the schema
if _, err := node.Item().DataSchema.Unserialize(untypedInputData); err != nil {
l.logger.Errorf("Bug: schema evaluation resulted in invalid data for %s (%v)", node.ID(), err)
l.recentErrors <- fmt.Errorf("bug: schema evaluation resulted in invalid data for %s (%w)", node.ID(), err)
if _, err := nodeItem.DataSchema.Unserialize(untypedInputData); err != nil {
l.logger.Errorf("Bug: schema evaluation resulted in invalid data for %s (%v)", nodeID, err)
l.recentErrors <- fmt.Errorf("bug: schema evaluation resulted in invalid data for %s (%w)", nodeID, err)
webbnh marked this conversation as resolved.
Show resolved Hide resolved
l.cancel()
return
}

// This check is here just to make sure it has the required fields set
if node.Item().StepID == "" || node.Item().StageID == "" {
if nodeItem.StepID == "" || nodeItem.StageID == "" {
// This shouldn't happen
panic("Step or stage ID missing")
}
Expand All @@ -387,12 +387,12 @@ func (l *loopState) notifySteps() { //nolint:gocognit
}
// Sends it to the plugin
l.logger.Debugf("Providing stage input for %s...", nodeID)
if err := l.runningSteps[node.Item().StepID].ProvideStageInput(
node.Item().StageID,
if err := l.runningSteps[nodeItem.StepID].ProvideStageInput(
nodeItem.StageID,
typedInputData,
); err != nil {
l.logger.Errorf("Bug: failed to provide input to step %s (%w)", node.Item().StepID, err)
l.recentErrors <- fmt.Errorf("bug: failed to provide input to step %s (%w)", node.Item().StepID, err)
l.logger.Errorf("Bug: failed to provide input to step %s (%w)", nodeItem.StepID, err)
l.recentErrors <- fmt.Errorf("bug: failed to provide input to step %s (%w)", nodeItem.StepID, err)
l.cancel()
return
}
Expand All @@ -408,79 +408,37 @@ func (l *loopState) notifySteps() { //nolint:gocognit
// other copies of this should be attempting to write to the output
// data channel. This is required to prevent the goroutine from stalling.
l.outputDataChannel <- outputDataType{
outputID: node.Item().OutputID,
outputID: nodeItem.OutputID,
outputData: untypedInputData,
}
// Since this is the only thread accessing the channel, it should be
// safe to close it now
close(l.outputDataChannel)
}

if err := node.Remove(); err != nil {
if err := node.ResolveNode(dgraph.Resolved); err != nil {
l.logger.Errorf("BUG: Error occurred while removing workflow output node (%w)", err)
}
}
}
webbnh marked this conversation as resolved.
Show resolved Hide resolved
}

type stateCounters struct {
starting int
waitingWithInbound int
waitingWithoutInbound int
running int
finished int
starting int
waiting int
running int
finished int
}

func (l *loopState) countStates() stateCounters {
counters := struct {
starting int
waitingWithInbound int
waitingWithoutInbound int
running int
finished int
}{
0,
0,
0,
0,
0,
}
func (l *loopState) countStates() (counters stateCounters) {
for stepID, runningStep := range l.runningSteps {
switch runningStep.State() {
case step.RunningStepStateStarting:
counters.starting++
l.logger.Debugf("Step %s is currently starting.", stepID)
case step.RunningStepStateWaitingForInput:
connectionsMsg := ""
dagNode, err := l.dag.GetNodeByID(GetStageNodeID(stepID, runningStep.CurrentStage()))
switch {
case err != nil:
l.logger.Warningf("Failed to get DAG node for the debug message (%w)", err)
counters.waitingWithInbound++
case dagNode == nil:
l.logger.Warningf("Failed to get DAG node for the debug message. Returned nil", err)
counters.waitingWithInbound++
default:
inboundConnections, err := dagNode.ListInboundConnections()
if err != nil {
l.logger.Warningf("Error while listing inbound connections. (%w)", err)
}
if len(inboundConnections) > 0 {
counters.waitingWithInbound++
} else {
counters.waitingWithoutInbound++
}

i := 0
for k := range inboundConnections {
if i > 0 {
connectionsMsg += ", "
}
connectionsMsg += k
i++
}
}
l.logger.Debugf("Step %s, stage %s, is currently waiting for input from '%s'.", stepID, runningStep.CurrentStage(), connectionsMsg)
counters.waiting++
l.logger.Debugf("Step %s is currently waiting.", stepID)
case step.RunningStepStateRunning:
counters.running++
l.logger.Debugf("Step %s is currently running.", stepID)
Expand All @@ -495,15 +453,16 @@ func (l *loopState) countStates() stateCounters {
func (l *loopState) checkForDeadlocks(retries int, wg *sync.WaitGroup) {
// Here we make sure we don't have a deadlock.
counters := l.countStates()
hasReadyNodes := l.dag.HasReadyNodes()
l.logger.Infof(
"There are currently %d steps starting, %d waiting for input, %d ready for input, %d running, %d finished",
"There are currently %d steps starting, %d waiting, %d running, %d finished. HasReadyNodes: %t",
counters.starting,
counters.waitingWithInbound,
counters.waitingWithoutInbound,
counters.waiting,
counters.running,
counters.finished,
hasReadyNodes,
)
if counters.starting == 0 && counters.running == 0 && counters.waitingWithoutInbound == 0 && !l.outputDone {
if counters.starting == 0 && counters.running == 0 && !hasReadyNodes && !l.outputDone {
if retries <= 0 {
l.recentErrors <- &ErrNoMorePossibleSteps{
l.dag,
Expand Down
Loading