Skip to content

Commit

Permalink
Add TaskQueueStats to DescribeTaskQueueEnhanced (#1553)
Browse files Browse the repository at this point in the history
* Add TaskQueueStats to DescribeTaskQueueEnhanced

* reformat

* Address comments

* Fix test

* Get rid of time.sleep in the test

* Attempt to fix test in CI

* Fix test failure and add Eager comments

* Convert BacklogIncreaseRate to a field

* improve tests

* improve tests

* skip test in 1.24
  • Loading branch information
ShahabT authored Aug 16, 2024
1 parent bb42a8b commit b5db2b7
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ jobs:
env:
# TODO(bergundy): Remove this flag once server 1.25.0 is out.
DISABLE_NEXUS_TESTS: "1"
# TODO(bergundy): Remove this flag too once server 1.25.0 is out. Thanks Roey! :)
DISABLE_BACKLOG_STATS_TESTS: "1"
working-directory: ./internal/cmd/build

cloud-test:
Expand Down
11 changes: 6 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ type (
TaskQueueReachability = internal.TaskQueueReachability

// DescribeTaskQueueEnhancedOptions is the input to [Client.DescribeTaskQueueEnhanced].
// WARNING: Worker versioning is currently experimental.
DescribeTaskQueueEnhancedOptions = internal.DescribeTaskQueueEnhancedOptions

// TaskQueueVersionSelection is a task queue filter based on versioning.
Expand All @@ -345,24 +344,26 @@ type (
TaskQueueVersionSelection = internal.TaskQueueVersionSelection

// TaskQueueDescription is the response to [Client.DescribeTaskQueueEnhanced].
// WARNING: Worker versioning is currently experimental.
TaskQueueDescription = internal.TaskQueueDescription

// TaskQueueVersionInfo includes task queue information per Build ID.
// It is part of [Client.TaskQueueDescription].
// WARNING: Worker versioning is currently experimental.
TaskQueueVersionInfo = internal.TaskQueueVersionInfo

// TaskQueueTypeInfo specifies task queue information per task type and Build ID.
// It is included in [Client.TaskQueueVersionInfo].
// WARNING: Worker versioning is currently experimental.
TaskQueueTypeInfo = internal.TaskQueueTypeInfo

// TaskQueuePollerInfo provides information about a worker/client polling a task queue.
// It is used by [Client.TaskQueueTypeInfo].
// WARNING: Worker versioning is currently experimental.
TaskQueuePollerInfo = internal.TaskQueuePollerInfo

// TaskQueueStats contains statistics about task queue backlog and activity.
//
// For workflow task queue type, this result is partial because tasks sent to sticky queues are not included. Read
// comments above each metric to understand the impact of sticky queue exclusion on that metric accuracy.
TaskQueueStats = internal.TaskQueueStats

// WorkerVersionCapabilities includes a worker's build identifier
// and whether it is choosing to use the versioning feature.
// It is an optional component of [Client.TaskQueuePollerInfo].
Expand Down
1 change: 1 addition & 0 deletions internal/internal_logging_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
tagWorkflowID = "WorkflowID"
tagWorkflowType = "WorkflowType"
tagWorkerID = "WorkerID"
tagBuildID = "BuildID"
tagWorkerType = "WorkerType"
tagSideEffectID = "SideEffectID"
tagChildWorkflowID = "ChildWorkflowID"
Expand Down
75 changes: 75 additions & 0 deletions internal/internal_versioning_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type (
// Include task reachability for the requested versions and all task types
// (task reachability is not reported per task type).
ReportTaskReachability bool
// Include task queue stats for requested task queue types and versions.
ReportStats bool
}

// WorkerVersionCapabilities includes a worker's build identifier
Expand All @@ -126,11 +128,68 @@ type (
WorkerVersionCapabilities *WorkerVersionCapabilities
}

// TaskQueueStats contains statistics about task queue backlog and activity.
//
// For workflow task queue type, this result is partial because tasks sent to sticky queues are not included. Read
// comments above each metric to understand the impact of sticky queue exclusion on that metric accuracy.
TaskQueueStats struct {
// The approximate number of tasks backlogged in this task queue. May count expired tasks but eventually
// converges to the right value. Can be relied upon for scaling decisions.
//
// Special note for workflow task queue type: this metric does not count sticky queue tasks. However, because
// those tasks only remain valid for a few seconds, the inaccuracy becomes less significant as the backlog size
// grows.
ApproximateBacklogCount int64
// Approximate age of the oldest task in the backlog based on the creation time of the task at the head of
// the queue. Can be relied upon for scaling decisions.
//
// Special note for workflow task queue type: this metric does not count sticky queue tasks. However, because
// those tasks only remain valid for a few seconds, they should not affect the result when backlog is older than
// few seconds.
ApproximateBacklogAge time.Duration
// Approximate *net* tasks per second added to the backlog, averaging the last 30 seconds. This is calculated as
// `TasksAddRate - TasksDispatchRate`.
// A positive value of `X` means the backlog is growing by about `X` tasks per second. A negative `-X` value means the
// backlog is shrinking by about `X` tasks per second.
//
// Special note for workflow task queue type: this metric does not count sticky queue tasks. However, because
// those tasks only remain valid for a few seconds, the inaccuracy becomes less significant as the backlog size
// or age grow.
BacklogIncreaseRate float32
// Approximate tasks per second added to the task queue, averaging the last 30 seconds. This includes both
// backlogged and sync-matched tasks, but excludes the Eagerly dispatched workflow and activity tasks (see
// documentation for `client.StartWorkflowOptions.EnableEagerStart` and `worker.Options.DisableEagerActivities`.)
//
// The difference between `TasksAddRate` and `TasksDispatchRate` is a reliable metric for the rate at which
// backlog grows/shrinks. See `BacklogIncreaseRate`.
//
// Special note for workflow task queue type: this metric does not count sticky queue tasks. Hence, the reported
// value may be significantly lower than the actual number of workflow tasks added. Note that typically, only
// the first workflow task of each workflow goes to a normal queue, and the rest workflow tasks go to the sticky
// queue associated with a specific worker instance. Activity tasks always go to normal queues so their reported
// rate is accurate.
TasksAddRate float32
// Approximate tasks per second dispatched to workers, averaging the last 30 seconds. This includes both
// backlogged and sync-matched tasks, but excludes the Eagerly dispatched workflow and activity tasks (see
// documentation for `client.StartWorkflowOptions.EnableEagerStart` and `worker.Options.DisableEagerActivities`.)
//
// The difference between `TasksAddRate` and `TasksDispatchRate` is a reliable metric for the rate at which
// backlog grows/shrinks. See `BacklogIncreaseRate`.
//
// Special note for workflow task queue type: this metric does not count sticky queue tasks. Hence, the reported
// value may be significantly lower than the actual number of workflow tasks dispatched. Note that typically, only
// the first workflow task of each workflow goes to a normal queue, and the rest workflow tasks go to the sticky
// queue associated with a specific worker instance. Activity tasks always go to normal queues so their reported
// rate is accurate.
TasksDispatchRate float32
}

// TaskQueueTypeInfo specifies task queue information per task type and Build ID.
// It is included in [TaskQueueVersionInfo].
TaskQueueTypeInfo struct {
// Poller details for this task queue category.
Pollers []TaskQueuePollerInfo
Stats *TaskQueueStats
}

// TaskQueueVersionInfo includes task queue information per Build ID.
Expand Down Expand Up @@ -174,6 +233,7 @@ func (o *DescribeTaskQueueEnhancedOptions) validateAndConvertToProto(namespace s
TaskQueueTypes: taskQueueTypes,
ReportPollers: o.ReportPollers,
ReportTaskReachability: o.ReportTaskReachability,
ReportStats: o.ReportStats,
}

return opt, nil
Expand Down Expand Up @@ -220,6 +280,21 @@ func taskQueueTypeInfoFromResponse(response *taskqueuepb.TaskQueueTypeInfo) Task

return TaskQueueTypeInfo{
Pollers: pollers,
Stats: statsFromResponse(response.Stats),
}
}

func statsFromResponse(stats *taskqueuepb.TaskQueueStats) *TaskQueueStats {
if stats == nil {
return nil
}

return &TaskQueueStats{
ApproximateBacklogCount: stats.GetApproximateBacklogCount(),
ApproximateBacklogAge: stats.GetApproximateBacklogAge().AsDuration(),
TasksAddRate: stats.TasksAddRate,
TasksDispatchRate: stats.TasksDispatchRate,
BacklogIncreaseRate: stats.TasksAddRate - stats.TasksDispatchRate,
}
}

Expand Down
6 changes: 6 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,12 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
tagTaskQueue, taskQueue,
tagWorkerID, workerParams.Identity,
)
if workerParams.WorkerBuildID != "" {
// Add worker build ID to the logs if it's set by user
workerParams.Logger = log.With(workerParams.Logger,
tagBuildID, workerParams.WorkerBuildID,
)
}

processTestTags(&options, &workerParams)

Expand Down
134 changes: 133 additions & 1 deletion test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ package test_test

import (
"context"
"math"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/internal"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
Expand All @@ -54,7 +57,7 @@ func TestWorkerVersioningTestSuite(t *testing.T) {
func (ts *WorkerVersioningTestSuite) SetupSuite() {
ts.Assertions = require.New(ts.T())
ts.workflows = &Workflows{}
ts.activities = &Activities{}
ts.activities = newActivities()
ts.NoError(ts.InitConfigAndNamespace())
ts.NoError(ts.InitClient())
}
Expand Down Expand Up @@ -800,6 +803,100 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersionsWithRules() {
ts.Equal(client.BuildIDTaskReachability(client.BuildIDTaskReachabilityReachable), taskQueueVersionInfo.TaskReachability)
}

func (ts *WorkerVersioningTestSuite) TestTaskQueueStats() {
if os.Getenv("DISABLE_BACKLOG_STATS_TESTS") != "" {
ts.T().SkipNow()
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

fetchAndValidateStats := func(expectedWorkflowStats *client.TaskQueueStats, expectedActivityStats *client.TaskQueueStats) {
taskQueueInfo, err := ts.client.DescribeTaskQueueEnhanced(ctx, client.DescribeTaskQueueEnhancedOptions{
TaskQueue: ts.taskQueueName,
TaskQueueTypes: []client.TaskQueueType{
client.TaskQueueTypeWorkflow,
client.TaskQueueTypeActivity,
},
ReportStats: true,
})
ts.NoError(err)
ts.Equal(1, len(taskQueueInfo.VersionsInfo))

ts.validateTaskQueueStats(expectedWorkflowStats, taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow].Stats)
ts.validateTaskQueueStats(expectedActivityStats, taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeActivity].Stats)
}

// Basic workflow runs two activities
handle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("basic-wf"), ts.workflows.Basic)
ts.NoError(err)

// Wait until the task goes to the TQ
ts.EventuallyWithT(
func(t *assert.CollectT) {
taskQueueInfo, err := ts.client.DescribeTaskQueueEnhanced(ctx, client.DescribeTaskQueueEnhancedOptions{
TaskQueue: ts.taskQueueName,
TaskQueueTypes: []client.TaskQueueType{
client.TaskQueueTypeWorkflow,
},
ReportStats: true,
})
ts.NoError(err)
ts.Equal(1, len(taskQueueInfo.VersionsInfo))
ts.NotNil(taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow])
ts.NotNil(taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow].Stats)
assert.Greater(t, taskQueueInfo.VersionsInfo[""].TypesInfo[client.TaskQueueTypeWorkflow].Stats.ApproximateBacklogCount, int64(0))
},
time.Second, 100*time.Millisecond,
)

