Skip to content

Commit

Permalink
Introduce separate slot supplier for session activities (#1736)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Dec 2, 2024
1 parent 158b823 commit 2c6bc1d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 22 deletions.
11 changes: 7 additions & 4 deletions contrib/resourcetuner/resourcetuner.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ func NewResourceBasedTuner(opts ResourceBasedTunerOptions) (worker.WorkerTuner,
}
nexusSS := &ResourceBasedSlotSupplier{controller: controller,
options: defaultWorkflowResourceBasedSlotSupplierOptions()}
sessSS := &ResourceBasedSlotSupplier{controller: controller,
options: defaultActivityResourceBasedSlotSupplierOptions()}
compositeTuner, err := worker.NewCompositeTuner(worker.CompositeTunerOptions{
WorkflowSlotSupplier: wfSS,
ActivitySlotSupplier: actSS,
LocalActivitySlotSupplier: laSS,
NexusSlotSupplier: nexusSS,
WorkflowSlotSupplier: wfSS,
ActivitySlotSupplier: actSS,
LocalActivitySlotSupplier: laSS,
NexusSlotSupplier: nexusSS,
SessionActivitySlotSupplier: sessSS,
})
if err != nil {
return nil, err
Expand Down
11 changes: 5 additions & 6 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (ww *workflowWorker) Stop() {
ww.worker.Stop()
}

func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
if params.Identity == "" {
params.Identity = getWorkerIdentity(params.TaskQueue)
}
Expand All @@ -412,15 +412,14 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work
creationTaskqueue := getCreationTaskqueue(params.TaskQueue)
params.UserContext = context.WithValue(params.UserContext, sessionEnvironmentContextKey, sessionEnvironment)
params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue()
activityWorker := newActivityWorker(service, params, overrides, env, nil)
activityWorker := newActivityWorker(service, params,
&workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil)

params.MaxConcurrentActivityTaskQueuePollers = 1
params.TaskQueue = creationTaskqueue
if overrides == nil {
overrides = &workerOverrides{}
}
// Although we have session token bucket to limit session size across creation
// and recreation, we also limit it here for creation only
overrides := &workerOverrides{}
overrides.slotSupplier, _ = NewFixedSizeSlotSupplier(maxConcurrentSessionExecutionSize)
creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket())

Expand Down Expand Up @@ -1758,7 +1757,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke

var sessionWorker *sessionWorker
if options.EnableSessionWorker && !options.LocalActivityWorkerOnly {
sessionWorker = newSessionWorker(client.workflowService, workerParams, nil, registry, options.MaxConcurrentSessionExecutionSize)
sessionWorker = newSessionWorker(client.workflowService, workerParams, registry, options.MaxConcurrentSessionExecutionSize)
registry.RegisterActivityWithOptions(sessionCreationActivity, RegisterActivityOptions{
Name: sessionCreationActivityName,
})
Expand Down
38 changes: 26 additions & 12 deletions internal/tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type WorkerTuner interface {
GetLocalActivitySlotSupplier() SlotSupplier
// GetNexusSlotSupplier returns the SlotSupplier used for nexus tasks.
GetNexusSlotSupplier() SlotSupplier
// GetSessionActivitySlotSupplier returns the SlotSupplier used for activities within sessions.
GetSessionActivitySlotSupplier() SlotSupplier
}

// SlotPermit is a permit to use a slot.
Expand Down Expand Up @@ -150,10 +152,11 @@ type SlotSupplier interface {
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
type CompositeTuner struct {
workflowSlotSupplier SlotSupplier
activitySlotSupplier SlotSupplier
localActivitySlotSupplier SlotSupplier
nexusSlotSupplier SlotSupplier
workflowSlotSupplier SlotSupplier
activitySlotSupplier SlotSupplier
localActivitySlotSupplier SlotSupplier
nexusSlotSupplier SlotSupplier
sessionActivitySlotSupplier SlotSupplier
}

func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier {
Expand All @@ -168,6 +171,9 @@ func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier {
func (c *CompositeTuner) GetNexusSlotSupplier() SlotSupplier {
return c.nexusSlotSupplier
}
func (c *CompositeTuner) GetSessionActivitySlotSupplier() SlotSupplier {
return c.sessionActivitySlotSupplier
}

// CompositeTunerOptions are the options used by NewCompositeTuner.
type CompositeTunerOptions struct {
Expand All @@ -179,17 +185,20 @@ type CompositeTunerOptions struct {
LocalActivitySlotSupplier SlotSupplier
// NexusSlotSupplier is the SlotSupplier used for nexus tasks.
NexusSlotSupplier SlotSupplier
// SessionActivitySlotSupplier is the SlotSupplier used for activities within sessions.
SessionActivitySlotSupplier SlotSupplier
}

// NewCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers.
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
func NewCompositeTuner(options CompositeTunerOptions) (WorkerTuner, error) {
return &CompositeTuner{
workflowSlotSupplier: options.WorkflowSlotSupplier,
activitySlotSupplier: options.ActivitySlotSupplier,
localActivitySlotSupplier: options.LocalActivitySlotSupplier,
nexusSlotSupplier: options.NexusSlotSupplier,
workflowSlotSupplier: options.WorkflowSlotSupplier,
activitySlotSupplier: options.ActivitySlotSupplier,
localActivitySlotSupplier: options.LocalActivitySlotSupplier,
nexusSlotSupplier: options.NexusSlotSupplier,
sessionActivitySlotSupplier: options.SessionActivitySlotSupplier,
}, nil
}

Expand Down Expand Up @@ -235,11 +244,16 @@ func NewFixedSizeTuner(options FixedSizeTunerOptions) (WorkerTuner, error) {
if err != nil {
return nil, err
}
sessSS, err := NewFixedSizeSlotSupplier(options.NumActivitySlots)
if err != nil {
return nil, err
}
return &CompositeTuner{
workflowSlotSupplier: wfSS,
activitySlotSupplier: actSS,
localActivitySlotSupplier: laSS,
nexusSlotSupplier: nexusSS,
workflowSlotSupplier: wfSS,
activitySlotSupplier: actSS,
localActivitySlotSupplier: laSS,
nexusSlotSupplier: nexusSS,
sessionActivitySlotSupplier: sessSS,
}, nil
}

Expand Down
26 changes: 26 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (ts *IntegrationTestSuite) SetupTest() {
ts.NoError(err)
options.Tuner = tuner
}
if strings.Contains(ts.T().Name(), "SlotSuppliersWithSession") {
options.MaxConcurrentActivityExecutionSize = 1
// Apparently this is on by default in these tests anyway, but to be explicit
options.EnableSessionWorker = true
}

ts.worker = worker.New(ts.client, ts.taskQueueName, options)
ts.workerStopped = false
Expand Down Expand Up @@ -3269,6 +3274,27 @@ func (ts *IntegrationTestSuite) TestResourceBasedSlotSupplierManyActs() {
ts.assertMetricGaugeEventually(metrics.WorkerTaskSlotsUsed, wfWorkertags, 0)
}

func (ts *IntegrationTestSuite) TestSlotSuppliersWithSessionAndOneConcurrentMax() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

// Activities time out without the fix, since obtaining a slot takes too long
wfRuns := make([]client.WorkflowRun, 0)
for i := 0; i < 3; i++ {
opts := ts.startWorkflowOptions("slot-suppliers-with-session" + strconv.Itoa(i))
opts.WorkflowExecutionTimeout = 1 * time.Minute
run, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.Echo, "hi")
ts.NoError(err)
ts.NotNil(run)
ts.NoError(err)
wfRuns = append(wfRuns, run)
}

for _, run := range wfRuns {
ts.NoError(run.Get(ctx, nil))
}
}

func (ts *IntegrationTestSuite) TestTooFewParams() {
var res ParamsValue
// Only give first param
Expand Down

0 comments on commit 2c6bc1d

Please sign in to comment.