This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

fix: streaming responses lock-up under some conditions #105

Merged
merged 3 commits on Jan 4, 2024
Binary file removed .DS_Store
8 changes: 4 additions & 4 deletions go.mod
@@ -2,19 +2,19 @@ module github.com/roadrunner-server/sdk/v4

go 1.21

toolchain go1.22rc1
toolchain go1.21.5

require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.5.0
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_golang v1.18.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/goridge/v3 v3.8.1
github.com/roadrunner-server/tcplisten v1.4.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/sync v0.5.0
golang.org/x/sync v0.6.0
)

require (
@@ -31,7 +31,7 @@ require (
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.16.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
19 changes: 6 additions & 13 deletions go.sum
@@ -9,8 +9,6 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
@@ -19,13 +17,12 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
@@ -56,16 +53,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3 changes: 3 additions & 0 deletions ipc/pipe/pipe_factory_spawn_test.go
@@ -396,16 +396,19 @@ func Test_NumExecs2(t *testing.T) {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
w.State().Transition(fsm.StateReady)

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
w.State().Transition(fsm.StateReady)

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(3), w.State().NumExecs())
w.State().Transition(fsm.StateReady)
}
3 changes: 3 additions & 0 deletions ipc/pipe/pipe_factory_test.go
@@ -421,17 +421,20 @@ func Test_NumExecs(t *testing.T) {
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
w.State().Transition(fsm.StateReady)
assert.Equal(t, uint64(1), w.State().NumExecs())

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
w.State().Transition(fsm.StateReady)

_, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(3), w.State().NumExecs())
w.State().Transition(fsm.StateReady)
}
6 changes: 3 additions & 3 deletions pool/static_pool/debug.go
@@ -31,7 +31,7 @@
}

// create a channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)
resp := make(chan *PExec, 1000000)

switch {
case rsp.Flags&frame.STREAM != 0:
@@ -71,9 +71,9 @@
cancelT()
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
pld, next, errI := w.StreamIterWithContext(ctx)

if errI != nil {
resp <- newPExec(nil, errI) // exit from the goroutine
resp <- newPExec(nil, errI)

runtime.Goexit()
}

2 changes: 2 additions & 0 deletions pool/static_pool/supervisor_test.go
@@ -54,6 +54,7 @@ func Test_SupervisedPool_Exec(t *testing.T) {
require.NoError(t, err)
}

time.Sleep(time.Second)
require.NotEqual(t, pidBefore, p.Workers()[0].Pid())

ctxNew, cancel := context.WithTimeout(ctx, time.Second)
@@ -87,6 +88,7 @@ func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) {
require.NoError(t, err)
}

time.Sleep(time.Second)
require.NotEqual(t, pidBefore, p.Workers()[0].Pid())

ctxNew, cancel := context.WithTimeout(ctx, time.Second)
133 changes: 97 additions & 36 deletions pool/static_pool/workers_pool.go
@@ -184,31 +184,31 @@
rsp, err = w.Exec(ctxT, p)
case false:
// no context here
// potential problem: if the worker is hung, we can't stop it
rsp, err = w.Exec(context.Background(), p)
}

if err != nil {
if errors.Is(errors.Retry, err) {
sp.ww.Release(w)
goto begin
}
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
sp.log.Debug("max requests reached", zap.Int64("pid", w.Pid()))
w.State().Transition(fsm.StateMaxJobsReached)
}

if err != nil {
// just push event if on any stage was timeout error
switch {
// for this case, worker already killed in the ExecTTL function
case errors.Is(errors.ExecTTL, err):
// for this case, worker already killed in the ExecTTL function
sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
w.State().Transition(fsm.StateExecTTLReached)
sp.ww.Release(w)

// worker should already be reallocated
return nil, err
case errors.Is(errors.SoftJob, err):
sp.log.Warn("soft worker error, worker won't be restarted", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err))
// soft jobs errors are allowed, just put the worker back
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
// mark old as invalid and stop
w.State().Transition(fsm.StateMaxJobsReached)
}
/*
in case of soft job error, we should not kill the worker, this is just an error payload from the worker.
*/
w.State().Transition(fsm.StateReady)
sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err))
sp.ww.Release(w)

return nil, err
@@ -219,23 +219,27 @@
// kill the worker instead of sending a net packet to it
_ = w.Kill()

// do not return it, should be reallocated on Kill
return nil, err
Comment on lines +222 to 223
When a network error occurs, the worker is killed (line 220), but there is no subsequent error handling or retry logic. This could lead to dropped jobs if not properly managed.
Suggested change:
+ // TODO: Implement retry logic or error propagation after a worker is killed due to a network error.
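To make the reviewer's point concrete, here is a rough, self-contained sketch of bounded retry after a killed worker, so a job is either re-run on a fresh worker or returned to the caller with an error instead of being dropped. The allocate/execOnce hooks and the retry cap are hypothetical stand-ins, not part of this PR or the roadrunner SDK:

```go
package main

import (
	"errors"
	"fmt"
)

var errNetwork = errors.New("network error, worker killed")

// execWithRetry re-runs a payload on a freshly allocated worker a bounded
// number of times, so a worker killed by a network error does not silently
// drop the job. allocate and execOnce are hypothetical pool internals.
func execWithRetry(
	payload []byte,
	maxRetries int,
	allocate func() (pid int, err error),
	execOnce func(pid int, p []byte) ([]byte, error),
) ([]byte, error) {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		pid, err := allocate()
		if err != nil {
			return nil, fmt.Errorf("allocate worker: %w", err)
		}
		out, err := execOnce(pid, payload)
		if err == nil {
			return out, nil
		}
		lastErr = err // the worker was killed; retry on a new one
	}
	return nil, fmt.Errorf("job dropped after %d retries: %w", maxRetries, lastErr)
}

func main() {
	attempts := 0
	out, err := execWithRetry([]byte("hello"), 2,
		func() (int, error) { return 100 + attempts, nil },
		func(_ int, p []byte) ([]byte, error) {
			attempts++
			if attempts == 1 {
				return nil, errNetwork // first attempt hits a network error
			}
			return append([]byte("echo: "), p...), nil
		},
	)
	fmt.Println(string(out), err)
}
```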

case errors.Is(errors.Retry, err):
// put the worker back to the stack and retry the request with the new one
sp.ww.Release(w)
goto begin

default:
w.State().Transition(fsm.StateErrored)
sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))

sp.ww.Release(w)

return nil, err
}
}

if sp.cfg.MaxJobs != 0 {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Transition(fsm.StateMaxJobsReached)
}
}

// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)
// we need to create a buffered channel to prevent blocking
resp := make(chan *PExec, 100000000)
Comment on lines +239 to +240
Creating a buffered channel with a very large buffer size (line 240) can lead to high memory usage and potential performance issues. Consider using a smaller buffer and proper flow control.
Suggested change:
- resp := make(chan *PExec, 100000000)
+ // TODO: Evaluate the buffer size for potential memory usage optimization.
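As an illustration of the smaller-buffer-plus-flow-control direction this comment suggests, the sketch below pairs a modest buffer with a non-blocking send so a slow consumer surfaces as an explicit error rather than unbounded memory growth. The PExec shape, buffer size, and error value are assumptions for the example, not the PR's actual implementation:

```go
package main

import (
	"errors"
	"fmt"
)

// PExec is a simplified stand-in for the pool's stream frame type.
type PExec struct {
	Body []byte
	Err  error
}

var errStreamBackpressure = errors.New("stream consumer too slow, frame not queued")

// sendFrame enqueues a frame without ever blocking the producer goroutine.
// When the buffer is full it reports an error so the caller can mark the
// worker invalid instead of locking up or growing memory without bound.
func sendFrame(resp chan<- *PExec, p *PExec) error {
	select {
	case resp <- p:
		return nil
	default:
		return errStreamBackpressure
	}
}

func main() {
	// A modest buffer (64 here) instead of 100000000 keeps memory bounded.
	resp := make(chan *PExec, 64)

	if err := sendFrame(resp, &PExec{Body: []byte("chunk-1")}); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("queued frames:", len(resp))
}
```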

// send the initial frame
resp <- newPExec(rsp, nil)

switch {
case rsp.Flags&frame.STREAM != 0:
Expand All @@ -244,38 +248,93 @@
go func() {
// would be called on Goexit
defer func() {
sp.log.Debug("release [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))

close(resp)
sp.ww.Release(w)
}()

// send the initial frame
resp <- newPExec(rsp, nil)

// stream iterator
for {
select {
// we received stop signal
case <-stopCh:
sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))

ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
err = w.StreamCancel(ctxT)
cancelT()

if err != nil {
w.State().Transition(fsm.StateErrored)

sp.log.Warn("stream cancel error", zap.Error(err))
w.State().Transition(fsm.StateInvalid)
} else {
// successfully canceled
w.State().Transition(fsm.StateReady)
sp.log.Debug("transition to the ready state", zap.String("from", w.State().String()))

Comment on lines +251 to +271
The goroutine started on line 248 for handling stream mode does not have any recovery logic for panics that might occur within it. This could lead to a situation where a panic in one goroutine could potentially affect the stability of the entire application.
Suggested change:
+ // TODO: Add recovery logic to the goroutine to handle potential panics gracefully.

}

cancelT()
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
if errI != nil {
resp <- newPExec(nil, errI) // exit from the goroutine
runtime.Goexit()
}

resp <- newPExec(pld, nil)
if !next {
// we've got the last frame
runtime.Goexit()
// we have to set a stream timeout on every request
switch sp.supervisedExec {
case true:
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL)
pld, next, errI := w.StreamIterWithContext(ctxT)
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))

}
// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()

}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)

}

if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}
case false:
// non supervised execution, can potentially hang here
pld, next, errI := w.StreamIter()
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))

}

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()

}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)

}

if !next {
w.State().Transition(fsm.StateReady)
// we've got the last frame
runtime.Goexit()
}

}
}
}
@@ -284,7 +343,9 @@
return resp, nil
default:
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
resp <- newPExec(rsp, nil)
if w.State().Compare(fsm.StateWorking) {
w.State().Transition(fsm.StateReady)
Comment on lines +346 to +347
The state transition to StateReady (line 347) should be verified to ensure that the worker is indeed ready and that there are no pending tasks that could be lost.
Suggested change:
+ // TODO: Verify that no tasks are lost during the transition to StateReady.
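One way to make that verification explicit, sketched with a simplified state machine rather than roadrunner's fsm package, is a compare-and-swap guard so the worker is only flipped to ready when it was actually observed working:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateReady int64 = iota
	stateWorking
)

// fsmStub is a simplified stand-in for the worker state machine.
type fsmStub struct{ state atomic.Int64 }

// transitionIfWorking flips the worker to ready only when it is currently
// observed in the working state, so a worker that already moved elsewhere
// (errored, invalid, ...) is never silently marked ready.
func (f *fsmStub) transitionIfWorking() bool {
	return f.state.CompareAndSwap(stateWorking, stateReady)
}

func main() {
	var w fsmStub
	w.state.Store(stateWorking)

	if w.transitionIfWorking() {
		fmt.Println("worker returned to ready")
	} else {
		fmt.Println("worker was not in working state; leaving as-is")
	}
}
```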
}
// return worker back
sp.ww.Release(w)
// close the channel