// no workers yet, so only workflow should have a backlog
fetchAndValidateStats(
&client.TaskQueueStats{
ApproximateBacklogCount: 1,
ApproximateBacklogAge: time.Millisecond,
BacklogIncreaseRate: 1,
TasksAddRate: 1,
TasksDispatchRate: 0,
},
&client.TaskQueueStats{
ApproximateBacklogCount: 0,
ApproximateBacklogAge: 0,
BacklogIncreaseRate: 0,
TasksAddRate: 0,
TasksDispatchRate: 0,
},
)

// run the worker
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{DisableEagerActivities: true})
ts.workflows.register(worker1)
ts.activities.register(worker1)
ts.NoError(worker1.Start())
defer worker1.Stop()

// Wait for the wf to finish
ts.NoError(handle.Get(ctx, nil))

// backlogs should be empty but the rates should be non-zero
fetchAndValidateStats(
&client.TaskQueueStats{
ApproximateBacklogCount: 0,
ApproximateBacklogAge: 0,
BacklogIncreaseRate: 0,
TasksAddRate: 1,
TasksDispatchRate: 1,
},
&client.TaskQueueStats{
ApproximateBacklogCount: 0,
ApproximateBacklogAge: 0,
BacklogIncreaseRate: 0,
TasksAddRate: 1,
TasksDispatchRate: 1,
},
)
}

