diff --git a/.changeset/happy-plum-gnomes.md b/.changeset/happy-plum-gnomes.md new file mode 100644 index 00000000000..d58726cdc72 --- /dev/null +++ b/.changeset/happy-plum-gnomes.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#fix Workflow Registry Syncer v2 cleans up orphaned pending events diff --git a/core/services/workflows/syncer/v2/workflow_registry.go b/core/services/workflows/syncer/v2/workflow_registry.go index 86db09ac9da..7466fd7d2d4 100644 --- a/core/services/workflows/syncer/v2/workflow_registry.go +++ b/core/services/workflows/syncer/v2/workflow_registry.go @@ -319,6 +319,23 @@ func (w *workflowRegistry) generateReconciliationEvents(_ context.Context, pendi } } + // Clean up create events which no longer need to be attempted because + // the workflow no longer exists in the workflow registry contract + for id, event := range pendingEvents { + if event.Name == WorkflowRegistered { + existsInMetadata := false + for _, wfMeta := range workflowMetadata { + if wfMeta.WorkflowID.Hex() == event.Data.(WorkflowRegisteredEvent).WorkflowID.Hex() { + existsInMetadata = true + break + } + } + if !existsInMetadata { + delete(pendingEvents, id) + } + } + } + if len(pendingEvents) != 0 { return nil, fmt.Errorf("invariant violation: some pending events were not handled in the reconcile loop: keys=%+v, len=%d", maps.Keys(pendingEvents), len(pendingEvents)) } diff --git a/core/services/workflows/syncer/v2/workflow_registry_test.go b/core/services/workflows/syncer/v2/workflow_registry_test.go index cfa1a3b10d8..2ddc18346c1 100644 --- a/core/services/workflows/syncer/v2/workflow_registry_test.go +++ b/core/services/workflows/syncer/v2/workflow_registry_test.go @@ -2,6 +2,7 @@ package v2 import ( "context" + "encoding/hex" "fmt" "testing" "time" @@ -642,4 +643,123 @@ func Test_generateReconciliationEventsV2(t *testing.T) { require.Equal(t, events[0].Name, WorkflowDeleted) require.Equal(t, events[1].Name, WorkflowRegistered) }) + + t.Run("pending delete events are handled when workflow metadata no longer exists", func(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + workflowDonNotifier := capabilities.NewDonNotifier() + // Engine already in the workflow registry + er := NewEngineRegistry() + wfID := [32]byte{1} + err := er.Add(wfID, &mockService{}) + require.NoError(t, err) + wr, err := NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (types.ContractReader, error) { + return nil, nil + }, + "", + Config{ + QueryCount: 20, + SyncStrategy: SyncStrategyReconciliation, + }, + &eventHandler{}, + workflowDonNotifier, + er, + ) + fakeClock := clockwork.NewFakeClock() + wr.clock = fakeClock + require.NoError(t, err) + + // A workflow is to be removed, but hits a failure, causing it to stay pending + event := WorkflowDeletedEvent{ + WorkflowID: wfID, + } + pendingEvents := map[string]*reconciliationEvent{ + hex.EncodeToString(wfID[:]): { + Event: Event{ + Data: event, + Name: WorkflowDeleted, + }, + id: hex.EncodeToString(wfID[:]), + signature: fmt.Sprintf("%s-%s-%s", WorkflowDeleted, hex.EncodeToString(wfID[:]), toSpecStatus(WorkflowStatusActive)), + nextRetryAt: time.Now(), + retryCount: 5, + }, + } + + // No workflows in metadata + metadata := []WorkflowMetadataView{} + + events, err := wr.generateReconciliationEvents(ctx, pendingEvents, metadata) + require.NoError(t, err) + require.Len(t, events, 1) + require.Equal(t, WorkflowDeleted, events[0].Name) + require.Empty(t, pendingEvents) + }) + + t.Run("pending create events are handled when workflow metadata no longer exists", func(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + workflowDonNotifier := capabilities.NewDonNotifier() + er := NewEngineRegistry() + wr, err := NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (types.ContractReader, error) { + return nil, nil + }, + "", + Config{ + QueryCount: 20, + SyncStrategy: SyncStrategyReconciliation, + }, + &eventHandler{}, + workflowDonNotifier, + er, + ) + fakeClock := clockwork.NewFakeClock() + wr.clock = fakeClock + require.NoError(t, err) + + // A workflow is added, but hits a failure during creation, causing it to stay pending + binaryURL := "b1" + configURL := "c1" + wfID := [32]byte{1} + owner := []byte{} + wfName := "wf name 1" + createdAt := uint64(1000000) + tag := "tag1" + attributes := []byte{} + event := WorkflowRegisteredEvent{ + WorkflowID: wfID, + WorkflowOwner: owner, + CreatedAt: createdAt, + Status: WorkflowStatusActive, + WorkflowName: wfName, + BinaryURL: binaryURL, + ConfigURL: configURL, + Tag: tag, + Attributes: attributes, + } + pendingEvents := map[string]*reconciliationEvent{ + hex.EncodeToString(wfID[:]): { + Event: Event{ + Data: event, + Name: WorkflowRegistered, + }, + id: hex.EncodeToString(wfID[:]), + signature: fmt.Sprintf("%s-%s-%s", WorkflowRegistered, hex.EncodeToString(wfID[:]), toSpecStatus(WorkflowStatusActive)), + nextRetryAt: time.Now(), + retryCount: 5, + }, + } + + // The workflow then gets removed + metadata := []WorkflowMetadataView{} + + events, err := wr.generateReconciliationEvents(ctx, pendingEvents, metadata) + require.NoError(t, err) + require.Empty(t, events) + require.Empty(t, pendingEvents) + }) }