diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 5d4d20d..55b2858 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -71,7 +71,7 @@ jobs: - name: Run golang tests with coverage run: make test_coverage - - uses: codecov/codecov-action@v4 # Docs: + - uses: codecov/codecov-action@v3 # Docs: with: file: ./coverage-ci/summary.txt fail_ci_if_error: false diff --git a/fsm/fsm.go b/fsm/fsm.go index d68b169..7ccff5c 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -4,17 +4,20 @@ import ( "sync/atomic" "github.com/roadrunner-server/errors" + "go.uber.org/zap" ) // NewFSM returns new FSM implementation based on initial state -func NewFSM(initialState int64) *Fsm { +func NewFSM(initialState int64, log *zap.Logger) *Fsm { return &Fsm{ + log: log, currentState: &initialState, } } // Fsm is general https://en.wikipedia.org/wiki/Finite-state_machine to transition between worker states type Fsm struct { + log *zap.Logger numExecs uint64 // to be lightweight, use UnixNano lastUsed uint64 @@ -36,6 +39,7 @@ Transition moves worker from one state to another func (s *Fsm) Transition(to int64) { err := s.recognizer(to) if err != nil { + s.log.Debug("fsm transition error", zap.Error(err)) return } @@ -131,7 +135,16 @@ func (s *Fsm) recognizer(to int64) error { return errors.E(op, errors.Errorf("can't transition from state: %s", s.String())) // to - case StateInvalid, StateStopping, StateStopped, StateMaxJobsReached, StateErrored, StateIdleTTLReached, StateTTLReached, StateMaxMemoryReached, StateExecTTLReached: + case + StateInvalid, + StateStopping, + StateStopped, + StateMaxJobsReached, + StateErrored, + StateIdleTTLReached, + StateTTLReached, + StateMaxMemoryReached, + StateExecTTLReached: // from if atomic.LoadInt64(s.currentState) == StateDestroyed { return errors.E(op, errors.Errorf("can't transition from state: %s", s.String())) diff --git a/fsm/state_test.go b/fsm/state_test.go index c0bd05c..66f051a 100755 --- a/fsm/state_test.go +++ b/fsm/state_test.go @@ -4,24 +4,29 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/zap" ) func Test_NewState(t *testing.T) { - st := NewFSM(StateErrored) + log, err := zap.NewDevelopment() + assert.NoError(t, err) + st := NewFSM(StateErrored, log) assert.Equal(t, "errored", st.String()) - assert.Equal(t, "inactive", NewFSM(StateInactive).String()) - assert.Equal(t, "ready", NewFSM(StateReady).String()) - assert.Equal(t, "working", NewFSM(StateWorking).String()) - assert.Equal(t, "stopped", NewFSM(StateStopped).String()) - assert.Equal(t, "undefined", NewFSM(1000).String()) + assert.Equal(t, "inactive", NewFSM(StateInactive, log).String()) + assert.Equal(t, "ready", NewFSM(StateReady, log).String()) + assert.Equal(t, "working", NewFSM(StateWorking, log).String()) + assert.Equal(t, "stopped", NewFSM(StateStopped, log).String()) + assert.Equal(t, "undefined", NewFSM(1000, log).String()) } func Test_IsActive(t *testing.T) { - assert.False(t, NewFSM(StateInactive).IsActive()) - assert.True(t, NewFSM(StateReady).IsActive()) - assert.True(t, NewFSM(StateWorking).IsActive()) - assert.False(t, NewFSM(StateStopped).IsActive()) - assert.False(t, NewFSM(StateErrored).IsActive()) + log, err := zap.NewDevelopment() + assert.NoError(t, err) + assert.False(t, NewFSM(StateInactive, log).IsActive()) + assert.True(t, NewFSM(StateReady, log).IsActive()) + assert.True(t, NewFSM(StateWorking, log).IsActive()) + assert.False(t, NewFSM(StateStopped, log).IsActive()) + assert.False(t, NewFSM(StateErrored, log).IsActive()) } 
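The fsm changes above thread a *zap.Logger through NewFSM so that transitions rejected by the recognizer are logged at debug level instead of being dropped silently. A minimal caller-side sketch of the new constructor, mirroring the tests in this hunk (the fsm import path and the zap.NewDevelopment logger are illustrative assumptions, not part of the patch):

package main

import (
	"fmt"

	"go.uber.org/zap"

	// Assumed import path; point this at the module that actually hosts the fsm package.
	"github.com/roadrunner-server/sdk/v4/fsm"
)

func main() {
	// The tests above use zap.NewDevelopment(); zap.NewNop() also satisfies the new
	// *zap.Logger parameter when transition logging is not wanted.
	log, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}

	st := fsm.NewFSM(fsm.StateInactive, log)
	fmt.Println(st.String(), st.IsActive()) // "inactive", false

	// Transition still returns nothing, but a transition the recognizer rejects is no
	// longer silent: the FSM emits a debug-level "fsm transition error" entry and keeps
	// its current state. Which transitions are legal depends on recognizer rules only
	// partially visible in this hunk.
	st.Transition(fsm.StateReady)
	fmt.Println(st.String())
}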
diff --git a/pool/static_pool/supervisor.go b/pool/static_pool/supervisor.go index b0ee804..1a1a96b 100644 --- a/pool/static_pool/supervisor.go +++ b/pool/static_pool/supervisor.go @@ -12,8 +12,8 @@ import ( const ( MB = 1024 * 1024 - // NSEC_IN_SEC nanoseconds in second - NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck + // NsecInSec nanoseconds in second + NsecInSec int64 = 1000000000 ) func (sp *Pool) Start() { @@ -71,6 +71,9 @@ func (sp *Pool) control() { _ = workers[i].Stop() } + // call cleanup callback + workers[i].Callback() + continue } @@ -150,7 +153,7 @@ func (sp *Pool) control() { // convert last used to unixNano and sub time.now to seconds // negative number, because lu always in the past, except for the `back to the future` :) - res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 + res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1 // maxWorkerIdle more than diff between now and last used // for example: diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index 54e574f..48acfe1 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -61,6 +61,39 @@ func Test_SupervisedPool_Exec(t *testing.T) { cancel() } +func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(log()), + cfgSupervised, + log(), + ) + + require.NoError(t, err) + require.NotNil(t, p) + + time.Sleep(time.Second) + + pidBefore := p.Workers()[0].Pid() + + for i := 0; i < 10; i++ { + time.Sleep(time.Second) + _, err = p.Exec(ctx, &payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }, make(chan struct{})) + require.NoError(t, err) + } + + require.NotEqual(t, pidBefore, p.Workers()[0].Pid()) + + ctxNew, cancel := context.WithTimeout(ctx, time.Second) + p.Destroy(ctxNew) + cancel() +} + func Test_SupervisedPool_ImmediateDestroy(t *testing.T) { ctx := context.Background() @@ -118,6 +151,31 @@ func Test_SupervisedPool_NilConfig(t *testing.T) { assert.Nil(t, p) } +func Test_SupervisedPool_RemoveNoWorkers(t *testing.T) { + ctx := context.Background() + + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(log()), + cfgSupervised, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + + wrks := p.Workers() + for i := 0; i < len(wrks); i++ { + assert.NoError(t, p.RemoveWorker(ctx)) + } + + assert.Len(t, p.Workers(), 0) + p.Destroy(ctx) +} + func Test_SupervisedPool_RemoveWorker(t *testing.T) { ctx := context.Background() @@ -136,13 +194,19 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) { wrks := p.Workers() for i := 0; i < len(wrks); i++ { - assert.NoError(t, p.RemoveWorker(wrks[i])) + assert.NoError(t, p.RemoveWorker(ctx)) } _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.Error(t, err) + + err = p.AddWorker() assert.NoError(t, err) - assert.Len(t, p.Workers(), 0) + _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + + assert.Len(t, p.Workers(), 1) p.Destroy(ctx) } diff --git a/pool/static_pool/workers_pool.go b/pool/static_pool/workers_pool.go index 498d661..deb40c4 100644 --- 
a/pool/static_pool/workers_pool.go +++ b/pool/static_pool/workers_pool.go @@ -118,10 +118,19 @@ func (sp *Pool) Workers() (workers []*worker.Process) { return sp.ww.List() } -// RemoveWorker function should not be used outside the `Wait` function -func (sp *Pool) RemoveWorker(wb *worker.Process) error { - sp.ww.Remove(wb) - return nil +func (sp *Pool) RemoveWorker(ctx context.Context) error { + var cancel context.CancelFunc + _, ok := ctx.Deadline() + if !ok { + ctx, cancel = context.WithTimeout(ctx, sp.cfg.DestroyTimeout) + defer cancel() + } + + return sp.ww.RemoveWorker(ctx) +} + +func (sp *Pool) AddWorker() error { + return sp.ww.AddWorker() } // Exec executes provided payload on the worker @@ -298,9 +307,9 @@ func (sp *Pool) Reset(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, sp.cfg.ResetTimeout) defer cancel() // reset all workers - sp.ww.Reset(ctx) + numToAllocate := sp.ww.Reset(ctx) // re-allocate all workers - workers, err := pool.AllocateParallel(sp.cfg.NumWorkers, sp.allocator) + workers, err := pool.AllocateParallel(numToAllocate, sp.allocator) if err != nil { return err } diff --git a/pool/static_pool/workers_pool_test.go b/pool/static_pool/workers_pool_test.go index 27375d8..62f58fa 100644 --- a/pool/static_pool/workers_pool_test.go +++ b/pool/static_pool/workers_pool_test.go @@ -54,6 +54,47 @@ func Test_NewPool(t *testing.T) { p.Destroy(ctx) } +func Test_NewPoolAddRemoveWorkers(t *testing.T) { + var testCfg2 = &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 500, + DestroyTimeout: time.Second * 500, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(log()), + testCfg2, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + resp := <-r + + assert.Equal(t, []byte("hello"), resp.Body()) + assert.NoError(t, err) + + for i := 0; i < 100; i++ { + err = p.AddWorker() + assert.NoError(t, err) + } + + err = p.AddWorker() + assert.NoError(t, err) + + err = p.RemoveWorker(ctx) + assert.NoError(t, err) + + err = p.RemoveWorker(ctx) + assert.NoError(t, err) + + p.Destroy(ctx) +} + func Test_StaticPool_NilFactory(t *testing.T) { ctx := context.Background() p, err := NewPool( @@ -104,11 +145,17 @@ func Test_StaticPool_ImmediateDestroy(t *testing.T) { func Test_StaticPool_RemoveWorker(t *testing.T) { ctx := context.Background() + var testCfg2 = &pool.Config{ + NumWorkers: 5, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + } + p, err := NewPool( ctx, func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), - testCfg, + testCfg2, log(), ) assert.NoError(t, err) @@ -119,18 +166,24 @@ func Test_StaticPool_RemoveWorker(t *testing.T) { wrks := p.Workers() for i := 0; i < len(wrks); i++ { - assert.NoError(t, p.RemoveWorker(wrks[i])) + assert.NoError(t, p.RemoveWorker(ctx)) } _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.Error(t, err) + + err = p.AddWorker() assert.NoError(t, err) - assert.Len(t, p.Workers(), 0) + _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + + assert.Len(t, p.Workers(), 1) p.Destroy(ctx) } -func Test_Poll_Reallocate(t *testing.T) { +func Test_Pool_Reallocate(t 
*testing.T) { var testCfg2 = &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 500, diff --git a/worker/worker.go b/worker/worker.go index 36b3091..a25946c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -31,6 +31,7 @@ type Process struct { created time.Time log *zap.Logger + callback func() // fsm holds information about current Process state, // number of Process executions, buf status change time. // publicly this object is receive-only and protected using Mutex @@ -98,7 +99,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { w.log = z } - w.fsm = fsm.NewFSM(fsm.StateInactive) + w.fsm = fsm.NewFSM(fsm.StateInactive, w.log) // set self as stderr implementation (Writer interface) rc, err := cmd.StderrPipe() @@ -130,6 +131,17 @@ func (w *Process) Pid() int64 { return int64(w.pid) } +func (w *Process) AddCallback(cb func()) { + w.callback = cb +} + +func (w *Process) Callback() { + if w.callback == nil { + return + } + w.callback() +} + // Created returns time, worker was created at. func (w *Process) Created() time.Time { return w.created diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 0ec592c..82d8e25 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -13,8 +13,6 @@ import ( type Vec struct { rwm sync.RWMutex - // container size - len uint64 // destroy signal destroy uint64 // reset signal @@ -23,12 +21,11 @@ type Vec struct { workers chan *worker.Process } -func NewVector(len uint64) *Vec { +func NewVector() *Vec { vec := &Vec{ - len: len, destroy: 0, reset: 0, - workers: make(chan *worker.Process, len), + workers: make(chan *worker.Process, 1000), } return vec @@ -37,74 +34,15 @@ func NewVector(len uint64) *Vec { // Push is O(1) operation // In case of TTL and full channel O(n) worst case, where n is len of the channel func (v *Vec) Push(w *worker.Process) { + // add remove callback select { case v.workers <- w: // default select branch is only possible when dealing with TTL // because in that case, workers in the v.workers channel can be TTL-ed and killed // but presenting in the channel default: - // Stop Pop operations - v.rwm.Lock() - defer v.rwm.Unlock() - - /* - we can be in the default branch by the following reasons: - 1. TTL is set with no requests during the TTL - 2. Violated Get <-> Release operation (how ??) - */ - - for i := 0; i < len(v.workers); i++ { - /* - We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. - BUT while we are draining the vector, some worker might be reallocated and pushed into the v.workers - so, down by the code, we might have a problem when pushing the new worker to the v.workers - */ - wrk := <-v.workers - - switch wrk.State().CurrentState() { - // good states - case fsm.StateWorking, fsm.StateReady: - // put the worker back - // generally, while send and receive operations are concurrent (from the channel), channel behave - // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO - select { - case v.workers <- wrk: - continue - default: - // kill the worker from the channel - wrk.State().Transition(fsm.StateInvalid) - _ = wrk.Kill() - - continue - } - /* - Bad states are here. 
- */ - default: - // kill the current worker (just to be sure it's dead) - if wrk != nil { - _ = wrk.Kill() - } - - if !w.State().Compare(fsm.StateReady) { - _ = wrk.Kill() - return - } - // replace with the new one and return from the loop - // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker - // But this case will be handled in the worker_watcher::Get - select { - case v.workers <- w: - return - // the place for the new worker was occupied before - default: - // kill the new worker and reallocate it - w.State().Transition(fsm.StateInvalid) - _ = w.Kill() - return - } - } - } + // channel is full + _ = w.Kill() } } @@ -112,50 +50,33 @@ func (v *Vec) Len() int { return len(v.workers) } -func (v *Vec) Remove(_ int64) {} - func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { // remove all workers and return if atomic.LoadUint64(&v.destroy) == 1 { - // drain channel - for { - select { - case <-v.workers: - continue - default: - return nil, errors.E(errors.WatcherStopped) - } - } + return nil, errors.E(errors.WatcherStopped) } // wait for the reset to complete for atomic.CompareAndSwapUint64(&v.reset, 1, 1) { - time.Sleep(time.Millisecond) + select { + case <-ctx.Done(): + default: + time.Sleep(time.Millisecond * 100) + } } // used only for the TTL-ed workers v.rwm.RLock() - defer v.rwm.RUnlock() - select { case w := <-v.workers: + v.rwm.RUnlock() return w, nil case <-ctx.Done(): + v.rwm.RUnlock() return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) } } -func (v *Vec) Drain() { - for { - select { - case <-v.workers: - continue - default: - return - } - } -} - func (v *Vec) ResetDone() { atomic.StoreUint64(&v.reset, 0) } @@ -167,3 +88,52 @@ func (v *Vec) Reset() { func (v *Vec) Destroy() { atomic.StoreUint64(&v.destroy, 1) } + +func (v *Vec) Remove() { + // Stop Pop operations + v.rwm.Lock() + defer v.rwm.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) + */ + + for i := 0; i < len(v.workers); i++ { + /* + We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. + BUT while we are draining the vector, some worker might be reallocated and pushed into the v.workers + so, down by the code, we might have a problem when pushing the new worker to the v.workers + */ + wrk := <-v.workers + + switch wrk.State().CurrentState() { + // good states + case fsm.StateWorking, fsm.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + select { + case v.workers <- wrk: + continue + + // all bad states are here + default: + // kill the worker from the channel + wrk.State().Transition(fsm.StateInvalid) + _ = wrk.Kill() + + continue + } + /* + Bad states are here. 
+ */ + default: + // kill the current worker (just to be sure it's dead) + if wrk != nil { + _ = wrk.Kill() + } + } + } +} diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 6a6bc71..4a478e7 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -23,10 +23,11 @@ type WorkerWatcher struct { // actually don't have a lot of impl here, so interface not needed container *channel.Vec // used to control Destroy stage (that all workers are in the container) - numWorkers *uint64 + numWorkers uint64 eventBus *events.Bus - workers map[int64]*worker.Process + // map with the workers pointers + workers sync.Map // map[int64]*worker.Process log *zap.Logger @@ -38,17 +39,15 @@ type WorkerWatcher struct { func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *WorkerWatcher { eb, _ := events.NewEventBus() return &WorkerWatcher{ - container: channel.NewVector(numWorkers), + container: channel.NewVector(), log: log, eventBus: eb, // pass a ptr to the number of workers to avoid blocking in the TTL loop - numWorkers: toPtr(numWorkers), + numWorkers: numWorkers, allocateTimeout: allocateTimeout, - //workers: make([]*worker.Process, 0, numWorkers), - workers: make(map[int64]*worker.Process, numWorkers), - - allocator: allocator, + workers: sync.Map{}, // make(map[int64]*worker.Process, numWorkers), + allocator: allocator, } } @@ -59,19 +58,49 @@ func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { ii := i ww.container.Push(workers[ii]) // add worker to watch slice - ww.workers[workers[ii].Pid()] = workers[ii] + ww.workers.Store(workers[ii].Pid(), workers[ii]) ww.addToWatch(workers[ii]) } return nil } +func (ww *WorkerWatcher) AddWorker() error { + err := ww.Allocate() + if err != nil { + return err + } + + atomic.AddUint64(&ww.numWorkers, 1) + return nil +} + +func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { + // we can't remove worker if there are no workers :) + if ww.container.Len() == 0 { + return nil + } + + w, err := ww.Take(ctx) + if err != nil { + return err + } + + // destroy and stop + w.State().Transition(fsm.StateDestroyed) + _ = w.Stop() + + atomic.AddUint64(&ww.numWorkers, ^uint64(0)) + ww.workers.Delete(w.Pid()) + + return nil +} + // Take is not a thread safe operation func (ww *WorkerWatcher) Take(ctx context.Context) (*worker.Process, error) { const op = errors.Op("worker_watcher_get_free_worker") // we need lock here to prevent Pop operation when ww in the resetting state // thread safe operation w, err := ww.container.Pop(ctx) - if err != nil { if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) @@ -137,7 +166,7 @@ func (ww *WorkerWatcher) Allocate() error { select { case <-tt: // reduce number of workers - atomic.AddUint64(ww.numWorkers, ^uint64(0)) + atomic.AddUint64(&ww.numWorkers, ^uint64(0)) allocateFreq.Stop() // timeout exceed, worker can't be allocated return errors.E(op, errors.WorkerAllocate, err) @@ -160,26 +189,13 @@ func (ww *WorkerWatcher) Allocate() error { done: // add worker to Wait ww.addToWatch(sw) - - ww.Lock() // add new worker to the workers slice (to get information about workers in parallel) - ww.workers[sw.Pid()] = sw - ww.Unlock() - + ww.workers.Store(sw.Pid(), sw) // push the worker to the container ww.Release(sw) return nil } -// Remove worker -func (ww *WorkerWatcher) Remove(wb *worker.Process) { - ww.Lock() - defer ww.Unlock() - - // worker will be removed on the Get 
operation - delete(ww.workers, wb.Pid()) -} - // Release O(1) operation func (ww *WorkerWatcher) Release(w *worker.Process) { switch w.State().CurrentState() { @@ -208,12 +224,9 @@ func (ww *WorkerWatcher) Release(w *worker.Process) { } } -func (ww *WorkerWatcher) Reset(ctx context.Context) { - ww.Lock() +func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { // do not release new workers ww.container.Reset() - ww.Unlock() - tt := time.NewTicker(time.Millisecond * 10) defer tt.Stop() for { @@ -223,7 +236,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) { // that might be one of the workers is working // to proceed, all workers should be inside a channel - if atomic.LoadUint64(ww.numWorkers) != uint64(ww.container.Len()) { + if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { ww.RUnlock() continue } @@ -232,58 +245,56 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) { // Pop operation is blocked, push can't be done, since it's not possible to pop ww.Lock() - // drain channel - ww.container.Drain() - wg := &sync.WaitGroup{} - wg.Add(len(ww.workers)) - - for _, v := range ww.workers { - v := v - go func() { + ww.workers.Range(func(key, value any) bool { + wg.Add(1) + go func(k int64, v *worker.Process) { defer wg.Done() v.State().Transition(fsm.StateDestroyed) // kill the worker _ = v.Stop() - }() - } + // remove worker from the channel + v.Callback() + // delete worker from the map + ww.workers.Delete(v) + }(key.(int64), value.(*worker.Process)) + return true + }) wg.Wait() - // optimization, only 1 call - for k := range ww.workers { - delete(ww.workers, k) - } - ww.container.ResetDone() + + // todo: rustatian, do we need this mutex? ww.Unlock() - return + + return atomic.LoadUint64(&ww.numWorkers) case <-ctx.Done(): - // drain channel - ww.container.Drain() // kill workers ww.Lock() // drain workers slice wg := &sync.WaitGroup{} - wg.Add(len(ww.workers)) - for _, v := range ww.workers { - go func(w *worker.Process) { + ww.workers.Range(func(key, value any) bool { + wg.Add(1) + go func(k int64, v *worker.Process) { defer wg.Done() - w.State().Transition(fsm.StateDestroyed) - // kill the worker - _ = w.Stop() - }(v) - } + v.State().Transition(fsm.StateDestroyed) + // stop the worker + _ = v.Stop() + // remove worker from the channel + v.Callback() + // delete worker from the map + ww.workers.Delete(v) + }(key.(int64), value.(*worker.Process)) + return true + }) wg.Wait() - // optimization, only 1 call - for k := range ww.workers { - delete(ww.workers, k) - } ww.container.ResetDone() ww.Unlock() - return + + return atomic.LoadUint64(&ww.numWorkers) } } } @@ -303,7 +314,7 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { case <-tt.C: ww.RLock() // that might be one of the workers is working - if atomic.LoadUint64(ww.numWorkers) != uint64(ww.container.Len()) { + if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { ww.RUnlock() continue } @@ -316,46 +327,44 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { // drain channel, will not actually pop, only drain a channel _, _ = ww.container.Pop(ctx) wg := &sync.WaitGroup{} - wg.Add(len(ww.workers)) - for _, v := range ww.workers { - go func(w *worker.Process) { + + ww.workers.Range(func(key, value any) bool { + wg.Add(1) + go func(k int64, v *worker.Process) { defer wg.Done() - w.State().Transition(fsm.StateDestroyed) + v.State().Transition(fsm.StateDestroyed) // kill the worker - _ = w.Stop() - }(v) - } + _ = v.Stop() - wg.Wait() - // optimization, only 1 call - for k := range 
ww.workers { - delete(ww.workers, k) - } + // delete worker from the map + ww.workers.Delete(v) + }(key.(int64), value.(*worker.Process)) + return true + }) + wg.Wait() ww.Unlock() return case <-ctx.Done(): - // drain channel - ww.container.Drain() // kill workers ww.Lock() wg := &sync.WaitGroup{} - wg.Add(len(ww.workers)) - for _, v := range ww.workers { - go func(w *worker.Process) { + ww.workers.Range(func(key, value any) bool { + wg.Add(1) + go func(k int64, v *worker.Process) { defer wg.Done() - w.State().Transition(fsm.StateDestroyed) + v.State().Transition(fsm.StateDestroyed) // kill the worker - _ = w.Stop() - }(v) - } + _ = v.Stop() + // remove worker from the channel + v.Callback() + // delete worker from the map + ww.workers.Delete(v) + }(key.(int64), value.(*worker.Process)) + return true + }) wg.Wait() - // optimization, only 1 call - for k := range ww.workers { - delete(ww.workers, k) - } - ww.Unlock() return } @@ -367,28 +376,28 @@ func (ww *WorkerWatcher) List() []*worker.Process { ww.RLock() defer ww.RUnlock() - if len(ww.workers) == 0 { + if atomic.LoadUint64(&ww.numWorkers) == 0 { return nil } - base := make([]*worker.Process, 0, len(ww.workers)) + base := make([]*worker.Process, 0, 2) - for _, v := range ww.workers { - base = append(base, v) - } + ww.workers.Range(func(_, value any) bool { + base = append(base, value.(*worker.Process)) + return true + }) return base } func (ww *WorkerWatcher) wait(w *worker.Process) { - const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { ww.log.Debug("worker stopped", zap.String("internal_event_name", events.EventWorkerWaitExit.String()), zap.Error(err)) } // remove worker - ww.Remove(w) + ww.workers.Delete(w.Pid()) if w.State().Compare(fsm.StateDestroyed) { // worker was manually destroyed, no need to replace @@ -399,14 +408,7 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { err = ww.Allocate() if err != nil { ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - - // no workers at all, panic - ww.Lock() - defer ww.Unlock() - - if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { - panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v, no workers in the pool", err))) - } + return } // this event used mostly for the temporal plugin @@ -414,11 +416,11 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { } func (ww *WorkerWatcher) addToWatch(wb *worker.Process) { + // this callback is used to remove the bad workers from the container + wb.AddCallback(func() { + ww.container.Remove() + }) go func() { ww.wait(wb) }() } - -func toPtr[T any](t T) *T { - return &t -}
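With these changes the pool's public surface shifts from RemoveWorker(*worker.Process) to AddWorker()/RemoveWorker(ctx), and the worker watcher now tracks processes in a sync.Map with a per-worker cleanup callback. A minimal caller-side sketch of the new API, modeled on the tests in this patch (import paths, the static_pool package name, that pipe.NewPipeFactory takes a *zap.Logger, the zap.NewNop logger, and the PHP fixture path are assumptions carried over from the tests, not guarantees of this diff):

package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"

	"go.uber.org/zap"

	// Assumed import paths; point them at the module that actually hosts these packages.
	"github.com/roadrunner-server/sdk/v4/ipc/pipe"
	"github.com/roadrunner-server/sdk/v4/payload"
	"github.com/roadrunner-server/sdk/v4/pool"
	"github.com/roadrunner-server/sdk/v4/pool/static_pool"
)

func main() {
	ctx := context.Background()
	log := zap.NewNop()

	cfg := &pool.Config{
		NumWorkers:      2,
		AllocateTimeout: time.Second * 5,
		DestroyTimeout:  time.Second * 5,
	}

	p, err := static_pool.NewPool(
		ctx,
		// same PHP fixture the pool tests above use
		func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
		pipe.NewPipeFactory(log),
		cfg,
		log,
	)
	if err != nil {
		panic(err)
	}

	// Scale up: AddWorker allocates one more worker and bumps the watcher's counter.
	if err := p.AddWorker(); err != nil {
		panic(err)
	}

	// Scale down: RemoveWorker now takes a context instead of a worker pointer; if the
	// context carries no deadline, the pool falls back to its DestroyTimeout internally.
	if err := p.RemoveWorker(ctx); err != nil {
		panic(err)
	}

	r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
	if err != nil {
		panic(err)
	}
	resp := <-r
	fmt.Println(string(resp.Body()), len(p.Workers()))

	p.Destroy(ctx)
}

One behavioural note grounded in the hunks above: RemoveWorker pops a live worker from the container, transitions it to StateDestroyed, stops it, and decrements the watcher's worker count; when the container is already empty it simply returns nil.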