func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
// TODO: Unskip this test, it is flaky with server 1.25.0-rc.0
if os.Getenv("DISABLE_SERVER_1_25_TESTS") != "" {
Expand Down Expand Up @@ -984,3 +1081,38 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR
ts.NoError(enval.Get(&lastBuildID))
ts.Equal("1.1", lastBuildID)
}

// validateTaskQueueStats compares expected vs actual stats.
// For age and rates, it treats all non-zero values the same.
// For BacklogIncreaseRate for non-zero expected values we only compare the sign (i.e. backlog grows or shrinks), while
// zero expected value means "not specified".
func (ts *WorkerVersioningTestSuite) validateTaskQueueStats(expected *client.TaskQueueStats, actual *internal.TaskQueueStats) {
if expected == nil {
ts.Nil(actual)
return
}
ts.NotNil(actual)
ts.Equal(expected.ApproximateBacklogCount, actual.ApproximateBacklogCount)
if expected.ApproximateBacklogAge == 0 {
ts.Equal(time.Duration(0), actual.ApproximateBacklogAge)
} else {
ts.Greater(actual.ApproximateBacklogAge, time.Duration(0))
}
if expected.TasksAddRate == 0 {
// TODO: do not accept NaN once the server code is fixed: https://github.com/temporalio/temporal/pull/6404
ts.True(float32(0) == actual.TasksAddRate || math.IsNaN(float64(actual.TasksAddRate)))
} else {
ts.Greater(actual.TasksAddRate, float32(0))
}
if expected.TasksDispatchRate == 0 {
// TODO: do not accept NaN once the server code is fixed: https://github.com/temporalio/temporal/pull/6404
ts.True(float32(0) == actual.TasksDispatchRate || math.IsNaN(float64(actual.TasksDispatchRate)))
} else {
ts.Greater(actual.TasksDispatchRate, float32(0))
}
if expected.BacklogIncreaseRate > 0 {
ts.Greater(actual.BacklogIncreaseRate, float32(0))
} else if expected.BacklogIncreaseRate < 0 {
ts.Less(actual.BacklogIncreaseRate, float32(0))
}
}

0 comments on commit b5db2b7

Please sign in to comment.