This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

fix: initial commit
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
rustatian committed Jan 4, 2024
1 parent 5417ebb commit 34adf20
Showing 4 changed files with 157 additions and 101 deletions.
Binary file removed .DS_Store
pool/static_pool/debug.go (10 changes: 5 additions & 5 deletions)
@@ -54,7 +54,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
             }()
 
             // send the initial frame
-            resp <- newPExec(rsp, nil)
+            sendResponse(resp, rsp, nil)
 
             // stream iterator
             for {
@@ -71,13 +71,13 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
                     cancelT()
                     runtime.Goexit()
                 default:
-                    pld, next, errI := w.StreamIter()
+                    pld, next, errI := w.StreamIter(ctx)
                     if errI != nil {
-                        resp <- newPExec(nil, errI) // exit from the goroutine
+                        sendResponse(resp, nil, errI)
                         runtime.Goexit()
                     }
 
-                    resp <- newPExec(pld, nil)
+                    sendResponse(resp, pld, nil)
                     if !next {
                         // we've got the last frame
                         runtime.Goexit()
@@ -88,7 +88,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
 
         return resp, nil
     default:
-        resp <- newPExec(rsp, nil)
+        sendResponse(resp, rsp, nil)
         // close the channel
        close(resp)
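
Two things change in this file: both channel sends now go through the new sendResponse helper (defined in workers_pool.go below), and StreamIter takes the request context, so a stalled producer can be abandoned instead of blocking the goroutine forever. Below is a minimal, self-contained sketch of the context-aware iterator pattern; the iter and frame types are hypothetical stand-ins, not the actual worker API.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // frame is a hypothetical stand-in for one streamed payload.
    type frame struct{ body string }

    // iter is a hypothetical stream source that emits a fixed number of frames.
    type iter struct{ left int }

    // StreamIter mirrors the new signature: honoring ctx lets the caller
    // abandon a hung or slow producer instead of waiting forever.
    func (i *iter) StreamIter(ctx context.Context) (*frame, bool, error) {
        select {
        case <-ctx.Done():
            return nil, false, ctx.Err()
        case <-time.After(10 * time.Millisecond): // simulated work per frame
        }
        i.left--
        return &frame{body: fmt.Sprintf("frame %d", i.left)}, i.left > 0, nil
    }

    func main() {
        it := &iter{left: 3}
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        for {
            pld, next, err := it.StreamIter(ctx)
            if err != nil {
                if errors.Is(err, context.DeadlineExceeded) {
                    fmt.Println("stream timed out")
                }
                return
            }
            fmt.Println(pld.body)
            if !next { // last frame received
                return
            }
        }
    }

With a deadline shorter than the per-frame work, the same loop exits with context.DeadlineExceeded instead of hanging.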
pool/static_pool/workers_pool.go (98 changes: 69 additions & 29 deletions)
@@ -184,9 +184,15 @@ begin:
         rsp, err = w.Exec(ctxT, p)
     case false:
         // no context here
+        // potential problem: if the worker is hung, we can't stop it
         rsp, err = w.Exec(context.Background(), p)
     }
 
+    if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+        sp.log.Debug("max requests reached", zap.Int64("pid", w.Pid()))
+        w.State().Transition(fsm.StateMaxJobsReached)
+    }
+
     if err != nil {
         if errors.Is(errors.Retry, err) {
             sp.ww.Release(w)
@@ -203,12 +209,11 @@
 
             return nil, err
         case errors.Is(errors.SoftJob, err):
-            sp.log.Warn("soft worker error, worker won't be restarted", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err))
-            // soft jobs errors are allowed, just put the worker back
-            if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
-                // mark old as invalid and stop
-                w.State().Transition(fsm.StateMaxJobsReached)
-            }
+            /*
+                in case of a soft job error, we should not kill the worker; this is just an error payload from the worker.
+            */
+            w.State().Transition(fsm.StateReady)
+            sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err))
             sp.ww.Release(w)
 
            return nil, err
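
The behavioral change in this hunk: a SoftJob error is treated as an error payload from an otherwise healthy worker, so the worker is transitioned back to ready and released, while retirement now happens only through the unconditional MaxJobs check hoisted into the first hunk. A rough, self-contained sketch of that post-exec policy; the state constants and worker struct are hypothetical simplifications, not the real fsm package.

    package main

    import "fmt"

    // state is a hypothetical reduction of the worker FSM used in the diff.
    type state int

    const (
        stateReady state = iota
        stateInvalid
        stateMaxJobsReached
    )

    type worker struct {
        execs   uint64
        current state
    }

    // afterExec applies the commit's policy: retire the worker once it hits
    // maxJobs; invalidate it on a hard error; a soft (payload-level) error
    // falls through to ready, so the worker is reused.
    func afterExec(w *worker, maxJobs uint64, hardErr bool) {
        w.execs++
        if maxJobs != 0 && w.execs >= maxJobs {
            w.current = stateMaxJobsReached // the worker watcher replaces it
            return
        }
        if hardErr {
            w.current = stateInvalid // restart the worker process
            return
        }
        w.current = stateReady // success or soft error: put it back in the pool
    }

    func main() {
        w := &worker{}
        afterExec(w, 10, false) // a soft error is not a hard error
        fmt.Println(w.current == stateReady) // true: the worker survives
    }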
@@ -228,14 +233,10 @@
         }
     }
 
-    if sp.cfg.MaxJobs != 0 {
-        if w.State().NumExecs() >= sp.cfg.MaxJobs {
-            w.State().Transition(fsm.StateMaxJobsReached)
-        }
-    }
-
     // create channel for the stream (only if there are no errors)
-    resp := make(chan *PExec, 1)
+    resp := make(chan *PExec, 100)
+    // send the initial frame
+    sendResponse(resp, rsp, nil)
 
     switch {
     case rsp.Flags&frame.STREAM != 0:
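
The stream channel grows from one slot to one hundred, and the initial frame is now pushed before the stream/req-resp split, so it reaches the consumer on either branch. A hypothetical consumer loop for the returned channel follows; pexec is a stand-in for the pool's *PExec type.

    package main

    import (
        "errors"
        "fmt"
    )

    // pexec is a hypothetical stand-in for the pool's *PExec stream frame.
    type pexec struct {
        payload string
        err     error
    }

    // consume drains a stream channel the way a caller of Exec would:
    // read until the producer closes the channel, stop early on an error frame.
    func consume(resp <-chan pexec) error {
        for pe := range resp {
            if pe.err != nil {
                return pe.err
            }
            fmt.Println("frame:", pe.payload)
        }
        return nil
    }

    func main() {
        resp := make(chan pexec, 100) // the capacity the pool now uses
        resp <- pexec{payload: "initial frame"}
        resp <- pexec{payload: "second frame"}
        resp <- pexec{err: errors.New("stream ended with an error")}
        close(resp)
        if err := consume(resp); err != nil {
            fmt.Println("error:", err)
        }
    }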
@@ -244,38 +245,69 @@
         go func() {
             // would be called on Goexit
             defer func() {
+                sp.log.Debug("release [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))
                 close(resp)
                 sp.ww.Release(w)
             }()
 
-            // send the initial frame
-            resp <- newPExec(rsp, nil)
-
             // stream iterator
             for {
                 select {
                 // we received stop signal
                 case <-stopCh:
+                    sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String()))
                     ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
                     err = w.StreamCancel(ctxT)
+                    cancelT()
                     if err != nil {
-                        w.State().Transition(fsm.StateErrored)
                         sp.log.Warn("stream cancel error", zap.Error(err))
+                        w.State().Transition(fsm.StateInvalid)
+                    } else {
+                        // successfully canceled
+                        w.State().Transition(fsm.StateReady)
+                        sp.log.Debug("transition to the ready state", zap.String("from", w.State().String()))
                     }
 
-                    cancelT()
                     runtime.Goexit()
                 default:
-                    pld, next, errI := w.StreamIter()
-                    if errI != nil {
-                        resp <- newPExec(nil, errI) // exit from the goroutine
-                        runtime.Goexit()
-                    }
-
-                    resp <- newPExec(pld, nil)
-                    if !next {
-                        // we've got the last frame
-                        runtime.Goexit()
-                    }
+                    // we have to set a stream timeout on every request
+                    switch sp.supervisedExec {
+                    case true:
+                        ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL)
+                        pld, next, errI := w.StreamIter(ctxT)
+                        cancelT()
+                        if errI != nil {
+                            sp.log.Warn("stream iter error", zap.Error(errI))
+                            // send the error response
+                            sendResponse(resp, nil, errI)
+                            // move the worker to the invalid state to be restarted
+                            w.State().Transition(fsm.StateInvalid)
+                            runtime.Goexit()
+                        }
+
+                        sendResponse(resp, pld, nil)
+                        if !next {
+                            w.State().Transition(fsm.StateReady)
+                            // we've got the last frame
+                            runtime.Goexit()
+                        }
+                    case false:
+                        pld, next, errI := w.StreamIter(context.Background())
+                        if errI != nil {
+                            sp.log.Warn("stream iter error", zap.Error(errI))
+                            // send the error response
+                            sendResponse(resp, nil, errI)
+                            // move the worker to the invalid state to be restarted
+                            w.State().Transition(fsm.StateInvalid)
+                            runtime.Goexit()
+                        }
+
+                        sendResponse(resp, pld, nil)
+                        if !next {
+                            w.State().Transition(fsm.StateReady)
+                            // we've got the last frame
+                            runtime.Goexit()
+                        }
+                    }
                 }
            }
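
In supervised mode the rewritten loop arms a fresh Supervisor.ExecTTL deadline for every frame, rather than one deadline for the whole stream, and cancels it the moment StreamIter returns so the timer is released early. A minimal sketch of that per-frame timeout; source and execTTL are illustrative names under the same assumptions as the earlier iterator example.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // source is a hypothetical frame producer with the StreamIter contract.
    type source struct{ left int }

    func (s *source) StreamIter(ctx context.Context) (string, bool, error) {
        select {
        case <-ctx.Done():
            return "", false, ctx.Err()
        case <-time.After(20 * time.Millisecond): // simulated per-frame work
        }
        s.left--
        return "frame", s.left > 0, nil
    }

    func main() {
        const execTTL = 100 * time.Millisecond
        src := &source{left: 5}

        for {
            // a fresh deadline per frame: a long stream survives as long as
            // each individual frame arrives within execTTL
            ctxT, cancelT := context.WithTimeout(context.Background(), execTTL)
            pld, next, err := src.StreamIter(ctxT)
            cancelT() // release the timer as soon as the call returns
            if err != nil {
                fmt.Println("frame timed out or failed:", err)
                return
            }
            fmt.Println(pld)
            if !next {
                return
            }
        }
    }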
@@ -284,7 +316,6 @@
 
         return resp, nil
     default:
         sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
-        resp <- newPExec(rsp, nil)
         // return worker back
         sp.ww.Release(w)
        // close the channel
@@ -293,6 +324,15 @@
     }
 }
 
+// sendResponse is a wrapper to send a response to the channel safely (it never blocks)
+func sendResponse(resp chan *PExec, pld *payload.Payload, err error) {
+    select {
+    case resp <- newPExec(pld, err):
+    default:
+    }
+}
+
 func (sp *Pool) QueueSize() uint64 {
     return atomic.LoadUint64(&sp.queue)
 }
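
The select with an empty default makes every send non-blocking: when the buffer is full because the consumer stopped reading or fell more than a hundred frames behind, the frame is dropped instead of wedging the producer goroutine forever. A self-contained demo of that trade-off with a deliberately tiny buffer; trySend is a hypothetical mirror of sendResponse that also reports the drop.

    package main

    import "fmt"

    // pexec is a hypothetical stand-in for *PExec.
    type pexec struct{ id int }

    // trySend mirrors the sendResponse pattern: never block, and report
    // whether the frame actually fit into the buffer.
    func trySend(ch chan pexec, p pexec) bool {
        select {
        case ch <- p:
            return true
        default:
            return false
        }
    }

    func main() {
        resp := make(chan pexec, 3) // tiny buffer to make the drop visible
        dropped := 0
        for i := 0; i < 5; i++ {
            if !trySend(resp, pexec{id: i}) {
                dropped++
            }
        }
        close(resp)
        for p := range resp {
            fmt.Println("delivered", p.id)
        }
        fmt.Println("dropped:", dropped) // 2 of 5 frames did not fit
    }

Dropping is the price of never blocking; the jump from a 1-slot to a 100-slot buffer in this commit makes that loss unlikely for a consumer that keeps reading.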
