Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: workflow refactoring #307

Merged
merged 4 commits into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions aggregatedpool/fns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package aggregatedpool

import (
"context"
"sync"
"sync/atomic"

"github.com/google/uuid"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v3/payload"
"github.com/temporalio/roadrunner-temporal/v2/common"
"github.com/temporalio/roadrunner-temporal/v2/internal"
commonpb "go.temporal.io/api/common/v1"
tActivity "go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
)

type LocalActivityFn struct {
header *commonpb.Header
codec common.Codec
pool common.Pool
seqID uint64
sID func() uint64
}

func NewLocalActivityFn(header *commonpb.Header, codec common.Codec, pool common.Pool, seqID func() uint64) *LocalActivityFn {
return &LocalActivityFn{
header: header,
codec: codec,
pool: pool,
sID: seqID,
}
}

func (la *LocalActivityFn) execute(ctx context.Context, args *commonpb.Payloads) (*commonpb.Payloads, error) {
const op = errors.Op("activity_pool_execute_activity")

var info = tActivity.GetInfo(ctx)
info.TaskToken = []byte(uuid.NewString())
mh := tActivity.GetMetricsHandler(ctx)
// if the mh is not nil, record the RR metric
if mh != nil {
mh.Gauge(RrMetricName).Update(float64(la.pool.QueueSize()))
defer mh.Gauge(RrMetricName).Update(float64(la.pool.QueueSize()))
}

var msg = &internal.Message{
ID: atomic.AddUint64(&la.seqID, 1),
Command: internal.InvokeLocalActivity{
Name: info.ActivityType.Name,
Info: info,
},
Payloads: args,
Header: la.header,
}

pld := getPld()
defer putPld(pld)

err := la.codec.Encode(&internal.Context{TaskQueue: info.TaskQueue}, pld, msg)
if err != nil {
return nil, err
}

result, err := la.pool.Exec(ctx, pld)
if err != nil {
return nil, errors.E(op, err)
}

out := make([]*internal.Message, 0, 2)
err = la.codec.Decode(result, &out)
if err != nil {
return nil, err
}

if len(out) != 1 {
return nil, errors.E(op, errors.Str("invalid local activity worker response"))
}

retPld := out[0]
if retPld.Failure != nil {
if retPld.Failure.Message == doNotCompleteOnReturn {
return nil, tActivity.ErrResultPending
}

return nil, temporal.GetDefaultFailureConverter().FailureToError(retPld.Failure)
}

return retPld.Payloads, nil
}

var pldP = sync.Pool{ //nolint:gochecknoglobals
New: func() any {
return &payload.Payload{}
},
}

func getPld() *payload.Payload {
return pldP.Get().(*payload.Payload)
}

