This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

feature: dynamic workers scaling #95

Merged: 4 commits, Sep 23, 2023
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
@@ -71,7 +71,7 @@ jobs:
       - name: Run golang tests with coverage
         run: make test_coverage

-      - uses: codecov/codecov-action@v4 # Docs: <https://github.com/codecov/codecov-action>
+      - uses: codecov/codecov-action@v3 # Docs: <https://github.com/codecov/codecov-action>
         with:
           file: ./coverage-ci/summary.txt
           fail_ci_if_error: false
17 changes: 15 additions & 2 deletions fsm/fsm.go
@@ -4,17 +4,20 @@ import (
 	"sync/atomic"

 	"github.com/roadrunner-server/errors"
+	"go.uber.org/zap"
 )

 // NewFSM returns new FSM implementation based on initial state
-func NewFSM(initialState int64) *Fsm {
+func NewFSM(initialState int64, log *zap.Logger) *Fsm {
 	return &Fsm{
+		log:          log,
 		currentState: &initialState,
 	}
 }

 // Fsm is general https://en.wikipedia.org/wiki/Finite-state_machine to transition between worker states
 type Fsm struct {
+	log      *zap.Logger
 	numExecs uint64
 	// to be lightweight, use UnixNano
 	lastUsed uint64
@@ -36,6 +39,7 @@ Transition moves worker from one state to another
 func (s *Fsm) Transition(to int64) {
 	err := s.recognizer(to)
 	if err != nil {
+		s.log.Debug("fsm transition error", zap.Error(err))
 		return
 	}

@@ -131,7 +135,16 @@ func (s *Fsm) recognizer(to int64) error {

 		return errors.E(op, errors.Errorf("can't transition from state: %s", s.String()))
 	// to
-	case StateInvalid, StateStopping, StateStopped, StateMaxJobsReached, StateErrored, StateIdleTTLReached, StateTTLReached, StateMaxMemoryReached, StateExecTTLReached:
+	case
+		StateInvalid,
+		StateStopping,
+		StateStopped,
+		StateMaxJobsReached,
+		StateErrored,
+		StateIdleTTLReached,
+		StateTTLReached,
+		StateMaxMemoryReached,
+		StateExecTTLReached:
 		// from
 		if atomic.LoadInt64(s.currentState) == StateDestroyed {
 			return errors.E(op, errors.Errorf("can't transition from state: %s", s.String()))
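For context, the practical effect of the fsm.go change is that every FSM now carries a logger, and an invalid transition is reported instead of silently ignored. A minimal usage sketch — the sdk/v4 import path is an assumption, not shown in this diff:

	package main

	import (
		"github.com/roadrunner-server/sdk/v4/fsm" // assumed import path
		"go.uber.org/zap"
	)

	func main() {
		log, _ := zap.NewDevelopment()

		// NewFSM now takes the logger alongside the initial state.
		f := fsm.NewFSM(fsm.StateInactive, log)

		// A valid transition proceeds as before; an invalid one is
		// rejected by the recognizer and logged at debug level.
		f.Transition(fsm.StateReady)
	}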
27 changes: 16 additions & 11 deletions fsm/state_test.go
@@ -4,24 +4,29 @@ import (
 	"testing"

 	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
 )

 func Test_NewState(t *testing.T) {
-	st := NewFSM(StateErrored)
+	log, err := zap.NewDevelopment()
+	assert.NoError(t, err)
+	st := NewFSM(StateErrored, log)

 	assert.Equal(t, "errored", st.String())

-	assert.Equal(t, "inactive", NewFSM(StateInactive).String())
-	assert.Equal(t, "ready", NewFSM(StateReady).String())
-	assert.Equal(t, "working", NewFSM(StateWorking).String())
-	assert.Equal(t, "stopped", NewFSM(StateStopped).String())
-	assert.Equal(t, "undefined", NewFSM(1000).String())
+	assert.Equal(t, "inactive", NewFSM(StateInactive, log).String())
+	assert.Equal(t, "ready", NewFSM(StateReady, log).String())
+	assert.Equal(t, "working", NewFSM(StateWorking, log).String())
+	assert.Equal(t, "stopped", NewFSM(StateStopped, log).String())
+	assert.Equal(t, "undefined", NewFSM(1000, log).String())
 }

 func Test_IsActive(t *testing.T) {
-	assert.False(t, NewFSM(StateInactive).IsActive())
-	assert.True(t, NewFSM(StateReady).IsActive())
-	assert.True(t, NewFSM(StateWorking).IsActive())
-	assert.False(t, NewFSM(StateStopped).IsActive())
-	assert.False(t, NewFSM(StateErrored).IsActive())
+	log, err := zap.NewDevelopment()
+	assert.NoError(t, err)
+	assert.False(t, NewFSM(StateInactive, log).IsActive())
+	assert.True(t, NewFSM(StateReady, log).IsActive())
+	assert.True(t, NewFSM(StateWorking, log).IsActive())
+	assert.False(t, NewFSM(StateStopped, log).IsActive())
+	assert.False(t, NewFSM(StateErrored, log).IsActive())
 }
9 changes: 6 additions & 3 deletions pool/static_pool/supervisor.go
@@ -12,8 +12,8 @@ import (
 const (
 	MB = 1024 * 1024

-	// NSEC_IN_SEC nanoseconds in second
-	NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
+	// NsecInSec nanoseconds in second
+	NsecInSec int64 = 1000000000
 )

 func (sp *Pool) Start() {
@@ -71,6 +71,9 @@ func (sp *Pool) control() {
 				_ = workers[i].Stop()
 			}

+			// call cleanup callback
+			workers[i].Callback()
+
 			continue
 		}

@@ -150,7 +153,7 @@ func (sp *Pool) control() {

 		// convert last used to unixNano and sub time.now to seconds
 		// negative number, because lu always in the past, except for the `back to the future` :)
-		res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
+		res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1

 		// maxWorkerIdle more than diff between now and last used
 		// for example:
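The renamed NsecInSec constant participates in the supervisor's idle-time check: lastUsed is stored as UnixNano, so the supervisor divides the (negative) difference against time.Now() by NsecInSec and flips the sign to get idle seconds. A self-contained sketch of that arithmetic:

	package main

	import (
		"fmt"
		"time"
	)

	const NsecInSec int64 = 1000000000

	func main() {
		// Suppose the worker was last used 42 seconds ago.
		lu := uint64(time.Now().Add(-42 * time.Second).UnixNano())
		now := time.Now()

		// lu is always in the past, so the raw difference is negative;
		// multiplying by -1 yields the idle time in whole seconds (~42).
		res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1
		fmt.Println(res)
	}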
68 changes: 66 additions & 2 deletions pool/static_pool/supervisor_test.go
@@ -61,6 +61,39 @@ func Test_SupervisedPool_Exec(t *testing.T) {
 	cancel()
 }

+func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) {
+	ctx := context.Background()
+	p, err := NewPool(
+		ctx,
+		func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+		pipe.NewPipeFactory(log()),
+		cfgSupervised,
+		log(),
+	)
+
+	require.NoError(t, err)
+	require.NotNil(t, p)
+
+	time.Sleep(time.Second)
+
+	pidBefore := p.Workers()[0].Pid()
+
+	for i := 0; i < 10; i++ {
+		time.Sleep(time.Second)
+		_, err = p.Exec(ctx, &payload.Payload{
+			Context: []byte(""),
+			Body:    []byte("foo"),
+		}, make(chan struct{}))
+		require.NoError(t, err)
+	}
+
+	require.NotEqual(t, pidBefore, p.Workers()[0].Pid())
+
+	ctxNew, cancel := context.WithTimeout(ctx, time.Second)
+	p.Destroy(ctxNew)
+	cancel()
+}
+
 func Test_SupervisedPool_ImmediateDestroy(t *testing.T) {
 	ctx := context.Background()

@@ -118,6 +151,31 @@ func Test_SupervisedPool_NilConfig(t *testing.T) {
 	assert.Nil(t, p)
 }

+func Test_SupervisedPool_RemoveNoWorkers(t *testing.T) {
+	ctx := context.Background()
+
+	p, err := NewPool(
+		ctx,
+		func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+		pipe.NewPipeFactory(log()),
+		cfgSupervised,
+		log(),
+	)
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+
+	_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
+	assert.NoError(t, err)
+
+	wrks := p.Workers()
+	for i := 0; i < len(wrks); i++ {
+		assert.NoError(t, p.RemoveWorker(ctx))
+	}
+
+	assert.Len(t, p.Workers(), 0)
+	p.Destroy(ctx)
+}
+
 func Test_SupervisedPool_RemoveWorker(t *testing.T) {
 	ctx := context.Background()

@@ -136,13 +194,19 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) {

 	wrks := p.Workers()
 	for i := 0; i < len(wrks); i++ {
-		assert.NoError(t, p.RemoveWorker(wrks[i]))
+		assert.NoError(t, p.RemoveWorker(ctx))
 	}

 	_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
 	assert.Error(t, err)

+	err = p.AddWorker()
+	assert.NoError(t, err)
+
-	assert.Len(t, p.Workers(), 0)
+	_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
+	assert.NoError(t, err)
+
+	assert.Len(t, p.Workers(), 1)

 	p.Destroy(ctx)
 }
21 changes: 15 additions & 6 deletions pool/static_pool/workers_pool.go
@@ -118,10 +118,19 @@ func (sp *Pool) Workers() (workers []*worker.Process) {
 	return sp.ww.List()
 }

-// RemoveWorker function should not be used outside the `Wait` function
-func (sp *Pool) RemoveWorker(wb *worker.Process) error {
-	sp.ww.Remove(wb)
-	return nil
+func (sp *Pool) RemoveWorker(ctx context.Context) error {
+	var cancel context.CancelFunc
+	_, ok := ctx.Deadline()
+	if !ok {
+		ctx, cancel = context.WithTimeout(ctx, sp.cfg.DestroyTimeout)
+		defer cancel()
+	}
+
+	return sp.ww.RemoveWorker(ctx)
 }

+func (sp *Pool) AddWorker() error {
+	return sp.ww.AddWorker()
+}
+
 // Exec executes provided payload on the worker
@@ -298,9 +307,9 @@ func (sp *Pool) Reset(ctx context.Context) error {
 	ctx, cancel := context.WithTimeout(ctx, sp.cfg.ResetTimeout)
 	defer cancel()
 	// reset all workers
-	sp.ww.Reset(ctx)
+	numToAllocate := sp.ww.Reset(ctx)
 	// re-allocate all workers
-	workers, err := pool.AllocateParallel(sp.cfg.NumWorkers, sp.allocator)
+	workers, err := pool.AllocateParallel(numToAllocate, sp.allocator)
 	if err != nil {
 		return err
 	}
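These two methods are the public surface of the dynamic scaling feature: RemoveWorker no longer takes a *worker.Process and instead asks the worker watcher to retire one worker, falling back to the pool's DestroyTimeout when the caller's context carries no deadline, and Reset now re-allocates however many workers the watcher reports rather than the static cfg.NumWorkers. A hedged sketch of how a caller might scale a pool, written against just the two methods shown here (the concrete pool construction is omitted):

	package scaling

	import "context"

	// scaler captures only the two methods added/changed in this diff.
	type scaler interface {
		AddWorker() error
		RemoveWorker(ctx context.Context) error
	}

	// scaleBy grows (n > 0) or shrinks (n < 0) a pool by n workers.
	func scaleBy(ctx context.Context, p scaler, n int) error {
		for ; n > 0; n-- {
			if err := p.AddWorker(); err != nil {
				return err
			}
		}
		for ; n < 0; n++ {
			// Without a deadline, RemoveWorker applies DestroyTimeout itself.
			if err := p.RemoveWorker(ctx); err != nil {
				return err
			}
		}
		return nil
	}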
61 changes: 57 additions & 4 deletions pool/static_pool/workers_pool_test.go
@@ -54,6 +54,47 @@ func Test_NewPool(t *testing.T) {
 	p.Destroy(ctx)
 }

+func Test_NewPoolAddRemoveWorkers(t *testing.T) {
+	var testCfg2 = &pool.Config{
+		NumWorkers:      1,
+		AllocateTimeout: time.Second * 500,
+		DestroyTimeout:  time.Second * 500,
+	}
+
+	ctx := context.Background()
+	p, err := NewPool(
+		ctx,
+		func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+		pipe.NewPipeFactory(log()),
+		testCfg2,
+		log(),
+	)
+	assert.NoError(t, err)
+	assert.NotNil(t, p)
+
+	r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
+	resp := <-r
+
+	assert.Equal(t, []byte("hello"), resp.Body())
+	assert.NoError(t, err)
+
+	for i := 0; i < 100; i++ {
+		err = p.AddWorker()
+		assert.NoError(t, err)
+	}
+
+	err = p.AddWorker()
+	assert.NoError(t, err)
+
+	err = p.RemoveWorker(ctx)
+	assert.NoError(t, err)
+
+	err = p.RemoveWorker(ctx)
+	assert.NoError(t, err)
+
+	p.Destroy(ctx)
+}
+
 func Test_StaticPool_NilFactory(t *testing.T) {
 	ctx := context.Background()
 	p, err := NewPool(
@@ -104,11 +145,17 @@ func Test_StaticPool_ImmediateDestroy(t *testing.T) {
 func Test_StaticPool_RemoveWorker(t *testing.T) {
 	ctx := context.Background()

+	var testCfg2 = &pool.Config{
+		NumWorkers:      5,
+		AllocateTimeout: time.Second * 5,
+		DestroyTimeout:  time.Second * 5,
+	}
+
 	p, err := NewPool(
 		ctx,
 		func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(log()),
-		testCfg,
+		testCfg2,
 		log(),
 	)
 	assert.NoError(t, err)
@@ -119,18 +166,24 @@ func Test_StaticPool_RemoveWorker(t *testing.T) {

 	wrks := p.Workers()
 	for i := 0; i < len(wrks); i++ {
-		assert.NoError(t, p.RemoveWorker(wrks[i]))
+		assert.NoError(t, p.RemoveWorker(ctx))
 	}

 	_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
 	assert.Error(t, err)

+	err = p.AddWorker()
+	assert.NoError(t, err)
+
-	assert.Len(t, p.Workers(), 0)
+	_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
+	assert.NoError(t, err)
+
+	assert.Len(t, p.Workers(), 1)

 	p.Destroy(ctx)
 }

-func Test_Poll_Reallocate(t *testing.T) {
+func Test_Pool_Reallocate(t *testing.T) {
 	var testCfg2 = &pool.Config{
 		NumWorkers:      1,
 		AllocateTimeout: time.Second * 500,
14 changes: 13 additions & 1 deletion worker/worker.go
@@ -31,6 +31,7 @@
 	created time.Time
 	log     *zap.Logger

+	callback func()
 	// fsm holds information about current Process state,
 	// number of Process executions, buf status change time.
 	// publicly this object is receive-only and protected using Mutex
@@ -98,7 +99,7 @@
 		w.log = z
 	}

-	w.fsm = fsm.NewFSM(fsm.StateInactive)
+	w.fsm = fsm.NewFSM(fsm.StateInactive, w.log)

 	// set self as stderr implementation (Writer interface)
 	rc, err := cmd.StderrPipe()
@@ -130,6 +131,17 @@
 	return int64(w.pid)
 }

+func (w *Process) AddCallback(cb func()) {
+	w.callback = cb
+}
+
+func (w *Process) Callback() {
+	if w.callback == nil {
+		return
+	}
+
+	w.callback()
+}

 // Created returns time, worker was created at.
 func (w *Process) Created() time.Time {
 	return w.created
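The callback pair closes the loop with the supervisor change above: the supervisor stops an expired worker and then invokes Callback() on it, which is nil-safe and returns immediately if no hook was registered. A short sketch of registering a cleanup hook — the import path and the cleanup body are assumptions:

	package example

	import (
		"log"

		"github.com/roadrunner-server/sdk/v4/worker" // assumed import path
	)

	// registerCleanup attaches a hook that the supervisor will fire via
	// Callback() after stopping the worker (e.g. on TTL/memory limits).
	func registerCleanup(w *worker.Process) {
		w.AddCallback(func() {
			log.Println("worker retired, releasing resources") // hypothetical cleanup
		})
	}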