Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tags to metrics #414

Merged
merged 8 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type (
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

// CompleteActivity reports activity completed.
// activity Execute method can return acitivity.activity.ErrResultPending to
// activity Execute method can return activity.ErrResultPending to
// indicate the activity is not completed when its Execute method returns. In that case, this CompleteActivity() method
// should be called when that activity is completed with the actual result and error. If err is nil, activity task
// completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise,
Expand Down
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
return &workflowClient{
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
domain: domain,
metricsScope: metricScope,
metricsScope: metrics.NewTaggedScope(metricScope),
identity: identity,
}
}
Expand Down
34 changes: 34 additions & 0 deletions internal/common/metrics/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package metrics

import (
"sync"
"time"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -69,6 +70,12 @@ type (
recorder durationRecorder
clock Clock
}

// TaggedScope wraps a tally.Scope together with a sync.Map that
// GetTaggedScope uses to cache the tagged scopes it creates, keyed by
// "tagName:tagValue".
TaggedScope struct {
tally.Scope
*sync.Map
}
)

// WrapScope wraps a scope and skips recording metrics when isReplay is true.
Expand Down Expand Up @@ -186,3 +193,30 @@ func (s *replayAwareScope) SubScope(name string) tally.Scope {
func (s *replayAwareScope) Capabilities() tally.Capabilities {
return s.scope.Capabilities()
}

// GetTaggedScope returns the wrapped scope tagged with tagName=tagValue.
//
// Tagged scopes are cached in the embedded sync.Map under the key
// "tagName:tagValue", so repeated calls with the same pair return the same
// tally.Scope instead of tagging the wrapped scope again.
//
// It panics if the cached value is nil, which is not expected to happen.
func (ts *TaggedScope) GetTaggedScope(tagName, tagValue string) tally.Scope {
	// Support a TaggedScope created as a struct literal without a Map.
	// NOTE: this lazy initialisation is not itself synchronised; build the
	// value with NewTaggedScope when it is shared between goroutines.
	if ts.Map == nil {
		ts.Map = &sync.Map{}
	}

	// Prefixing with tagName keeps equal values of different tags apart.
	key := tagName + ":" + tagValue

	// Fast path: the scope was already created for this tag pair.
	taggedScope, ok := ts.Load(key)
	if !ok {
		// LoadOrStore keeps whichever value was stored first, so callers
		// that miss the cache concurrently all end up with the same scope
		// rather than overwriting each other's entry.
		taggedScope, _ = ts.LoadOrStore(key, ts.Scope.Tagged(map[string]string{tagName: tagValue}))
	}
	if taggedScope == nil {
		panic("metric scope cannot be tagged") // This should never happen
	}

	return taggedScope.(tally.Scope)
}

// NewTaggedScope builds a TaggedScope around scope with an empty tag cache.
// A nil scope is replaced by tally.NoopScope.
func NewTaggedScope(scope tally.Scope) *TaggedScope {
	wrapped := &TaggedScope{Scope: tally.NoopScope, Map: &sync.Map{}}
	if scope != nil {
		wrapped.Scope = scope
	}
	return wrapped
}
31 changes: 31 additions & 0 deletions internal/common/metrics/scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"sync"
)

