Skip to content

Commit

Permalink
Fix: ensure timely shutdown of rebroadcast heads. (#156)
Browse files Browse the repository at this point in the history
I have observed this routine sometimes just stayed running, potentially
preventing the shutdown of the whole datastore.
  • Loading branch information
hsanjuan committed Mar 22, 2022
2 parents 641e9fd + fdb6d41 commit 668b0c2
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ type Datastore struct {
seenHeadsMux sync.RWMutex
seenHeads map[cid.Cid]struct{}

rebroadcastTicker *time.Ticker

curDeltaMux sync.Mutex
curDelta *pb.Delta // current, unpublished delta

Expand Down Expand Up @@ -270,21 +268,20 @@ func New(
}

dstore := &Datastore{
ctx: ctx,
cancel: cancel,
opts: opts,
logger: opts.Logger,
store: store,
namespace: namespace,
set: set,
heads: heads,
dagService: dagSyncer,
broadcaster: bcast,
seenHeads: make(map[cid.Cid]struct{}),
rebroadcastTicker: time.NewTicker(opts.RebroadcastInterval),
jobQueue: make(chan *dagJob, opts.NumWorkers),
sendJobs: make(chan *dagJob),
queuedChildren: newCidSafeSet(),
ctx: ctx,
cancel: cancel,
opts: opts,
logger: opts.Logger,
store: store,
namespace: namespace,
set: set,
heads: heads,
dagService: dagSyncer,
broadcaster: bcast,
seenHeads: make(map[cid.Cid]struct{}),
jobQueue: make(chan *dagJob, opts.NumWorkers),
sendJobs: make(chan *dagJob),
queuedChildren: newCidSafeSet(),
}

headList, maxHeight, err := dstore.heads.List()
Expand Down Expand Up @@ -455,14 +452,27 @@ func (store *Datastore) encodeBroadcast(heads []cid.Cid) ([]byte, error) {
}

func (store *Datastore) rebroadcast() {
ticker := store.rebroadcastTicker
defer ticker.Stop()
timer := time.NewTimer(store.opts.RebroadcastInterval)

for {
select {
case <-store.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-ticker.C:
default:
}

select {
case <-store.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
store.rebroadcastHeads()
timer.Reset(store.opts.RebroadcastInterval)
}
}
}
Expand Down

0 comments on commit 668b0c2

Please sign in to comment.