From fdb6d41aac19da7be02b16d10b428e6e93025c31 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 22 Mar 2022 01:32:25 +0100 Subject: [PATCH] Fix: ensure timely shutdown of rebroadcast heads. I have observed this routine sometimes just stayed running, potentially preventing the shutdown of the whole datastore. --- crdt.go | 50 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/crdt.go b/crdt.go index 6ccb7bc..311026c 100644 --- a/crdt.go +++ b/crdt.go @@ -179,8 +179,6 @@ type Datastore struct { seenHeadsMux sync.RWMutex seenHeads map[cid.Cid]struct{} - rebroadcastTicker *time.Ticker - curDeltaMux sync.Mutex curDelta *pb.Delta // current, unpublished delta @@ -270,21 +268,20 @@ func New( } dstore := &Datastore{ - ctx: ctx, - cancel: cancel, - opts: opts, - logger: opts.Logger, - store: store, - namespace: namespace, - set: set, - heads: heads, - dagService: dagSyncer, - broadcaster: bcast, - seenHeads: make(map[cid.Cid]struct{}), - rebroadcastTicker: time.NewTicker(opts.RebroadcastInterval), - jobQueue: make(chan *dagJob, opts.NumWorkers), - sendJobs: make(chan *dagJob), - queuedChildren: newCidSafeSet(), + ctx: ctx, + cancel: cancel, + opts: opts, + logger: opts.Logger, + store: store, + namespace: namespace, + set: set, + heads: heads, + dagService: dagSyncer, + broadcaster: bcast, + seenHeads: make(map[cid.Cid]struct{}), + jobQueue: make(chan *dagJob, opts.NumWorkers), + sendJobs: make(chan *dagJob), + queuedChildren: newCidSafeSet(), } headList, maxHeight, err := dstore.heads.List() @@ -455,14 +452,27 @@ func (store *Datastore) encodeBroadcast(heads []cid.Cid) ([]byte, error) { } func (store *Datastore) rebroadcast() { - ticker := store.rebroadcastTicker - defer ticker.Stop() + timer := time.NewTimer(store.opts.RebroadcastInterval) + for { select { case <-store.ctx.Done(): + if !timer.Stop() { + <-timer.C + } return - case <-ticker.C: + default: + } + + select { + case <-store.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: store.rebroadcastHeads() + timer.Reset(store.opts.RebroadcastInterval) } } }