Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/happy-plum-gnomes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#fix Workflow Registry Syncer v2 cleans up orphaned pending events
17 changes: 17 additions & 0 deletions core/services/workflows/syncer/v2/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,23 @@ func (w *workflowRegistry) generateReconciliationEvents(_ context.Context, pendi
}
}

// Clean up create events which no longer need to be attempted because
// the workflow no longer exists in the workflow registry contract
for id, event := range pendingEvents {
if event.Name == WorkflowRegistered {
existsInMetadata := false
for _, wfMeta := range workflowMetadata {
if wfMeta.WorkflowID.Hex() == event.Data.(WorkflowRegisteredEvent).WorkflowID.Hex() {
existsInMetadata = true
break
}
}
if !existsInMetadata {
delete(pendingEvents, id)
}
}
}

if len(pendingEvents) != 0 {
return nil, fmt.Errorf("invariant violation: some pending events were not handled in the reconcile loop: keys=%+v, len=%d", maps.Keys(pendingEvents), len(pendingEvents))
}
Expand Down
120 changes: 120 additions & 0 deletions core/services/workflows/syncer/v2/workflow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"context"
"encoding/hex"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -642,4 +643,123 @@ func Test_generateReconciliationEventsV2(t *testing.T) {
require.Equal(t, events[0].Name, WorkflowDeleted)
require.Equal(t, events[1].Name, WorkflowRegistered)
})

t.Run("pending delete events are handled when workflow metadata no longer exists", func(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
workflowDonNotifier := capabilities.NewDonNotifier()
// Engine already in the workflow registry
er := NewEngineRegistry()
wfID := [32]byte{1}
err := er.Add(wfID, &mockService{})
require.NoError(t, err)
wr, err := NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (types.ContractReader, error) {
return nil, nil
},
"",
Config{
QueryCount: 20,
SyncStrategy: SyncStrategyReconciliation,
},
&eventHandler{},
workflowDonNotifier,
er,
)
fakeClock := clockwork.NewFakeClock()
wr.clock = fakeClock
require.NoError(t, err)

// A workflow is to be removed, but hits a failure, causing it to stay pending
event := WorkflowDeletedEvent{
WorkflowID: wfID,
}
pendingEvents := map[string]*reconciliationEvent{
hex.EncodeToString(wfID[:]): {
Event: Event{
Data: event,
Name: WorkflowDeleted,
},
id: hex.EncodeToString(wfID[:]),
signature: fmt.Sprintf("%s-%s-%s", WorkflowDeleted, hex.EncodeToString(wfID[:]), toSpecStatus(WorkflowStatusActive)),
nextRetryAt: time.Now(),
retryCount: 5,
},
}

// No workflows in metadata
metadata := []WorkflowMetadataView{}

events, err := wr.generateReconciliationEvents(ctx, pendingEvents, metadata)
require.NoError(t, err)
require.Len(t, events, 1)
require.Equal(t, WorkflowDeleted, events[0].Name)
require.Empty(t, pendingEvents)
})

t.Run("pending create events are handled when workflow metadata no longer exists", func(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
workflowDonNotifier := capabilities.NewDonNotifier()
er := NewEngineRegistry()
wr, err := NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (types.ContractReader, error) {
return nil, nil
},
"",
Config{
QueryCount: 20,
SyncStrategy: SyncStrategyReconciliation,
},
&eventHandler{},
workflowDonNotifier,
er,
)
fakeClock := clockwork.NewFakeClock()
wr.clock = fakeClock
require.NoError(t, err)

// A workflow is added, but hits a failure during creation, causing it to stay pending
binaryURL := "b1"
configURL := "c1"
wfID := [32]byte{1}
owner := []byte{}
wfName := "wf name 1"
createdAt := uint64(1000000)
tag := "tag1"
attributes := []byte{}
event := WorkflowRegisteredEvent{
WorkflowID: wfID,
WorkflowOwner: owner,
CreatedAt: createdAt,
Status: WorkflowStatusActive,
WorkflowName: wfName,
BinaryURL: binaryURL,
ConfigURL: configURL,
Tag: tag,
Attributes: attributes,
}
pendingEvents := map[string]*reconciliationEvent{
hex.EncodeToString(wfID[:]): {
Event: Event{
Data: event,
Name: WorkflowRegistered,
},
id: hex.EncodeToString(wfID[:]),
signature: fmt.Sprintf("%s-%s-%s", WorkflowRegistered, hex.EncodeToString(wfID[:]), toSpecStatus(WorkflowStatusActive)),
nextRetryAt: time.Now(),
retryCount: 5,
},
}

// The workflow then gets removed
metadata := []WorkflowMetadataView{}

events, err := wr.generateReconciliationEvents(ctx, pendingEvents, metadata)
require.NoError(t, err)
require.Empty(t, events)
require.Empty(t, pendingEvents)
})
}
Loading