Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
[#92]: feature: stream timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Aug 22, 2023
2 parents d4175c2 + caf555a commit 9793e69
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 99 deletions.
18 changes: 0 additions & 18 deletions .vscode/settings.json

This file was deleted.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ toolchain go1.21.0

require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/google/uuid v1.3.1
github.com/prometheus/client_golang v1.16.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/goridge/v3 v3.7.0
github.com/roadrunner-server/goridge/v3 v3.8.0
github.com/roadrunner-server/tcplisten v1.4.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -38,8 +38,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo=
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/roadrunner-server/goridge/v3 v3.7.0 h1:+Z8pezA4vvZ+/LpF7tjmwOYHa9jKrjbGtBn7RpRAswI=
github.com/roadrunner-server/goridge/v3 v3.7.0/go.mod h1:xgheswRjWvQBHRf3AEkFgLnYOSzYg13ZH0OCuDIcJpg=
github.com/roadrunner-server/goridge/v3 v3.8.0 h1:V4EmDs6KfvV+F9ilh4LhmqZy76JGozdDH/S/1v2G2AA=
github.com/roadrunner-server/goridge/v3 v3.8.0/go.mod h1:L5UkNzD8aKLz6TzpqmmiHOJ6EnsadsWEYNoqK/4qoK0=
github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M=
github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
Expand Down
14 changes: 9 additions & 5 deletions payload/payload.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package payload

import (
"github.com/roadrunner-server/sdk/v4/utils"
"unsafe"
)

// Payload carries binary header and body to stack and
Expand All @@ -11,13 +11,17 @@ type Payload struct {
Context []byte
// body contains binary payload to be processed by WorkerProcess.
Body []byte
// Type of codec used to decode/encode payload
// Type of codec used to decode/encode payload.
Codec byte
// IsStream indicates that payload is stream
IsStream bool
// Flags
Flags byte
}

// String returns payload body as string
func (p *Payload) String() string {
return utils.AsString(p.Body)
if len(p.Body) == 0 {
return ""
}

return unsafe.String(unsafe.SliceData(p.Body), len(p.Body))
}
14 changes: 6 additions & 8 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,27 @@ import (
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool

// Command used to override the server command with the custom one
Command []string `mapstructure:"command"`

// AfterInitCommand command used to override the server's AfterInitCommand
AfterInitCommand []string `mapstructure:"after_init"`

// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
NumWorkers uint64 `mapstructure:"num_workers"`

// MaxJobs defines how many executions is allowed for the worker until
// its destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
MaxJobs uint64 `mapstructure:"max_jobs"`

// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task. Defaults to 60s.
AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`

// DestroyTimeout defines for how long pool should be waiting for worker to
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`

// ResetTimeout defines how long pool should wait before start killing workers
ResetTimeout time.Duration `mapstructure:"reset_timeout"`

// Stream read operation timeout
StreamTimeout time.Duration `mapstructure:"stream_timeout"`
// Supervision config to limit worker and pool memory usage.
Supervisor *SupervisorConfig `mapstructure:"supervisor"`
}
Expand All @@ -50,6 +44,10 @@ func (cfg *Config) InitDefaults() {
cfg.AllocateTimeout = time.Minute
}

if cfg.StreamTimeout == 0 {
cfg.StreamTimeout = time.Minute
}

if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
Expand Down
11 changes: 5 additions & 6 deletions pool/static_pool/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"runtime"

"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/sdk/v4/events"
"github.com/roadrunner-server/sdk/v4/payload"
"go.uber.org/zap"
Expand All @@ -30,8 +31,8 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)