func Test_Counter(t *testing.T) {
Expand Down Expand Up @@ -126,13 +127,43 @@ func Test_ScopeCoverage(t *testing.T) {
require.Equal(t, 1, len(reporter.counts))
}

// Test_TaggedScope checks that GetTaggedScope hands back a usable tagged
// scope and that tagged scopes are cached in the TaggedScope's sync.Map.
func Test_TaggedScope(t *testing.T) {
// Part 1: a counter incremented through a tagged scope shows up as a single
// reported count with the incremented value after the root scope is closed.
taggedScope, closer, reporter := newTaggedMetricsScope()
scope := taggedScope.GetTaggedScope("tag1", "val1")
scope.Counter("test-name").Inc(3)
closer.Close()
require.Equal(t, 1, len(reporter.counts))
require.Equal(t, int64(3), reporter.counts[0].value)

// Part 2: two TaggedScopes backed by different reporters share one map.
m := &sync.Map{}
taggedScope, closer, reporter = newTaggedMetricsScope()
taggedScope.Map = m
// The first lookup creates and caches the scope for "tag2:val1".
scope = taggedScope.GetTaggedScope("tag2", "val1")
scope.Counter("test-name").Inc(2)
taggedScope, closer2, reporter2 := newTaggedMetricsScope()
taggedScope.Map = m
// The second lookup uses the same key on the shared map, so it is expected
// to return the scope cached above rather than one tied to reporter2.
scope = taggedScope.GetTaggedScope("tag2", "val1")
scope.Counter("test-name").Inc(1)
closer2.Close()
// Nothing was recorded against the second reporter...
require.Equal(t, 0, len(reporter2.counts))
closer.Close()
// ...and both increments (2 + 1) landed on the first reporter's counter.
require.Equal(t, 1, len(reporter.counts))
require.Equal(t, int64(3), reporter.counts[0].value)
}

// newMetricsScope creates a tally root scope that reports into a fresh
// capturingStatsReporter and wraps it with WrapScope using the given isReplay
// flag. It returns the wrapped scope, the root scope's closer and the reporter.
func newMetricsScope(isReplay *bool) (tally.Scope, io.Closer, *capturingStatsReporter) {
	capture := &capturingStatsReporter{}
	root, rootCloser := tally.NewRootScope(tally.ScopeOptions{Reporter: capture}, time.Second)
	wrapped := WrapScope(isReplay, root, &realClock{})
	return wrapped, rootCloser, capture
}

// newTaggedMetricsScope returns a TaggedScope (with no Map set) over a
// non-replay metrics scope from newMetricsScope, plus that scope's closer and
// capturing reporter.
func newTaggedMetricsScope() (*TaggedScope, io.Closer, *capturingStatsReporter) {
	replaying := false
	base, baseCloser, capture := newMetricsScope(&replaying)
	tagged := &TaggedScope{Scope: base}
	return tagged, baseCloser, capture
}

type realClock struct {
}

Expand Down
3 changes: 2 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func newWorkflowExecutionEventHandler(
).WithOptions(zap.WrapCore(wrapLogger(&context.isReplay, &context.enableLoggingInReplay)))

if scope != nil {
context.metricsScope = metrics.WrapScope(&context.isReplay, scope, context)
context.metricsScope = tagScope(metrics.WrapScope(&context.isReplay, scope, context),
tagWorkflowType, workflowInfo.WorkflowType.Name)
}

return &workflowExecutionEventHandlerImpl{context, nil}
Expand Down
7 changes: 4 additions & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type (
taskListName string
identity string
service workflowserviceclient.Interface
metricsScope tally.Scope
metricsScope *metrics.TaggedScope
logger *zap.Logger
userContext context.Context
hostEnv *hostEnvImpl
Expand Down Expand Up @@ -1089,7 +1089,7 @@ func newActivityTaskHandlerWithCustomProvider(
identity: params.Identity,
service: service,
logger: params.Logger,
metricsScope: params.MetricsScope,
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
userContext: params.UserContext,
hostEnv: env,
activityProvider: activityProvider,
Expand Down Expand Up @@ -1247,7 +1247,8 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
ath.logger.Error("Activity panic.",
zap.String("PanicError", fmt.Sprintf("%v", p)),
zap.String("PanicStack", st))
ath.metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
scope := ath.metricsScope.GetTaggedScope(tagActivityType, t.ActivityType.GetName())
scope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
panicErr := newPanicError(p, st)
result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr), nil
}
Expand Down
14 changes: 11 additions & 3 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func newAggregatedWorker(
}

ensureRequiredParams(&workerParams)
workerParams.MetricsScope = tagScope(workerParams.MetricsScope, tagDomain, domain)
workerParams.MetricsScope = tagScope(workerParams.MetricsScope, tagDomain, domain, tagTaskList, taskList)
workerParams.Logger = workerParams.Logger.With(
zapcore.Field{Key: tagDomain, Type: zapcore.StringType, String: domain},
zapcore.Field{Key: tagTaskList, Type: zapcore.StringType, String: taskList},
Expand Down Expand Up @@ -1073,11 +1073,19 @@ func newAggregatedWorker(
}
}

func tagScope(metricsScope tally.Scope, tagName, tagValue string) tally.Scope {
// tagScope returns metricsScope tagged with one or more tags, passed as
// alternating key/value strings, like
// tagScope(scope, tag1, val1, tag2, val2)
// A nil metricsScope is replaced by tally.NoopScope. It panics when the
// number of key/value strings is odd.
func tagScope(metricsScope tally.Scope, keyValueinPairs ...string) tally.Scope {
if metricsScope == nil {
metricsScope = tally.NoopScope
}
// NOTE(review): the next line uses tagName/tagValue, which are not
// parameters of this signature; it looks like the pre-change single-tag
// line kept by the diff view — confirm against the merged file.
tagsMap := map[string]string{tagName: tagValue}
// Keys and values must come in pairs; an odd count is treated as a
// programming error.
if len(keyValueinPairs)%2 != 0 {
panic("tagScope key value are not in pairs")
}
tagsMap := map[string]string{}
// Even indexes hold tag names, the following odd index holds the value.
for i := 0; i < len(keyValueinPairs); i += 2 {
tagsMap[keyValueinPairs[i]] = keyValueinPairs[i+1]
}
return metricsScope.Tagged(tagsMap)
}

Expand Down
7 changes: 4 additions & 3 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type (
workflowClient struct {
workflowService workflowserviceclient.Interface
domain string
metricsScope tally.Scope
metricsScope *metrics.TaggedScope
identity string
}

Expand Down Expand Up @@ -190,7 +190,8 @@ func (wc *workflowClient) StartWorkflow(
}

if wc.metricsScope != nil {
wc.metricsScope.Counter(metrics.WorkflowStartCounter).Inc(1)
scope := wc.metricsScope.GetTaggedScope(tagWorkflowType, workflowType.Name)
scope.Counter(metrics.WorkflowStartCounter).Inc(1)
}

executionInfo := &WorkflowExecution{
Expand Down Expand Up @@ -534,7 +535,7 @@ func (wc *workflowClient) QueryWorkflow(ctx context.Context, workflowID string,
// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// - tasklist name of tasklist
// - tasklistType type of tasklist, can be decition or activity
// - tasklistType type of tasklist, can be decision or activity
// The errors it can return:
// - BadRequestError
// - InternalServiceError
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/cadence/internal/common/metrics"
)

const (
Expand Down Expand Up @@ -190,7 +190,7 @@ func (s *workflowRunSuite) SetupTest() {
mockCtrl := gomock.NewController(s.T())
s.workflowServiceClient = workflowservicetest.NewMockClient(mockCtrl)

metricsScope := tally.NoopScope
metricsScope := metrics.NewTaggedScope(nil)
s.workflowClient = &workflowClient{
workflowService: s.workflowServiceClient,
domain: domain,
Expand Down
7 changes: 4 additions & 3 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/yarpc"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -113,7 +114,7 @@ type (
service workflowserviceclient.Interface
workerOptions WorkerOptions
logger *zap.Logger
metricsScope tally.Scope
metricsScope *metrics.TaggedScope
mockClock *clock.Mock
wallClock clock.Clock

Expand Down Expand Up @@ -173,7 +174,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme
taskListSpecificActivities: make(map[string]*taskListSpecificActivity),

logger: s.logger,
metricsScope: s.scope,
metricsScope: metrics.NewTaggedScope(nil),
mockClock: clock.NewMock(),
wallClock: clock.New(),
timers: make(map[string]*testTimerHandle),
Expand Down Expand Up @@ -212,7 +213,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme
env.logger = logger
}
if env.metricsScope == nil {
env.metricsScope = tally.NoopScope
env.metricsScope = metrics.NewTaggedScope(s.scope)
}

// setup mock service
Expand Down