Skip to content
This repository was archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
fix: buffered channel consumed too much memory
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Jan 10, 2024
1 parent 15ebfc2 commit f6dea40
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/roadrunner-server/sdk/v4

go 1.21

toolchain go1.22rc1
toolchain go1.21.5

require (
github.com/goccy/go-json v0.10.2
Expand Down
47 changes: 16 additions & 31 deletions pool/static_pool/workers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,16 @@ begin:
}
}

// create channel for the stream (only if there are no errors)
// we need to create a buffered channel to prevent blocking
resp := make(chan *PExec, 100000000)
// send the initial frame
resp <- newPExec(rsp, nil)

switch {
case rsp.Flags&frame.STREAM != 0:
sp.log.Debug("stream mode", zap.Int64("pid", w.Pid()))
// create channel for the stream (only if there are no errors)
// we need to create a buffered channel to prevent blocking
// stream buffer size should be bigger than regular, to have some payloads ready (optimization)
resp := make(chan *PExec, 5)
// send the initial frame
resp <- newPExec(rsp, nil)

// in case of stream we should not return worker back immediately
go func() {
// would be called on Goexit
Expand Down Expand Up @@ -281,24 +282,15 @@ begin:
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))
}

resp <- newPExec(nil, errI)

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)
}
resp <- newPExec(pld, nil)

if !next {
w.State().Transition(fsm.StateReady)
Expand All @@ -311,24 +303,14 @@ begin:
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))
}
resp <- newPExec(nil, errI)

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)
}
resp <- newPExec(pld, nil)

if !next {
w.State().Transition(fsm.StateReady)
Expand All @@ -342,6 +324,9 @@ begin:

return resp, nil
default:
resp := make(chan *PExec, 1)
// send the initial frame
resp <- newPExec(rsp, nil)
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
if w.State().Compare(fsm.StateWorking) {
w.State().Transition(fsm.StateReady)
Expand Down

0 comments on commit f6dea40

Please sign in to comment.