switch rsp.IsStream {
case true:
switch {
case rsp.Flags&frame.STREAM != 0:
// in case of stream we should not return worker back immediately
go func() {
// would be called on Goexit
Expand All @@ -55,7 +56,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
select {
// we received stop signal
case <-stopCh:
err = w.StreamCancel()
err = w.StreamCancel(ctx)
if err != nil {
sp.log.Warn("stream cancel error", zap.Error(err))
}
Expand All @@ -76,14 +77,12 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
}()

return resp, nil
case false:
default:
resp <- newPExec(rsp, nil)
// return worker back
sp.ww.Release(w)
// close the channel
close(resp)
return resp, nil
default:
panic("workers_pool unreachable!")
}
}
15 changes: 9 additions & 6 deletions pool/static_pool/workers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/sdk/v4/events"
"github.com/roadrunner-server/sdk/v4/fsm"
"github.com/roadrunner-server/sdk/v4/payload"
Expand Down Expand Up @@ -219,8 +220,8 @@ begin:
// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)

switch rsp.IsStream {
case true:
switch {
case rsp.Flags&frame.STREAM != 0:
sp.log.Debug("stream mode", zap.Int64("pid", w.Pid()))
// in case of stream we should not return worker back immediately
go func() {
Expand All @@ -238,10 +239,14 @@ begin:
select {
// we received stop signal
case <-stopCh:
err = w.StreamCancel()
ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
err = w.StreamCancel(ctxT)
if err != nil {
sp.log.Warn("stream cancel error", zap.Error(err))
w.State().Transition(fsm.StateInvalid)
}

cancelT()
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
Expand All @@ -260,16 +265,14 @@ begin:
}()

return resp, nil
case false:
default:
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
resp <- newPExec(rsp, nil)
// return worker back
sp.ww.Release(w)
// close the channel
close(resp)
return resp, nil
default:
panic("workers_pool unreachable!")
}
}

Expand Down
4 changes: 2 additions & 2 deletions pool/static_pool/workers_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/fsm"
"github.com/roadrunner-server/sdk/v4/ipc/pipe"
"github.com/roadrunner-server/sdk/v4/payload"
"github.com/roadrunner-server/sdk/v4/pool"
"github.com/roadrunner-server/sdk/v4/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -950,7 +950,7 @@ func BenchmarkToStringUnsafe(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
res := utils.AsString(testPayload)
res := unsafe.String(unsafe.SliceData(testPayload), len(testPayload))
_ = res
}
}
Expand Down
38 changes: 0 additions & 38 deletions utils/convert.go

This file was deleted.

65 changes: 55 additions & 10 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,35 @@ func (w *Process) StreamIter() (*payload.Payload, bool, error) {
return nil, false, err
}

return pld, pld.IsStream, nil
// PING, we should respond with PONG
if pld.Flags&frame.PING != 0 {
// get a frame
fr := w.getFrame()

fr.WriteVersion(fr.Header(), frame.Version1)

fr.SetPongBit(fr.Header())
fr.WriteCRC(fr.Header())

err := w.Relay().Send(fr)
w.State().RegisterExec()
if err != nil {
w.putFrame(fr)
w.State().Transition(fsm.StateErrored)
return nil, false, errors.E(errors.Network, err)
}

w.putFrame(fr)
}

// !=0 -> we have stream bit set, so stream is available
return pld, pld.Flags&frame.STREAM != 0, nil
}

// StreamCancel sends stop bit to the worker
func (w *Process) StreamCancel() error {
func (w *Process) StreamCancel(ctx context.Context) error {
const op = errors.Op("sync_worker_send_frame")

// get a frame
fr := w.getFrame()

Expand All @@ -244,7 +267,27 @@ func (w *Process) StreamCancel() error {
}

w.putFrame(fr)
return nil

for {
select {
case <-ctx.Done():
return errors.E(op, errors.TimeOut, ctx.Err())
default:
rsp, err := w.receiveFrame()
if err != nil {
return err
}

// stream has ended
if rsp.Flags&frame.STREAM == 0 {
w.State().Transition(fsm.StateReady)
return nil
}

// trash
rsp = nil
}
}
}

type wexec struct {
Expand Down Expand Up @@ -470,9 +513,9 @@ func (w *Process) receiveFrame() (*payload.Payload, error) {
return nil, errors.E(op, errors.Network, errors.Str("nil frame received"))
}

flags := frameR.ReadFlags()
codec := frameR.ReadFlags()

if flags&frame.ERROR != byte(0) {
if codec&frame.ERROR != byte(0) {
// we need to copy the payload because we will put the frame back to the pool
cp := make([]byte, len(frameR.Payload()))
copy(cp, frameR.Payload())
Expand All @@ -497,12 +540,14 @@ func (w *Process) receiveFrame() (*payload.Payload, error) {
return nil, errors.E(errors.Network, errors.Errorf("bad payload %s", cp))
}

isStream := frameR.IsStream(frameR.Header())
// stream + stop -> waste
// stream + ping -> response
flags := frameR.Header()[10]
pld := &payload.Payload{
IsStream: isStream,
Codec: flags,
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
Flags: flags,
Codec: codec,
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}

// by copying we free frame's payload slice
Expand Down

0 comments on commit 9793e69

Please sign in to comment.