func putPld(pld *payload.Payload) {
pld.Codec = 0
pld.Context = nil
pld.Body = nil
pldP.Put(pld)
}
31 changes: 22 additions & 9 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,22 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {

switch command := msg.Command.(type) {
case *internal.ExecuteActivity:
wp.log.Debug("executing activity", zap.Uint64("ID", msg.ID))
params := command.ActivityParams(wp.env, msg.Payloads, msg.Header)
activityID := wp.env.ExecuteActivity(params, wp.createCallback(msg.ID))
activityID := wp.env.ExecuteActivity(params, wp.createCallback(msg.ID, "activity"))

wp.canceller.Register(msg.ID, func() error {
wp.log.Debug("registering activity canceller", zap.String("activityID", activityID.String()))
wp.env.RequestCancelActivity(activityID)
return nil
})

case *internal.ExecuteLocalActivity:
params := command.LocalActivityParams(wp.env, wp.execute, msg.Payloads, msg.Header)
wp.log.Debug("executing local activity", zap.Uint64("ID", msg.ID))
params := command.LocalActivityParams(wp.env, NewLocalActivityFn(msg.Header, wp.codec, wp.pool, wp.sID).execute, msg.Payloads, msg.Header)
activityID := wp.env.ExecuteLocalActivity(params, wp.createLocalActivityCallback(msg.ID))
wp.canceller.Register(msg.ID, func() error {
wp.log.Debug("registering local activity canceller", zap.String("activityID", activityID.String()))
wp.env.RequestCancelLocalActivity(activityID)
return nil
})
Expand All @@ -102,7 +106,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
params.WorkflowID = wp.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
}

wp.env.ExecuteChildWorkflow(params, wp.createCallback(msg.ID), func(r bindings.WorkflowExecution, e error) {
wp.env.ExecuteChildWorkflow(params, wp.createCallback(msg.ID, "ExecuteChildWorkflow"), func(r bindings.WorkflowExecution, e error) {
wp.ids.Push(msg.ID, r, e)
})

Expand All @@ -113,7 +117,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {

case *internal.GetChildWorkflowExecution:
wp.ids.Listen(command.ID, func(w bindings.WorkflowExecution, err error) {
cl := wp.createCallback(msg.ID)
cl := wp.createCallback(msg.ID, "GetChildWorkflow")

if err != nil {
cl(nil, err)
Expand All @@ -129,7 +133,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
})

case *internal.NewTimer:
timerID := wp.env.NewTimer(command.ToDuration(), wp.createCallback(msg.ID))
timerID := wp.env.NewTimer(command.ToDuration(), wp.createCallback(msg.ID, "NewTimer"))
wp.canceller.Register(msg.ID, func() error {
if timerID != nil {
wp.env.RequestCancelTimer(*timerID)
Expand Down Expand Up @@ -205,11 +209,11 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
nil,
msg.Header,
command.ChildWorkflowOnly,
wp.createCallback(msg.ID),
wp.createCallback(msg.ID, "SignalExternalWorkflow"),
)

case *internal.CancelExternalWorkflow:
wp.env.RequestCancelExternalWorkflow(command.Namespace, command.WorkflowID, command.RunID, wp.createCallback(msg.ID))
wp.env.RequestCancelExternalWorkflow(command.Namespace, command.WorkflowID, command.RunID, wp.createCallback(msg.ID, "CancelExternalWorkflow"))

case *internal.Cancel:
err := wp.canceller.Cancel(command.CommandIDs...)
Expand Down Expand Up @@ -238,52 +242,61 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {

func (wp *Workflow) createLocalActivityCallback(id uint64) bindings.LocalActivityResultHandler {
callback := func(lar *bindings.LocalActivityResultWrapper) {
wp.log.Debug("executing local activity callback", zap.Uint64("ID", id))
wp.canceller.Discard(id)

if lar.Err != nil {
wp.log.Error("local activity", zap.Error(lar.Err), zap.Int32("attempt", lar.Attempt), zap.Duration("backoff", lar.Backoff))
wp.log.Debug("error", zap.Error(lar.Err), zap.Int32("attempt", lar.Attempt), zap.Duration("backoff", lar.Backoff))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(lar.Err))
return
}

wp.log.Debug("pushing local activity response", zap.Uint64("ID", id))
wp.mq.PushResponse(id, lar.Result)
}

return func(lar *bindings.LocalActivityResultWrapper) {
// timer cancel callback can happen inside the loop
if atomic.LoadUint32(&wp.inLoop) == 1 {
wp.log.Debug("calling local activity callback IN LOOP", zap.Uint64("ID", id))
callback(lar)
return
}

wp.callbacks = append(wp.callbacks, func() error {
wp.log.Debug("appending local activity callback", zap.Uint64("ID", id))
callback(lar)
return nil
})
}
}

func (wp *Workflow) createCallback(id uint64) bindings.ResultHandler {
func (wp *Workflow) createCallback(id uint64, t string) bindings.ResultHandler {
callback := func(result *commonpb.Payloads, err error) {
wp.log.Debug("executing callback", zap.Uint64("ID", id), zap.String("type", t))
wp.canceller.Discard(id)

if err != nil {
wp.log.Debug("error", zap.Error(err), zap.String("type", t))
wp.mq.PushError(id, temporal.GetDefaultFailureConverter().ErrorToFailure(err))
return
}

wp.log.Debug("pushing response", zap.Uint64("ID", id), zap.String("type", t))
// fetch original payload
wp.mq.PushResponse(id, result)
}

return func(result *commonpb.Payloads, err error) {
// timer cancel callback can happen inside the loop
if atomic.LoadUint32(&wp.inLoop) == 1 {
wp.log.Debug("calling callback IN LOOP", zap.Uint64("ID", id))
callback(result, err)
return
}

wp.callbacks = append(wp.callbacks, func() error {
wp.log.Debug("appending callback", zap.Uint64("ID", id))
callback(result, err)
return nil
})
Expand Down
Loading