Skip to content

Commit

Permalink
Add tags to metrics (#414)
Browse files Browse the repository at this point in the history
* Add tags to metrics

* fix typo

* refactor to use tagScope

* Avoid repeated creation of subscope

* Use thread safe map

* fix lint

* Create new TaggedScope

* change NewTaggedScope()
  • Loading branch information
vancexu authored Mar 16, 2018
1 parent 8e121e2 commit a31deb5
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 17 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type (
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

// CompleteActivity reports activity completed.
// activity Execute method can return acitivity.activity.ErrResultPending to
// activity Execute method can return activity.ErrResultPending to
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
// should be called when that activity is completed with the actual result and error. If err is nil, activity task
// completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise,
Expand Down
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
return &workflowClient{
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
domain: domain,
metricsScope: metricScope,
metricsScope: metrics.NewTaggedScope(metricScope),
identity: identity,
}
}
Expand Down
34 changes: 34 additions & 0 deletions internal/common/metrics/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package metrics

import (
"sync"
"time"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -69,6 +70,12 @@ type (
recorder durationRecorder
clock Clock
}

// TaggedScope provides metricScope with tags
TaggedScope struct {
tally.Scope
*sync.Map
}
)

// WrapScope wraps a scope and skip recording metrics when isReplay is true.
Expand Down Expand Up @@ -186,3 +193,30 @@ func (s *replayAwareScope) SubScope(name string) tally.Scope {
func (s *replayAwareScope) Capabilities() tally.Capabilities {
return s.scope.Capabilities()
}

// GetTaggedScope return a scope with tag
func (ts *TaggedScope) GetTaggedScope(tagName, tagValue string) tally.Scope {
if ts.Map == nil {
ts.Map = &sync.Map{}
}

key := tagName + ":" + tagValue // used to prevent collision of tagValue (map key) for different tagName
taggedScope, ok := ts.Load(key)
if !ok {
ts.Store(key, ts.Scope.Tagged(map[string]string{tagName: tagValue}))
taggedScope, _ = ts.Load(key)
}
if taggedScope == nil {
panic("metric scope cannot be tagged") // This should never happen
}

return taggedScope.(tally.Scope)
}

// NewTaggedScope create a new TaggedScope
func NewTaggedScope(scope tally.Scope) *TaggedScope {
if scope == nil {
scope = tally.NoopScope
}
return &TaggedScope{Scope: scope, Map: &sync.Map{}}
}
31 changes: 31 additions & 0 deletions internal/common/metrics/scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"sync"
)

func Test_Counter(t *testing.T) {
Expand Down Expand Up @@ -126,13 +127,43 @@ func Test_ScopeCoverage(t *testing.T) {
require.Equal(t, 1, len(reporter.counts))
}

func Test_TaggedScope(t *testing.T) {
taggedScope, closer, reporter := newTaggedMetricsScope()
scope := taggedScope.GetTaggedScope("tag1", "val1")
scope.Counter("test-name").Inc(3)
closer.Close()
require.Equal(t, 1, len(reporter.counts))
require.Equal(t, int64(3), reporter.counts[0].value)

m := &sync.Map{}
taggedScope, closer, reporter = newTaggedMetricsScope()
taggedScope.Map = m
scope = taggedScope.GetTaggedScope("tag2", "val1")
scope.Counter("test-name").Inc(2)
taggedScope, closer2, reporter2 := newTaggedMetricsScope()
taggedScope.Map = m
scope = taggedScope.GetTaggedScope("tag2", "val1")
scope.Counter("test-name").Inc(1)
closer2.Close()
require.Equal(t, 0, len(reporter2.counts))
closer.Close()
require.Equal(t, 1, len(reporter.counts))
require.Equal(t, int64(3), reporter.counts[0].value)
}

func newMetricsScope(isReplay *bool) (tally.Scope, io.Closer, *capturingStatsReporter) {
reporter := &capturingStatsReporter{}
opts := tally.ScopeOptions{Reporter: reporter}
scope, closer := tally.NewRootScope(opts, time.Second)
return WrapScope(isReplay, scope, &realClock{}), closer, reporter
}

func newTaggedMetricsScope() (*TaggedScope, io.Closer, *capturingStatsReporter) {
isReplay := false
scope, closer, reporter := newMetricsScope(&isReplay)
return &TaggedScope{Scope: scope}, closer, reporter
}

type realClock struct {
}

Expand Down
3 changes: 2 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func newWorkflowExecutionEventHandler(
).WithOptions(zap.WrapCore(wrapLogger(&context.isReplay, &context.enableLoggingInReplay)))

if scope != nil {
context.metricsScope = metrics.WrapScope(&context.isReplay, scope, context)
context.metricsScope = tagScope(metrics.WrapScope(&context.isReplay, scope, context),
tagWorkflowType, workflowInfo.WorkflowType.Name)
}

return &workflowExecutionEventHandlerImpl{context, nil}
Expand Down
7 changes: 4 additions & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type (
taskListName string
identity string
service workflowserviceclient.Interface
metricsScope tally.Scope
metricsScope *metrics.TaggedScope
logger *zap.Logger
userContext context.Context
hostEnv *hostEnvImpl
Expand Down Expand Up @@ -1089,7 +1089,7 @@ func newActivityTaskHandlerWithCustomProvider(
identity: params.Identity,
service: service,
logger: params.Logger,
metricsScope: params.MetricsScope,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
userContext: params.UserContext,
hostEnv: env,
activityProvider: activityProvider,
Expand Down Expand Up @@ -1247,7 +1247,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
ath.logger.Error("Activity panic.",
zap.String("PanicError", fmt.Sprintf("%v", p)),
zap.String("PanicStack", st))
ath.metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
scope := ath.metricsScope.GetTaggedScope(tagActivityType, t.ActivityType.GetName())
scope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
panicErr := newPanicError(p, st)
result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr), nil
}
Expand Down
14 changes: 11 additions & 3 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func newAggregatedWorker(
}

ensureRequiredParams(&workerParams)
workerParams.MetricsScope = tagScope(workerParams.MetricsScope, tagDomain, domain)
workerParams.MetricsScope = tagScope(workerParams.MetricsScope, tagDomain, domain, tagTaskList, taskList)
workerParams.Logger = workerParams.Logger.With(
zapcore.Field{Key: tagDomain, Type: zapcore.StringType, String: domain},
zapcore.Field{Key: tagTaskList, Type: zapcore.StringType, String: taskList},
Expand Down Expand Up @@ -1073,11 +1073,19 @@ func newAggregatedWorker(
}
}

func tagScope(metricsScope tally.Scope, tagName, tagValue string) tally.Scope {
// tagScope with one or multiple tags, like
// tagScope(scope, tag1, val1, tag2, val2)
func tagScope(metricsScope tally.Scope, keyValueinPairs ...string) tally.Scope {
if metricsScope == nil {
metricsScope = tally.NoopScope
}
tagsMap := map[string]string{tagName: tagValue}
if len(keyValueinPairs)%2 != 0 {
panic("tagScope key value are not in pairs")
}
tagsMap := map[string]string{}
for i := 0; i < len(keyValueinPairs); i += 2 {
tagsMap[keyValueinPairs[i]] = keyValueinPairs[i+1]
}
return metricsScope.Tagged(tagsMap)
}

Expand Down
7 changes: 4 additions & 3 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type (
workflowClient struct {
workflowService workflowserviceclient.Interface
domain string
metricsScope tally.Scope
metricsScope *metrics.TaggedScope
identity string
}

Expand Down Expand Up @@ -190,7 +190,8 @@ func (wc *workflowClient) StartWorkflow(
}

if wc.metricsScope != nil {
wc.metricsScope.Counter(metrics.WorkflowStartCounter).Inc(1)
scope := wc.metricsScope.GetTaggedScope(tagWorkflowType, workflowType.Name)
scope.Counter(metrics.WorkflowStartCounter).Inc(1)
}

executionInfo := &WorkflowExecution{
Expand Down Expand Up @@ -534,7 +535,7 @@ func (wc *workflowClient) QueryWorkflow(ctx context.Context, workflowID string,
// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// - tasklist name of tasklist
// - tasklistType type of tasklist, can be decition or activity
// - tasklistType type of tasklist, can be decision or activity
// The errors it can return:
// - BadRequestError
// - InternalServiceError
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/cadence/internal/common/metrics"
)

const (
Expand Down Expand Up @@ -190,7 +190,7 @@ func (s *workflowRunSuite) SetupTest() {
mockCtrl := gomock.NewController(s.T())
s.workflowServiceClient = workflowservicetest.NewMockClient(mockCtrl)

metricsScope := tally.NoopScope
metricsScope := metrics.NewTaggedScope(nil)
s.workflowClient = &workflowClient{
workflowService: s.workflowServiceClient,
domain: domain,
Expand Down
7 changes: 4 additions & 3 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/yarpc"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -113,7 +114,7 @@ type (
service workflowserviceclient.Interface
workerOptions WorkerOptions
logger *zap.Logger
metricsScope tally.Scope
metricsScope *metrics.TaggedScope
mockClock *clock.Mock
wallClock clock.Clock

Expand Down Expand Up @@ -173,7 +174,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme
taskListSpecificActivities: make(map[string]*taskListSpecificActivity),

logger: s.logger,
metricsScope: s.scope,
metricsScope: metrics.NewTaggedScope(nil),
mockClock: clock.NewMock(),
wallClock: clock.New(),
timers: make(map[string]*testTimerHandle),
Expand Down Expand Up @@ -212,7 +213,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme
env.logger = logger
}
if env.metricsScope == nil {
env.metricsScope = tally.NoopScope
env.metricsScope = metrics.NewTaggedScope(s.scope)
}

// setup mock service
Expand Down

0 comments on commit a31deb5

Please sign in to comment.