Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rare-pianos-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal #bugfix Wait for the DON on a workflow node before starting a contract reader
53 changes: 38 additions & 15 deletions core/services/workflows/syncer/v2/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type workflowRegistry struct {
allowListedMu sync.RWMutex

contractReaderFn versioning.ContractReaderFactory
contractReader types.ContractReader
readerMu sync.RWMutex

config Config

Expand Down Expand Up @@ -168,34 +170,51 @@ func (w *workflowRegistry) Start(_ context.Context) error {
return w.StartOnce(w.Name(), func() error {
ctx, cancel := w.stopCh.NewCtx()

contractReader, err := w.newWorkflowRegistryContractReader(context.Background())
if err != nil {
w.lggr.Criticalf("contract reader unavailable : %s", err)
return errors.New("failed to create contract reader: " + err.Error())
}
initDoneCh := make(chan struct{})

w.wg.Add(1)
go func() {
defer w.wg.Done()
defer cancel()
defer w.lggr.Debug("Received DON and set contract reader")
defer close(initDoneCh)

w.lggr.Debugw("Waiting for DON...")
don, err := w.workflowDonNotifier.WaitForDon(ctx)
if err != nil {
if _, err := w.workflowDonNotifier.WaitForDon(ctx); err != nil {
w.lggr.Errorw("failed to wait for don", "err", err)
return
}

// Async initialization of contract reader because there is an on-chain
// call dependency. Blocking on initialization results in a
// deadlock. Instead wait until the node has identified it's DON
// as a proxy for a DON and on-chain ready state .
reader, err := w.newWorkflowRegistryContractReader(ctx)
if err != nil {
w.lggr.Criticalf("contract reader unavailable : %s", err)
return
}

w.readerMu.Lock()
defer w.readerMu.Unlock()
w.contractReader = reader
}()

w.wg.Add(1)
go func() {
defer w.wg.Done()
defer cancel()
// Start goroutines to gather changes from Workflow Registry contract
w.syncUsingReconciliationStrategy(ctx, don, contractReader)
<-initDoneCh
don, _ := w.workflowDonNotifier.WaitForDon(ctx)
w.syncUsingReconciliationStrategy(ctx, don)
}()

w.wg.Add(1)
go func() {
defer w.wg.Done()
defer cancel()
// Start goroutines to gather allowlisted requests from Workflow Registry contract
w.syncAllowlistedRequests(ctx, contractReader)
<-initDoneCh
w.syncAllowlistedRequests(ctx)
}()

return nil
Expand Down Expand Up @@ -362,7 +381,7 @@ func (w *workflowRegistry) generateReconciliationEvents(_ context.Context, pendi
return events, nil
}

func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context, contractReader types.ContractReader) {
func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context) {
ticker := time.NewTicker(defaultTickIntervalForAllowlistedRequests).C
w.lggr.Debug("starting syncAllowlistedRequests")
for {
Expand All @@ -371,7 +390,9 @@ func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context, contract
w.lggr.Debug("shutting down syncAllowlistedRequests, %s", ctx.Err())
return
case <-ticker:
allowListedRequests, head, err := w.getAllowlistedRequests(ctx, contractReader)
w.readerMu.RLock()
allowListedRequests, head, err := w.getAllowlistedRequests(ctx, w.contractReader)
w.readerMu.RUnlock()
if err != nil {
w.lggr.Errorw("failed to call getAllowlistedRequests", "err", err)
continue
Expand All @@ -386,7 +407,7 @@ func (w *workflowRegistry) syncAllowlistedRequests(ctx context.Context, contract

// syncUsingReconciliationStrategy syncs workflow registry contract state by polling the workflow metadata state and comparing to local state.
// NOTE: In this mode paused states will be treated as a deleted workflow. Workflows will not be registered as paused.
func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context, don capabilities.DON, contractReader types.ContractReader) {
func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context, don capabilities.DON) {
ticker := w.getTicker()
pendingEvents := map[string]*reconciliationEvent{}
w.lggr.Debug("running readRegistryStateLoop")
Expand All @@ -396,7 +417,9 @@ func (w *workflowRegistry) syncUsingReconciliationStrategy(ctx context.Context,
w.lggr.Debug("shutting down readRegistryStateLoop")
return
case <-ticker:
workflowMetadata, head, err := w.getWorkflowMetadata(ctx, don, contractReader)
w.readerMu.RLock()
workflowMetadata, head, err := w.getWorkflowMetadata(ctx, don, w.contractReader)
w.readerMu.RUnlock()
if err != nil {
w.lggr.Errorw("failed to get registry state", "err", err)
continue
Expand Down
Loading