Skip to content

Commit

Permalink
more logs
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Dec 17, 2022
1 parent 4d62c94 commit bd7daf3
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 18 deletions.
9 changes: 5 additions & 4 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
func() (*commonpb.Payloads, error) {
return msg.Payloads, nil
},
wp.createContinuableCallback(msg.ID),
wp.createContinuableCallback(msg.ID, "SideEffect"),
)

case *internal.CompleteWorkflow:
Expand Down Expand Up @@ -303,22 +303,23 @@ func (wp *Workflow) createCallback(id uint64, t string) bindings.ResultHandler {
return func(result *commonpb.Payloads, err error) {
// timer cancel callback can happen inside the loop
if atomic.LoadUint32(&wp.inLoop) == 1 {
wp.log.Debug("calling callback IN LOOP", zap.Uint64("ID", id))
wp.log.Debug("calling callback IN LOOP", zap.Uint64("ID", id), zap.String("type", t))
callback(result, err)
return
}

wp.callbacks = append(wp.callbacks, func() error {
wp.log.Debug("appending callback", zap.Uint64("ID", id))
wp.log.Debug("appending callback", zap.Uint64("ID", id), zap.String("type", t))
callback(result, err)
return nil
})
}
}

// callback to be called inside the queue processing, adds new messages at the end of the queue
func (wp *Workflow) createContinuableCallback(id uint64) bindings.ResultHandler {
func (wp *Workflow) createContinuableCallback(id uint64, t string) bindings.ResultHandler {
callback := func(result *commonpb.Payloads, err error) {
wp.log.Debug("executing continuable callback", zap.Uint64("ID", id), zap.String("type", t))
wp.canceller.Discard(id)

if err != nil {
Expand Down
14 changes: 9 additions & 5 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ import (

type Callback func() error

// seqID is global sequence ID
var seqID uint64 //nolint:gochecknoglobals

func seq() uint64 {
return atomic.AddUint64(&seqID, 1)
}

type Workflow struct {
codec common.Codec
pool common.Pool
Expand All @@ -38,7 +45,6 @@ type Workflow struct {
mq *queue.MessageQueue
ids *registry.IDRegistry
seqID uint64
sID func() uint64
runID string
pipeline []*internal.Message
callbacks []Callback
Expand All @@ -49,10 +55,9 @@ type Workflow struct {
mh temporalClient.MetricsHandler
}

func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger, seqID func() uint64) *Workflow {
func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger) *Workflow {
return &Workflow{
log: log,
sID: seqID,
codec: codec,
pool: pool,
}
Expand All @@ -65,7 +70,6 @@ func (wp *Workflow) NewWorkflowDefinition() bindings.WorkflowDefinition {
pool: wp.pool,
codec: wp.codec,
log: wp.log,
sID: wp.sID,
}
}

Expand All @@ -81,7 +85,7 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
wp.canceller = new(canceller.Canceller)

// sequenceID shared for all pool workflows
wp.mq = queue.NewMessageQueue(wp.sID)
wp.mq = queue.NewMessageQueue(seq)
wp.ids = new(registry.IDRegistry)

env.RegisterCancelHandler(wp.handleCancel)
Expand Down
10 changes: 1 addition & 9 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

prom "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -85,7 +84,6 @@ type Plugin struct {
events chan events.Event
stopCh chan struct{}

seqID uint64
workers []worker.Worker
}

Expand Down Expand Up @@ -454,12 +452,6 @@ func (p *Plugin) RPC() any {
return &rpc{srv: p, client: p.client}
}

func (p *Plugin) SedID() uint64 {
p.log.Debug("sequenceID", zap.Uint64("before", atomic.LoadUint64(&p.seqID)))
defer p.log.Debug("sequenceID", zap.Uint64("after", atomic.LoadUint64(&p.seqID)+1))
return atomic.AddUint64(&p.seqID, 1)
}

func (p *Plugin) MetricsCollector() []prom.Collector {
// p - implements Exporter interface (workers)
// other - request duration and count
Expand Down Expand Up @@ -507,7 +499,7 @@ func (p *Plugin) initPool() error {
return err
}

p.rrWorkflowDef = aggregatedpool.NewWorkflowDefinition(p.codec, wp, p.log, p.SedID)
p.rrWorkflowDef = aggregatedpool.NewWorkflowDefinition(p.codec, wp, p.log)

// get worker information
wi, err := WorkerInfo(p.codec, wp, p.rrVersion)
Expand Down

0 comments on commit bd7daf3

Please sign in to comment.