diff --git a/.changeset/rare-pianos-end.md b/.changeset/rare-pianos-end.md new file mode 100644 index 00000000000..b9fa4ad228b --- /dev/null +++ b/.changeset/rare-pianos-end.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal #bugfix Wait for the DON on a workflow node before starting a contract reader diff --git a/core/services/workflows/syncer/v2/workflow_registry.go b/core/services/workflows/syncer/v2/workflow_registry.go index c1fd92475fd..1cfa7eede25 100644 --- a/core/services/workflows/syncer/v2/workflow_registry.go +++ b/core/services/workflows/syncer/v2/workflow_registry.go @@ -74,6 +74,8 @@ type workflowRegistry struct { allowListedMu sync.RWMutex contractReaderFn versioning.ContractReaderFactory + contractReader types.ContractReader + readerMu sync.RWMutex config Config @@ -168,26 +170,42 @@ func (w *workflowRegistry) Start(_ context.Context) error { return w.StartOnce(w.Name(), func() error { ctx, cancel := w.stopCh.NewCtx() - contractReader, err := w.newWorkflowRegistryContractReader(context.Background()) - if err != nil { - w.lggr.Criticalf("contract reader unavailable : %s", err) - return errors.New("failed to create contract reader: " + err.Error()) - } + initDoneCh := make(chan struct{}) w.wg.Add(1) go func() { - defer w.wg.Done() - defer cancel() + defer w.lggr.Debug("Received DON and set contract reader") + defer close(initDoneCh) w.lggr.Debugw("Waiting for DON...") - don, err := w.workflowDonNotifier.WaitForDon(ctx) - if err != nil { + if _, err := w.workflowDonNotifier.WaitForDon(ctx); err != nil { w.lggr.Errorw("failed to wait for don", "err", err) return } + // Async initialization of contract reader because there is an on-chain + // call dependency. Blocking on initialization results in a + // deadlock. Instead wait until the node has identified it's DON + // as a proxy for a DON and on-chain ready state . + reader, err := w.newWorkflowRegistryContractReader(ctx) + if err != nil { + w.lggr.Criticalf("contract reader unavailable : %s", err) + return + } + + w.readerMu.Lock() + defer w.readerMu.Unlock() + w.contractReader = reader + }() + + w.wg.Add(1) + go func() { + defer w.wg.Done() + defer cancel() // Start goroutines to gather changes from Workflow Registry contract - w.syncUsingReconciliationStrategy(ctx, don, contractReader) + <-initDoneCh + don, _ := w.workflowDonNotifier.WaitForDon(ctx) + w.syncUsingReconciliationStrategy(ctx, don) }() w.wg.Add(1) @@ -195,7 +213,8 @@ func (w *workflowRegistry) Start(_ context.Context) error { defer w.wg.Done() defer cancel() // Start goroutines to gather allowlisted requests from Workflow Registry contract - w.syncAllowlistedRequests(ctx, contractReader) + <-initDoneCh + w.syncAllowlistedRequests(ctx) }() return nil @@ -362,7 +381,7 @@ func (w *workflowRegistry) generateReconciliationEvents(_ context.Context, pendi return events, nil } -func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context, contractReader types.ContractReader) { +func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context) { ticker := time.NewTicker(defaultTickIntervalForAllowlistedRequests).C w.lggr.Debug("starting syncAllowlistedRequests") for { @@ -371,7 +390,9 @@ func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context, contract w.lggr.Debug("shutting down syncAllowlistedRequests, %s", ctx.Err()) return case <-ticker: - allowListedRequests, head, err := w.getAllowlistedRequests(ctx, contractReader) + w.readerMu.RLock() + allowListedRequests, head, err := w.getAllowlistedRequests(ctx, w.contractReader) + w.readerMu.RUnlock() if err != nil { w.lggr.Errorw("failed to call getAllowlistedRequests", "err", err) continue @@ -386,7 +407,7 @@ func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context, contract // syncUsingReconciliationStrategy syncs workflow registry contract state by polling the workflow metadata state and comparing to local state. // NOTE: In this mode paused states will be treated as a deleted workflow. Workflows will not be registered as paused. -func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context, don capabilities.DON, contractReader types.ContractReader) { +func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context, don capabilities.DON) { ticker := w.getTicker() pendingEvents := map[string]*reconciliationEvent{} w.lggr.Debug("running readRegistryStateLoop") @@ -396,7 +417,9 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context, w.lggr.Debug("shutting down readRegistryStateLoop") return case <-ticker: - workflowMetadata, head, err := w.getWorkflowMetadata(ctx, don, contractReader) + w.readerMu.RLock() + workflowMetadata, head, err := w.getWorkflowMetadata(ctx, don, w.contractReader) + w.readerMu.RUnlock() if err != nil { w.lggr.Errorw("failed to get registry state", "err", err) continue