diff --git a/crdt.go b/crdt.go index 6ccb7bc1..311026ca 100644 --- a/crdt.go +++ b/crdt.go @@ -179,8 +179,6 @@ type Datastore struct { seenHeadsMux sync.RWMutex seenHeads map[cid.Cid]struct{} - rebroadcastTicker *time.Ticker - curDeltaMux sync.Mutex curDelta *pb.Delta // current, unpublished delta @@ -270,21 +268,20 @@ func New( } dstore := &Datastore{ - ctx: ctx, - cancel: cancel, - opts: opts, - logger: opts.Logger, - store: store, - namespace: namespace, - set: set, - heads: heads, - dagService: dagSyncer, - broadcaster: bcast, - seenHeads: make(map[cid.Cid]struct{}), - rebroadcastTicker: time.NewTicker(opts.RebroadcastInterval), - jobQueue: make(chan *dagJob, opts.NumWorkers), - sendJobs: make(chan *dagJob), - queuedChildren: newCidSafeSet(), + ctx: ctx, + cancel: cancel, + opts: opts, + logger: opts.Logger, + store: store, + namespace: namespace, + set: set, + heads: heads, + dagService: dagSyncer, + broadcaster: bcast, + seenHeads: make(map[cid.Cid]struct{}), + jobQueue: make(chan *dagJob, opts.NumWorkers), + sendJobs: make(chan *dagJob), + queuedChildren: newCidSafeSet(), } headList, maxHeight, err := dstore.heads.List() @@ -455,14 +452,27 @@ func (store *Datastore) encodeBroadcast(heads []cid.Cid) ([]byte, error) { } func (store *Datastore) rebroadcast() { - ticker := store.rebroadcastTicker - defer ticker.Stop() + timer := time.NewTimer(store.opts.RebroadcastInterval) + for { select { case <-store.ctx.Done(): + if !timer.Stop() { + <-timer.C + } return - case <-ticker.C: + default: + } + + select { + case <-store.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: store.rebroadcastHeads() + timer.Reset(store.opts.RebroadcastInterval) } } }