Skip to content

Commit

Permalink
Pausing, resuming, and seeking messages should be sent to each of the
Browse files Browse the repository at this point in the history
change stream go routines if there are multiple
  • Loading branch information
rwynn committed Jul 15, 2020
1 parent 5e8f85c commit cb7741a
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions gtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,19 @@ func (ctx *OpCtx) isStopped() bool {
func (ctx *OpCtx) Since(ts primitive.Timestamp) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.seekC <- ts
for i := 0; i < cap(ctx.seekC); i++ {
ctx.seekC <- ts
}
}

func (ctx *OpCtx) Pause() {
ctx.lock.Lock()
defer ctx.lock.Unlock()
if !ctx.paused {
ctx.paused = true
ctx.pauseC <- true
for i := 0; i < cap(ctx.pauseC); i++ {
ctx.pauseC <- true
}
}
}

Expand All @@ -463,7 +467,9 @@ func (ctx *OpCtx) Resume() {
defer ctx.lock.Unlock()
if ctx.paused {
ctx.paused = false
ctx.resumeC <- true
for i := 0; i < cap(ctx.resumeC); i++ {
ctx.resumeC <- true
}
}
}

Expand Down

0 comments on commit cb7741a

Please sign in to comment.