Skip to content

Commit

Permalink
Merge pull request #80 from noctarius/fix-79
Browse files Browse the repository at this point in the history
Fixes #79: Race condition for stop-after-start of dispatcher
  • Loading branch information
noctarius authored Sep 30, 2023
2 parents fa1f4aa + df16b36 commit 556c578
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions internal/taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/eventhandlers"
"github.com/noctarius/timescaledb-event-streamer/spi/task"
"sync/atomic"
)

type taskManager struct {
Expand All @@ -39,7 +40,7 @@ type taskManager struct {
logicalHandlers []eventhandlers.LogicalReplicationEventHandler
snapshotHandlers []eventhandlers.SnapshottingEventHandler
shutdownAwaiter *waiting.ShutdownAwaiter
shutdownActive bool
shutdownActive atomic.Bool
}

func NewTaskManager(
Expand All @@ -65,6 +66,7 @@ func NewTaskManager(
logicalHandlers: make([]eventhandlers.LogicalReplicationEventHandler, 0),
snapshotHandlers: make([]eventhandlers.SnapshottingEventHandler, 0),
shutdownAwaiter: waiting.NewShutdownAwaiter(),
shutdownActive: atomic.Bool{},
}
return d, nil
}
Expand Down Expand Up @@ -210,7 +212,7 @@ func (d *taskManager) StartDispatcher() {
}

func (d *taskManager) StopDispatcher() error {
d.shutdownActive = true
d.shutdownActive.Store(true)
d.shutdownAwaiter.SignalShutdown()
d.taskQueue.Close()
return d.shutdownAwaiter.AwaitDone()
Expand All @@ -220,7 +222,7 @@ func (d *taskManager) EnqueueTask(
task task.Task,
) error {

if d.shutdownActive {
if d.shutdownActive.Load() {
return fmt.Errorf("shutdown active, draining only")
}
d.taskQueue.Send(task)
Expand All @@ -231,7 +233,7 @@ func (d *taskManager) EnqueueTaskAndWait(
t task.Task,
) error {

if d.shutdownActive {
if d.shutdownActive.Load() {
return fmt.Errorf("shutdown active, draining only")
}
done := waiting.NewWaiter()
Expand Down

0 comments on commit 556c578

Please sign in to comment.