From 34adf20e66a1dd6012dbfb0203aa2100cd2b305a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 4 Jan 2024 12:16:32 +0100 Subject: [PATCH 1/3] fix: initial commit Signed-off-by: Valery Piashchynski --- .DS_Store | Bin 6148 -> 0 bytes pool/static_pool/debug.go | 10 +-- pool/static_pool/workers_pool.go | 98 ++++++++++++++------ worker/worker.go | 150 +++++++++++++++++-------------- 4 files changed, 157 insertions(+), 101 deletions(-) delete mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index e71aff485b45b9d2b949823ab505fba3f4c54673..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKy-EW?5T4N!0uqRgST5KLX-%_+^9j<6#7Go!Tp+RE&c?>px9|;Yd@38i*%{>) z4@(i9f!%L*es=CYxZN8f;_2ODOf({*0fr!p(j#K-b?sQNI9c{sEyuH~wpsUV@>@;v z>=m8Rl5S~RJpb`<()BfMUC&qT95&>n(}%~`v(mr#*1x{#aC7m3RNA90UDF2hrl|ZB zpG$99n-1RYbL~!k({`)(MRe=)VU+qV&VV!E3^)V-lL6e>B7=QJpPd0`z!~^tK+cDN zAs7vlVm>-B= sp.cfg.MaxJobs { + sp.log.Debug("max requests reached", zap.Int64("pid", w.Pid())) + w.State().Transition(fsm.StateMaxJobsReached) + } + if err != nil { if errors.Is(errors.Retry, err) { sp.ww.Release(w) @@ -203,12 +209,11 @@ begin: return nil, err case errors.Is(errors.SoftJob, err): - sp.log.Warn("soft worker error, worker won't be restarted", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err)) - // soft jobs errors are allowed, just put the worker back - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // mark old as invalid and stop - w.State().Transition(fsm.StateMaxJobsReached) - } + /* + in case of soft job error, we should not kill the worker, this is just an error payload from the worker. 
+ */ + w.State().Transition(fsm.StateReady) + sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err)) sp.ww.Release(w) return nil, err @@ -228,14 +233,10 @@ begin: } } - if sp.cfg.MaxJobs != 0 { - if w.State().NumExecs() >= sp.cfg.MaxJobs { - w.State().Transition(fsm.StateMaxJobsReached) - } - } - // create channel for the stream (only if there are no errors) - resp := make(chan *PExec, 1) + resp := make(chan *PExec, 100) + // send the initial frame + sendResponse(resp, rsp, nil) switch { case rsp.Flags&frame.STREAM != 0: @@ -244,38 +245,69 @@ begin: go func() { // would be called on Goexit defer func() { + sp.log.Debug("release [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) close(resp) sp.ww.Release(w) }() - // send the initial frame - resp <- newPExec(rsp, nil) - // stream iterator for { select { // we received stop signal case <-stopCh: + sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout) err = w.StreamCancel(ctxT) + cancelT() if err != nil { + w.State().Transition(fsm.StateErrored) sp.log.Warn("stream cancel error", zap.Error(err)) - w.State().Transition(fsm.StateInvalid) + } else { + // successfully canceled + w.State().Transition(fsm.StateReady) + sp.log.Debug("transition to the ready state", zap.String("from", w.State().String())) } - cancelT() runtime.Goexit() default: - pld, next, errI := w.StreamIter() - if errI != nil { - resp <- newPExec(nil, errI) // exit from the goroutine - runtime.Goexit() - } - - resp <- newPExec(pld, nil) - if !next { - // we've got the last frame - runtime.Goexit() + // we have to set a stream timeout on every request + switch sp.supervisedExec { + case true: + ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL) + pld, next, errI := w.StreamIter(ctxT) + cancelT() + if errI != nil { + sp.log.Warn("stream iter error", zap.Error(err)) + // send error response + sendResponse(resp, nil, errI) + // move worker to the invalid state to restart + w.State().Transition(fsm.StateInvalid) + runtime.Goexit() + } + + sendResponse(resp, pld, nil) + if !next { + w.State().Transition(fsm.StateReady) + // we've got the last frame + runtime.Goexit() + } + case false: + pld, next, errI := w.StreamIter(context.Background()) + if errI != nil { + sp.log.Warn("stream iter error", zap.Error(err)) + // send error response + sendResponse(resp, nil, errI) + // move worker to the invalid state to restart + w.State().Transition(fsm.StateInvalid) + runtime.Goexit() + } + + sendResponse(resp, pld, nil) + if !next { + w.State().Transition(fsm.StateReady) + // we've got the last frame + runtime.Goexit() + } } } } @@ -284,7 +316,6 @@ begin: return resp, nil default: sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid())) - resp <- newPExec(rsp, nil) // return worker back sp.ww.Release(w) // close the channel @@ -293,6 +324,15 @@ begin: } } +// this is a wrapper to send response safely +func sendResponse(resp chan *PExec, pld *payload.Payload, err error) { + select { + case resp <- newPExec(pld, err): + default: + break + } +} + func (sp *Pool) QueueSize() uint64 { return atomic.LoadUint64(&sp.queue) } diff --git a/worker/worker.go b/worker/worker.go index 8516dbf..33eb3b7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -227,40 +227,84 @@ func (w *Process) 
Wait() error { } // StreamIter returns true if stream is available and payload -func (w *Process) StreamIter() (*payload.Payload, bool, error) { - pld, err := w.receiveFrame() - if err != nil { - return nil, false, err - } +func (w *Process) StreamIter(ctx context.Context) (*payload.Payload, bool, error) { + c := w.getCh() - // PING, we should respond with PONG - if pld.Flags&frame.PING != 0 { - // get a frame - fr := w.getFrame() + go func() { + rsp, err := w.receiveFrame() + if err != nil { + c <- &wexec{ + err: err, + } - fr.WriteVersion(fr.Header(), frame.Version1) + w.log.Debug("stream iter error", zap.Int64("pid", w.Pid()), zap.Error(err)) + // trash response + rsp = nil + runtime.Goexit() + } - fr.SetPongBit(fr.Header()) - fr.WriteCRC(fr.Header()) + c <- &wexec{ + payload: rsp, + } + }() - err := w.Relay().Send(fr) - w.State().RegisterExec() - if err != nil { - w.putFrame(fr) - w.State().Transition(fsm.StateErrored) - return nil, false, errors.E(errors.Network, err) + select { + // exec TTL reached + case <-ctx.Done(): + // we should kill the process here to ensure that it exited + errK := w.Kill() + err := stderr.Join(errK, ctx.Err()) + // we should wait for the exit from the worker + // 'c' channel here should return an error or nil + // because the goroutine holds the payload pointer (from the sync.Pool) + <-c + w.putCh(c) + return nil, false, errors.E(errors.ExecTTL, err) + case res := <-c: + w.putCh(c) + + if res.err != nil { + return nil, false, res.err } + // PING, we should respond with PONG + if res.payload.Flags&frame.PING != 0 { + if err := w.sendPONG(); err != nil { + return nil, false, err + } + } + + // pld.Flags&frame.STREAM !=0 -> we have stream bit set, so stream is available + return res.payload, res.payload.Flags&frame.STREAM != 0, nil + } +} + +func (w *Process) sendPONG() error { + // get a frame + fr := w.getFrame() + fr.WriteVersion(fr.Header(), frame.Version1) + + fr.SetPongBit(fr.Header()) + fr.WriteCRC(fr.Header()) + + err := w.Relay().Send(fr) + w.State().RegisterExec() + if err != nil { w.putFrame(fr) + w.State().Transition(fsm.StateErrored) + return errors.E(errors.Network, err) } - // !=0 -> we have stream bit set, so stream is available - return pld, pld.Flags&frame.STREAM != 0, nil + w.putFrame(fr) + return nil } // StreamCancel sends stop bit to the worker func (w *Process) StreamCancel(ctx context.Context) error { const op = errors.Op("sync_worker_send_frame") + if !w.State().Compare(fsm.StateWorking) { + return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String()) + } w.log.Debug("stream was canceled, sending stop bit", zap.Int64("pid", w.Pid())) // get a frame @@ -275,7 +319,6 @@ func (w *Process) StreamCancel(ctx context.Context) error { w.State().RegisterExec() if err != nil { w.putFrame(fr) - w.State().Transition(fsm.StateErrored) return errors.E(op, errors.Network, err) } @@ -293,34 +336,39 @@ func (w *Process) StreamCancel(ctx context.Context) error { } w.log.Debug("stream cancel error", zap.Int64("pid", w.Pid()), zap.Error(errrf)) + // trash response + rsp = nil runtime.Goexit() } // stream has ended if rsp.Flags&frame.STREAM == 0 { - w.State().Transition(fsm.StateReady) w.log.Debug("stream has ended", zap.Int64("pid", w.Pid())) c <- &wexec{} + // trash response + rsp = nil runtime.Goexit() } - // trash - rsp = nil } }() select { // exec TTL reached case <-ctx.Done(): + errK := w.Kill() + err := stderr.Join(errK, ctx.Err()) + // we should wait for the exit from the worker + // 'c' channel here should 
return an error or nil + // because the goroutine holds the payload pointer (from the sync.Pool) + <-c w.putCh(c) - return errors.E(op, errors.TimeOut, ctx.Err()) + return errors.E(op, errors.ExecTTL, err) case res := <-c: + w.putCh(c) if res.err != nil { - w.putCh(c) return res.err } - - w.putCh(c) return nil } } @@ -334,13 +382,12 @@ type wexec struct { func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("worker_exec_with_timeout") - c := w.getCh() - // worker was killed before it started to work (supervisor) if !w.State().Compare(fsm.StateReady) { - w.putCh(c) return nil, errors.E(op, errors.Retry, errors.Errorf("Process is not ready (%s)", w.State().String())) } + + c := w.getCh() // set last used time w.State().SetLastUsed(uint64(time.Now().UnixNano())) w.State().Transition(fsm.StateWorking) @@ -354,44 +401,19 @@ func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payloa runtime.Goexit() } - w.State().RegisterExec() rsp, err := w.receiveFrame() + w.State().RegisterExec() if err != nil { - c <- &wexec{ - err: err, - } - - /* - in case of soft job error, we should not kill the worker, this is just an error payload from the worker. - */ - if errors.Is(errors.SoftJob, err) { - // check if the supervisor changed the state - if w.State().Compare(fsm.StateWorking) { - w.State().Transition(fsm.StateReady) - } - runtime.Goexit() - } - - w.State().Transition(fsm.StateErrored) - runtime.Goexit() - } - - // double check the state, should be `working` - // if not, supervisor might change the state (TTL) - if !w.State().Compare(fsm.StateWorking) { c <- &wexec{ payload: rsp, - err: nil, + err: err, } runtime.Goexit() } - w.State().Transition(fsm.StateReady) - c <- &wexec{ payload: rsp, - err: nil, } }() @@ -399,24 +421,18 @@ func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payloa // exec TTL reached case <-ctx.Done(): errK := w.Kill() - err := stderr.Join(errK) + err := stderr.Join(errK, ctx.Err()) // we should wait for the exit from the worker // 'c' channel here should return an error or nil // because the goroutine holds the payload pointer (from the sync.Pool) <-c - if err != nil { - w.putCh(c) - // append timeout error - return nil, stderr.Join(err, ctx.Err(), errors.E(op, errors.ExecTTL)) - } w.putCh(c) - return nil, errors.E(op, errors.ExecTTL, ctx.Err()) + return nil, errors.E(op, errors.ExecTTL, err) case res := <-c: + w.putCh(c) if res.err != nil { - w.putCh(c) return nil, res.err } - w.putCh(c) return res.payload, nil } } From 0d107d112539b6f958dc7862fc43999bcaaea4d5 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 4 Jan 2024 12:17:07 +0100 Subject: [PATCH 2/3] chore: update worker logic Signed-off-by: Valery Piashchynski --- pool/static_pool/workers_pool.go | 3 ++- worker/worker.go | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pool/static_pool/workers_pool.go b/pool/static_pool/workers_pool.go index 758de57..17ff0ea 100644 --- a/pool/static_pool/workers_pool.go +++ b/pool/static_pool/workers_pool.go @@ -329,7 +329,8 @@ func sendResponse(resp chan *PExec, pld *payload.Payload, err error) { select { case resp <- newPExec(pld, err): default: - break + panic("can't send response to the channel") + // break } } diff --git a/worker/worker.go b/worker/worker.go index 33eb3b7..d283bc1 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -349,7 +349,6 @@ func (w *Process) StreamCancel(ctx context.Context) error { rsp = nil runtime.Goexit() } - } }() 
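Note on the pattern the first two patches converge on for ExecTTL handling, which is the same in Exec, StreamIter(ctx) and StreamCancel: run the blocking relay receive in its own goroutine, select on the caller's context against the result channel, and on timeout kill the worker, join the kill error with ctx.Err(), and drain the goroutine before returning. Below is a minimal self-contained sketch of that pattern; receive and kill are illustrative stand-ins for the SDK internals, not the actual worker API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	payload []byte
	err     error
}

// execWithTTL mirrors the select pattern from the patches: the blocking
// receive runs in its own goroutine, the caller's context enforces the exec
// TTL, and on timeout the worker is killed, the kill error is joined with
// ctx.Err(), and the goroutine is drained before returning.
func execWithTTL(ctx context.Context, receive func() ([]byte, error), kill func() error) ([]byte, error) {
	c := make(chan result, 1)

	go func() {
		pld, err := receive()
		c <- result{payload: pld, err: err}
	}()

	select {
	case <-ctx.Done():
		// TTL reached: kill the process and join the kill error with the
		// context error, as the patches do with stderr.Join(errK, ctx.Err()).
		errK := kill()
		err := errors.Join(errK, ctx.Err())
		// Drain the channel so the receive goroutine has finished before we
		// return; in the patches this matters because that goroutine still
		// holds a pooled payload pointer.
		<-c
		return nil, err
	case res := <-c:
		if res.err != nil {
			return nil, res.err
		}
		return res.payload, nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Simulate a worker whose response takes longer than the TTL.
	slowReceive := func() ([]byte, error) {
		time.Sleep(time.Second)
		return []byte("late"), nil
	}
	_, err := execWithTTL(ctx, slowReceive, func() error { return nil })
	fmt.Println(err) // context deadline exceeded
}

The drain-before-return is also why the channel is handed back via putCh(c) only after the goroutine has reported back in both the timeout and the success branches.
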
From d41504f3739d7430cac9cffca77c77a0c1a6f694 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 4 Jan 2024 20:47:41 +0100 Subject: [PATCH 3/3] fix: complete fix for the streams error Signed-off-by: Valery Piashchynski --- go.mod | 8 ++-- go.sum | 19 +++----- ipc/pipe/pipe_factory_spawn_test.go | 3 ++ ipc/pipe/pipe_factory_test.go | 3 ++ pool/static_pool/debug.go | 12 ++--- pool/static_pool/supervisor_test.go | 2 + pool/static_pool/workers_pool.go | 72 ++++++++++++++++++----------- worker/worker.go | 72 ++++++++++++++++++----------- 8 files changed, 115 insertions(+), 76 deletions(-) diff --git a/go.mod b/go.mod index 4e2f974..65a514f 100644 --- a/go.mod +++ b/go.mod @@ -2,19 +2,19 @@ module github.com/roadrunner-server/sdk/v4 go 1.21 -toolchain go1.22rc1 +toolchain go1.21.5 require ( github.com/goccy/go-json v0.10.2 github.com/google/uuid v1.5.0 - github.com/prometheus/client_golang v1.17.0 + github.com/prometheus/client_golang v1.18.0 github.com/roadrunner-server/errors v1.3.0 github.com/roadrunner-server/goridge/v3 v3.8.1 github.com/roadrunner-server/tcplisten v1.4.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.26.0 - golang.org/x/sync v0.5.0 + golang.org/x/sync v0.6.0 ) require ( @@ -31,7 +31,7 @@ require ( github.com/tklauser/numcpus v0.7.0 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/sys v0.16.0 // indirect google.golang.org/protobuf v1.32.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8a5e0d8..e04cd5a 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,6 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= @@ -19,13 +17,12 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= -github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_golang v1.18.0 
h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= @@ -56,16 +53,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/ipc/pipe/pipe_factory_spawn_test.go b/ipc/pipe/pipe_factory_spawn_test.go index 0c61687..b5f3b38 100644 --- a/ipc/pipe/pipe_factory_spawn_test.go +++ b/ipc/pipe/pipe_factory_spawn_test.go @@ -396,16 +396,19 @@ func Test_NumExecs2(t *testing.T) { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) + w.State().Transition(fsm.StateReady) _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) + w.State().Transition(fsm.StateReady) _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(3), w.State().NumExecs()) + w.State().Transition(fsm.StateReady) } diff --git a/ipc/pipe/pipe_factory_test.go b/ipc/pipe/pipe_factory_test.go index f44f1d5..f2a19c0 100755 --- a/ipc/pipe/pipe_factory_test.go +++ b/ipc/pipe/pipe_factory_test.go @@ -421,6 +421,7 @@ func Test_NumExecs(t *testing.T) { if err != nil { t.Errorf("fail to execute payload: error %v", err) } + w.State().Transition(fsm.StateReady) assert.Equal(t, uint64(1), w.State().NumExecs()) _, err = w.Exec(context.Background(), &payload.Payload{Body: 
[]byte("hello")}) @@ -428,10 +429,12 @@ func Test_NumExecs(t *testing.T) { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) + w.State().Transition(fsm.StateReady) _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(3), w.State().NumExecs()) + w.State().Transition(fsm.StateReady) } diff --git a/pool/static_pool/debug.go b/pool/static_pool/debug.go index e2ae6dc..a422e05 100644 --- a/pool/static_pool/debug.go +++ b/pool/static_pool/debug.go @@ -31,7 +31,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s } // create a channel for the stream (only if there are no errors) - resp := make(chan *PExec, 1) + resp := make(chan *PExec, 1000000) switch { case rsp.Flags&frame.STREAM != 0: @@ -54,7 +54,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s }() // send the initial frame - sendResponse(resp, rsp, nil) + resp <- newPExec(rsp, nil) // stream iterator for { @@ -71,13 +71,13 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s cancelT() runtime.Goexit() default: - pld, next, errI := w.StreamIter(ctx) + pld, next, errI := w.StreamIterWithContext(ctx) if errI != nil { - sendResponse(resp, nil, errI) + resp <- newPExec(nil, errI) runtime.Goexit() } - sendResponse(resp, pld, nil) + resp <- newPExec(pld, nil) if !next { // we've got the last frame runtime.Goexit() @@ -88,7 +88,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s return resp, nil default: - sendResponse(resp, rsp, nil) + resp <- newPExec(rsp, nil) // close the channel close(resp) diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index 48acfe1..be74819 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -54,6 +54,7 @@ func Test_SupervisedPool_Exec(t *testing.T) { require.NoError(t, err) } + time.Sleep(time.Second) require.NotEqual(t, pidBefore, p.Workers()[0].Pid()) ctxNew, cancel := context.WithTimeout(ctx, time.Second) @@ -87,6 +88,7 @@ func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) { require.NoError(t, err) } + time.Sleep(time.Second) require.NotEqual(t, pidBefore, p.Workers()[0].Pid()) ctxNew, cancel := context.WithTimeout(ctx, time.Second) diff --git a/pool/static_pool/workers_pool.go b/pool/static_pool/workers_pool.go index 17ff0ea..82ab891 100644 --- a/pool/static_pool/workers_pool.go +++ b/pool/static_pool/workers_pool.go @@ -194,19 +194,14 @@ begin: } if err != nil { - if errors.Is(errors.Retry, err) { - sp.ww.Release(w) - goto begin - } - // just push event if on any stage was timeout error switch { - // for this case, worker already killed in the ExecTTL function case errors.Is(errors.ExecTTL, err): + // for this case, worker already killed in the ExecTTL function sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err)) w.State().Transition(fsm.StateExecTTLReached) - sp.ww.Release(w) + // worker should already be reallocated return nil, err case errors.Is(errors.SoftJob, err): /* @@ -224,19 +219,27 @@ begin: // kill the worker instead of sending a net packet to it _ = w.Kill() + // do not return it, should be reallocated on Kill return nil, err + case errors.Is(errors.Retry, err): + // put 
the worker back to the stack and retry the request with the new one + sp.ww.Release(w) + goto begin + default: w.State().Transition(fsm.StateErrored) sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err)) + sp.ww.Release(w) return nil, err } } // create channel for the stream (only if there are no errors) - resp := make(chan *PExec, 100) + // we need to create a buffered channel to prevent blocking + resp := make(chan *PExec, 100000000) // send the initial frame - sendResponse(resp, rsp, nil) + resp <- newPExec(rsp, nil) switch { case rsp.Flags&frame.STREAM != 0: @@ -274,35 +277,59 @@ begin: switch sp.supervisedExec { case true: ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL) - pld, next, errI := w.StreamIter(ctxT) + pld, next, errI := w.StreamIterWithContext(ctxT) cancelT() if errI != nil { - sp.log.Warn("stream iter error", zap.Error(err)) + sp.log.Warn("stream error", zap.Error(err)) // send error response - sendResponse(resp, nil, errI) + select { + case resp <- newPExec(nil, errI): + default: + sp.log.Error("failed to send error", zap.Error(errI)) + } // move worker to the invalid state to restart w.State().Transition(fsm.StateInvalid) runtime.Goexit() } - sendResponse(resp, pld, nil) + select { + case resp <- newPExec(pld, nil): + default: + sp.log.Error("failed to send payload chunk, stream is corrupted") + // we need to restart the worker since it can be in the incorrect state + w.State().Transition(fsm.StateErrored) + } + if !next { w.State().Transition(fsm.StateReady) // we've got the last frame runtime.Goexit() } case false: - pld, next, errI := w.StreamIter(context.Background()) + // non supervised execution, can potentially hang here + pld, next, errI := w.StreamIter() if errI != nil { sp.log.Warn("stream iter error", zap.Error(err)) // send error response - sendResponse(resp, nil, errI) + select { + case resp <- newPExec(nil, errI): + default: + sp.log.Error("failed to send error", zap.Error(errI)) + } + // move worker to the invalid state to restart w.State().Transition(fsm.StateInvalid) runtime.Goexit() } - sendResponse(resp, pld, nil) + select { + case resp <- newPExec(pld, nil): + default: + sp.log.Error("failed to send payload chunk, stream is corrupted") + // we need to restart the worker since it can be in the incorrect state + w.State().Transition(fsm.StateErrored) + } + if !next { w.State().Transition(fsm.StateReady) // we've got the last frame @@ -316,6 +343,9 @@ begin: return resp, nil default: sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid())) + if w.State().Compare(fsm.StateWorking) { + w.State().Transition(fsm.StateReady) + } // return worker back sp.ww.Release(w) // close the channel @@ -324,16 +354,6 @@ begin: } } -// this is a wrapper to send response safely -func sendResponse(resp chan *PExec, pld *payload.Payload, err error) { - select { - case resp <- newPExec(pld, err): - default: - panic("can't send response to the channel") - // break - } -} - func (sp *Pool) QueueSize() uint64 { return atomic.LoadUint64(&sp.queue) } diff --git a/worker/worker.go b/worker/worker.go index d283bc1..8f398f5 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -57,6 +57,12 @@ type Process struct { relay relay.Relay } +// internal struct to pass data between goroutines +type wexec struct { + payload *payload.Payload + err error +} + // InitBaseWorker creates new Process over given exec.cmd. 
func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { if cmd.Process != nil { @@ -226,8 +232,25 @@ func (w *Process) Wait() error { return err } +func (w *Process) StreamIter() (*payload.Payload, bool, error) { + pld, err := w.receiveFrame() + if err != nil { + return nil, false, err + } + + // PING, we should respond with PONG + if pld.Flags&frame.PING != 0 { + if err := w.sendPONG(); err != nil { + return nil, false, err + } + } + + // !=0 -> we have stream bit set, so stream is available + return pld, pld.Flags&frame.STREAM != 0, nil +} + // StreamIter returns true if stream is available and payload -func (w *Process) StreamIter(ctx context.Context) (*payload.Payload, bool, error) { +func (w *Process) StreamIterWithContext(ctx context.Context) (*payload.Payload, bool, error) { c := w.getCh() go func() { @@ -279,26 +302,6 @@ func (w *Process) StreamIter(ctx context.Context) (*payload.Payload, bool, error } } -func (w *Process) sendPONG() error { - // get a frame - fr := w.getFrame() - fr.WriteVersion(fr.Header(), frame.Version1) - - fr.SetPongBit(fr.Header()) - fr.WriteCRC(fr.Header()) - - err := w.Relay().Send(fr) - w.State().RegisterExec() - if err != nil { - w.putFrame(fr) - w.State().Transition(fsm.StateErrored) - return errors.E(errors.Network, err) - } - - w.putFrame(fr) - return nil -} - // StreamCancel sends stop bit to the worker func (w *Process) StreamCancel(ctx context.Context) error { const op = errors.Op("sync_worker_send_frame") @@ -372,11 +375,6 @@ func (w *Process) StreamCancel(ctx context.Context) error { } } -type wexec struct { - payload *payload.Payload - err error -} - // Exec executes payload with TTL timeout in the context. func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("worker_exec_with_timeout") @@ -400,8 +398,8 @@ func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payloa runtime.Goexit() } - rsp, err := w.receiveFrame() w.State().RegisterExec() + rsp, err := w.receiveFrame() if err != nil { c <- &wexec{ payload: rsp, @@ -611,6 +609,26 @@ func (w *Process) receiveFrame() (*payload.Payload, error) { return pld, nil } +func (w *Process) sendPONG() error { + // get a frame + fr := w.getFrame() + fr.WriteVersion(fr.Header(), frame.Version1) + + fr.SetPongBit(fr.Header()) + fr.WriteCRC(fr.Header()) + + err := w.Relay().Send(fr) + w.State().RegisterExec() + if err != nil { + w.putFrame(fr) + w.State().Transition(fsm.StateErrored) + return errors.E(errors.Network, err) + } + + w.putFrame(fr) + return nil +} + func (w *Process) closeRelay() error { if w.relay != nil { err := w.relay.Close()
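
Note on the frame-delivery strategy PATCH 3/3 settles on after the series tried a buffer of 1, a silently dropping sendResponse wrapper, and then a panic: a send into a full response channel must not block the stream goroutine, but a dropped frame means the stream is no longer intact, so the default branch logs the failure and moves the worker to an errored state. A small self-contained sketch of that non-blocking send trade-off; the trySend helper is illustrative and not part of the SDK.

package main

import "fmt"

// trySend is the non-blocking send used for stream frames: if the consumer
// lags and the buffer is full, the frame is dropped and the caller has to
// treat the stream as corrupted (the patch logs the failure and restarts
// the worker).
func trySend[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	// A buffered channel absorbs bursts of frames; only when it fills up does
	// the default branch fire and a frame get lost.
	frames := make(chan int, 2)

	for i := 0; i < 4; i++ {
		if !trySend(frames, i) {
			fmt.Printf("dropped frame %d: stream would be corrupted\n", i)
		}
	}

	close(frames)
	for f := range frames {
		fmt.Println("delivered frame", f)
	}
}

In the patch the same select/default is written inline around newPExec, with sp.log.Error and a transition to fsm.StateErrored in the default branch.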