From cb7741adf4252d5523f0c8b5a69d2af648a239ed Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Wed, 15 Jul 2020 11:22:50 -0400 Subject: [PATCH] Pausing, resuming, and seeking messages should be sent to each of the change stream go routines if there are multiple --- gtm.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/gtm.go b/gtm.go index c206725..c824594 100644 --- a/gtm.go +++ b/gtm.go @@ -446,7 +446,9 @@ func (ctx *OpCtx) isStopped() bool { func (ctx *OpCtx) Since(ts primitive.Timestamp) { ctx.lock.Lock() defer ctx.lock.Unlock() - ctx.seekC <- ts + for i := 0; i < cap(ctx.seekC); i++ { + ctx.seekC <- ts + } } func (ctx *OpCtx) Pause() { @@ -454,7 +456,9 @@ func (ctx *OpCtx) Pause() { defer ctx.lock.Unlock() if !ctx.paused { ctx.paused = true - ctx.pauseC <- true + for i := 0; i < cap(ctx.pauseC); i++ { + ctx.pauseC <- true + } } } @@ -463,7 +467,9 @@ func (ctx *OpCtx) Resume() { defer ctx.lock.Unlock() if ctx.paused { ctx.paused = false - ctx.resumeC <- true + for i := 0; i < cap(ctx.resumeC); i++ { + ctx.resumeC <- true + } } }