Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

feature: stream timeouts #92

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions .vscode/settings.json

This file was deleted.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ toolchain go1.21.0

require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/google/uuid v1.3.1
github.com/prometheus/client_golang v1.16.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/goridge/v3 v3.7.0
github.com/roadrunner-server/goridge/v3 v3.8.0
github.com/roadrunner-server/tcplisten v1.4.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -38,8 +38,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo=
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/roadrunner-server/goridge/v3 v3.7.0 h1:+Z8pezA4vvZ+/LpF7tjmwOYHa9jKrjbGtBn7RpRAswI=
github.com/roadrunner-server/goridge/v3 v3.7.0/go.mod h1:xgheswRjWvQBHRf3AEkFgLnYOSzYg13ZH0OCuDIcJpg=
github.com/roadrunner-server/goridge/v3 v3.8.0 h1:V4EmDs6KfvV+F9ilh4LhmqZy76JGozdDH/S/1v2G2AA=
github.com/roadrunner-server/goridge/v3 v3.8.0/go.mod h1:L5UkNzD8aKLz6TzpqmmiHOJ6EnsadsWEYNoqK/4qoK0=
github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M=
github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
Expand Down
14 changes: 9 additions & 5 deletions payload/payload.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package payload

import (
"github.com/roadrunner-server/sdk/v4/utils"
"unsafe"
)

// Payload carries binary header and body to stack and
Expand All @@ -11,13 +11,17 @@
Context []byte
// body contains binary payload to be processed by WorkerProcess.
Body []byte
// Type of codec used to decode/encode payload
// Type of codec used to decode/encode payload.
Codec byte
// IsStream indicates that payload is stream
IsStream bool
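// Flags holds the raw flags byte taken from the frame header (checked against bits such as frame.STREAM and frame.PING).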
Flags byte
}

// String returns the payload body as a string without copying it; an empty body yields "".
func (p *Payload) String() string {
return utils.AsString(p.Body)
if len(p.Body) == 0 {
return ""
}

Check warning on line 24 in payload/payload.go

View check run for this annotation

Codecov / codecov/patch

payload/payload.go#L23-L24

Added lines #L23 - L24 were not covered by tests

return unsafe.String(unsafe.SliceData(p.Body), len(p.Body))
}
14 changes: 6 additions & 8 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,27 @@ import (
type Config struct {
// Debug, when set, makes the pool create a fresh worker before every request.
Debug bool

// Command overrides the server command with a custom one.
Command []string `mapstructure:"command"`

// AfterInitCommand overrides the server's AfterInitCommand.
AfterInitCommand []string `mapstructure:"after_init"`

// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper during a hot-swap. Defaults to the number of CPU cores.
NumWorkers uint64 `mapstructure:"num_workers"`

// MaxJobs defines how many executions are allowed for a worker until
// its destruction. Set it to 1 to create a new process for each new task, or to 0 to let
// the worker handle as many tasks as it can.
MaxJobs uint64 `mapstructure:"max_jobs"`

// AllocateTimeout defines how long the pool will wait for a worker to
// be freed to handle the task. Defaults to 60s.
AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`

// DestroyTimeout defines how long the pool should wait for a worker to
// be properly destroyed; if the timeout is reached, the worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`

// ResetTimeout defines how long the pool should wait before it starts killing workers.
ResetTimeout time.Duration `mapstructure:"reset_timeout"`

// StreamTimeout is the timeout for stream read operations; the static pool uses it
// as the deadline when cancelling a worker's stream. Defaults to 60s.
StreamTimeout time.Duration `mapstructure:"stream_timeout"`
// Supervisor is the supervision config used to limit worker and pool memory usage.
Supervisor *SupervisorConfig `mapstructure:"supervisor"`
}
Expand All @@ -50,6 +44,10 @@ func (cfg *Config) InitDefaults() {
cfg.AllocateTimeout = time.Minute
}

if cfg.StreamTimeout == 0 {
cfg.StreamTimeout = time.Minute
}

if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
Expand Down
11 changes: 5 additions & 6 deletions pool/static_pool/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"runtime"

"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/sdk/v4/events"
"github.com/roadrunner-server/sdk/v4/payload"
"go.uber.org/zap"
Expand All @@ -30,8 +31,8 @@
// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)

switch rsp.IsStream {
case true:
switch {
case rsp.Flags&frame.STREAM != 0:

Check warning on line 35 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L35

Added line #L35 was not covered by tests
// in case of stream we should not return worker back immediately
go func() {
// would be called on Goexit
Expand All @@ -55,7 +56,7 @@
select {
// we received stop signal
case <-stopCh:
err = w.StreamCancel()
err = w.StreamCancel(ctx)

Check warning on line 59 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L59

Added line #L59 was not covered by tests
if err != nil {
sp.log.Warn("stream cancel error", zap.Error(err))
}
Expand All @@ -76,14 +77,12 @@
}()

return resp, nil
case false:
default:
resp <- newPExec(rsp, nil)
// return worker back
sp.ww.Release(w)
// close the channel
close(resp)
return resp, nil
default:
panic("workers_pool unreachable!")
}
}
15 changes: 9 additions & 6 deletions pool/static_pool/workers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"sync/atomic"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/sdk/v4/events"
"github.com/roadrunner-server/sdk/v4/fsm"
"github.com/roadrunner-server/sdk/v4/payload"
Expand Down Expand Up @@ -219,8 +220,8 @@
// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)

switch rsp.IsStream {
case true:
switch {
case rsp.Flags&frame.STREAM != 0:

Check warning on line 224 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L224

Added line #L224 was not covered by tests
sp.log.Debug("stream mode", zap.Int64("pid", w.Pid()))
// in case of stream we should not return worker back immediately
go func() {
Expand All @@ -238,10 +239,14 @@
select {
// we received stop signal
case <-stopCh:
err = w.StreamCancel()
ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
err = w.StreamCancel(ctxT)

Check warning on line 243 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L242-L243

Added lines #L242 - L243 were not covered by tests
if err != nil {
sp.log.Warn("stream cancel error", zap.Error(err))
w.State().Transition(fsm.StateInvalid)

Check warning on line 246 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L246

Added line #L246 was not covered by tests
}

cancelT()

Check warning on line 249 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L249

Added line #L249 was not covered by tests
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
Expand All @@ -260,16 +265,14 @@
}()

return resp, nil
case false:
default:
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
resp <- newPExec(rsp, nil)
// return worker back
sp.ww.Release(w)
// close the channel
close(resp)
return resp, nil
default:
panic("workers_pool unreachable!")
}
}

Expand Down
4 changes: 2 additions & 2 deletions pool/static_pool/workers_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/fsm"
"github.com/roadrunner-server/sdk/v4/ipc/pipe"
"github.com/roadrunner-server/sdk/v4/payload"
"github.com/roadrunner-server/sdk/v4/pool"
"github.com/roadrunner-server/sdk/v4/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -950,7 +950,7 @@ func BenchmarkToStringUnsafe(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
res := utils.AsString(testPayload)
res := unsafe.String(unsafe.SliceData(testPayload), len(testPayload))
_ = res
}
}
Expand Down
38 changes: 0 additions & 38 deletions utils/convert.go

This file was deleted.

65 changes: 55 additions & 10 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,35 @@
return nil, false, err
}

return pld, pld.IsStream, nil
// PING, we should respond with PONG
if pld.Flags&frame.PING != 0 {
// get a frame
fr := w.getFrame()

fr.WriteVersion(fr.Header(), frame.Version1)

fr.SetPongBit(fr.Header())
fr.WriteCRC(fr.Header())

err := w.Relay().Send(fr)
w.State().RegisterExec()
if err != nil {
w.putFrame(fr)
w.State().Transition(fsm.StateErrored)
return nil, false, errors.E(errors.Network, err)
}

Check warning on line 240 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L225-L240

Added lines #L225 - L240 were not covered by tests

w.putFrame(fr)

Check warning on line 242 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L242

Added line #L242 was not covered by tests
}

// a non-zero result means the STREAM bit is set, i.e. the stream is still available
return pld, pld.Flags&frame.STREAM != 0, nil

Check warning on line 246 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L246

Added line #L246 was not covered by tests
}

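// StreamCancel sends the stop bit to the worker and then discards incoming frames until
// a frame without the STREAM bit arrives (the stream has ended) or ctx is done.
// ctx is checked before each frame is read.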
func (w *Process) StreamCancel() error {
func (w *Process) StreamCancel(ctx context.Context) error {

Check warning on line 250 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L250

Added line #L250 was not covered by tests
const op = errors.Op("sync_worker_send_frame")

Check warning on line 252 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L252

Added line #L252 was not covered by tests
// get a frame
fr := w.getFrame()

Expand All @@ -244,7 +267,27 @@
}

w.putFrame(fr)
return nil

for {
select {
case <-ctx.Done():
return errors.E(op, errors.TimeOut, ctx.Err())
default:
rsp, err := w.receiveFrame()
if err != nil {
return err
}

Check warning on line 279 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L270-L279

Added lines #L270 - L279 were not covered by tests

// stream has ended
if rsp.Flags&frame.STREAM == 0 {
w.State().Transition(fsm.StateReady)
return nil
}

Check warning on line 285 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L282-L285

Added lines #L282 - L285 were not covered by tests

// trash
rsp = nil

Check warning on line 288 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L288

Added line #L288 was not covered by tests
}
}
}

type wexec struct {
Expand Down Expand Up @@ -470,9 +513,9 @@
return nil, errors.E(op, errors.Network, errors.Str("nil frame received"))
}

flags := frameR.ReadFlags()
codec := frameR.ReadFlags()

if flags&frame.ERROR != byte(0) {
if codec&frame.ERROR != byte(0) {
// we need to copy the payload because we will put the frame back to the pool
cp := make([]byte, len(frameR.Payload()))
copy(cp, frameR.Payload())
Expand All @@ -497,12 +540,14 @@
return nil, errors.E(errors.Network, errors.Errorf("bad payload %s", cp))
}

isStream := frameR.IsStream(frameR.Header())
// stream + stop -> waste
// stream + ping -> response
flags := frameR.Header()[10]
pld := &payload.Payload{
IsStream: isStream,
Codec: flags,
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
Flags: flags,
Codec: codec,
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}

// by copying we free frame's payload slice
Expand Down
Loading