Skip to content

Commit

Permalink
Add metrics to persistence for visibility (#486)
Browse files Browse the repository at this point in the history
* Add metrics to persistence for visibility

* Add EntityNotExistsError
  • Loading branch information
vancexu authored Jan 3, 2018
1 parent 8a89e91 commit bb03ce5
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 26 deletions.
80 changes: 55 additions & 25 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ const (
PersistenceDeleteDomainScope
// PersistenceDeleteDomainByNameScope tracks DeleteDomainByName calls made by service to persistence layer
PersistenceDeleteDomainByNameScope
// PersistenceRecordWorkflowExecutionStartedScope tracks RecordWorkflowExecutionStarted calls made by service to persistence layer
PersistenceRecordWorkflowExecutionStartedScope
// PersistenceRecordWorkflowExecutionClosedScope tracks RecordWorkflowExecutionClosed calls made by service to persistence layer
PersistenceRecordWorkflowExecutionClosedScope
// PersistenceListOpenWorkflowExecutionsScope tracks ListOpenWorkflowExecutions calls made by service to persistence layer
PersistenceListOpenWorkflowExecutionsScope
// PersistenceListClosedWorkflowExecutionsScope tracks ListClosedWorkflowExecutions calls made by service to persistence layer
PersistenceListClosedWorkflowExecutionsScope
// PersistenceListOpenWorkflowExecutionsByTypeScope tracks ListOpenWorkflowExecutionsByType calls made by service to persistence layer
PersistenceListOpenWorkflowExecutionsByTypeScope
// PersistenceListClosedWorkflowExecutionsByTypeScope tracks ListClosedWorkflowExecutionsByType calls made by service to persistence layer
PersistenceListClosedWorkflowExecutionsByTypeScope
// PersistenceListOpenWorkflowExecutionsByWorkflowIDScope tracks ListOpenWorkflowExecutionsByWorkflowID calls made by service to persistence layer
PersistenceListOpenWorkflowExecutionsByWorkflowIDScope
// PersistenceListClosedWorkflowExecutionsByWorkflowIDScope tracks ListClosedWorkflowExecutionsByWorkflowID calls made by service to persistence layer
PersistenceListClosedWorkflowExecutionsByWorkflowIDScope
// PersistenceListClosedWorkflowExecutionsByStatusScope tracks ListClosedWorkflowExecutionsByStatus calls made by service to persistence layer
PersistenceListClosedWorkflowExecutionsByStatusScope
// PersistenceGetClosedWorkflowExecutionScope tracks GetClosedWorkflowExecution calls made by service to persistence layer
PersistenceGetClosedWorkflowExecutionScope
// HistoryClientStartWorkflowExecutionScope tracks RPC calls to history service
HistoryClientStartWorkflowExecutionScope
// HistoryClientRecordActivityTaskHeartbeatScope tracks RPC calls to history service
Expand Down Expand Up @@ -359,31 +379,41 @@ const (
var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
// common scope Names
Common: {
PersistenceCreateShardScope: {operation: "CreateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetShardScope: {operation: "GetShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateShardScope: {operation: "UpdateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateWorkflowExecutionScope: {operation: "CreateWorkflowExecution"},
PersistenceGetWorkflowExecutionScope: {operation: "GetWorkflowExecution"},
PersistenceUpdateWorkflowExecutionScope: {operation: "UpdateWorkflowExecution"},
PersistenceDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceCreateTaskScope: {operation: "CreateTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetTasksScope: {operation: "GetTasks", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteTaskScope: {operation: "CompleteTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceAppendHistoryEventsScope: {operation: "AppendHistoryEvents", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetWorkflowExecutionHistoryScope: {operation: "GetWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteWorkflowExecutionHistoryScope: {operation: "DeleteWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateDomainScope: {operation: "CreateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetDomainScope: {operation: "GetDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateDomainScope: {operation: "UpdateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainScope: {operation: "DeleteDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainByNameScope: {operation: "DeleteDomainByName", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateShardScope: {operation: "CreateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetShardScope: {operation: "GetShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateShardScope: {operation: "UpdateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateWorkflowExecutionScope: {operation: "CreateWorkflowExecution"},
PersistenceGetWorkflowExecutionScope: {operation: "GetWorkflowExecution"},
PersistenceUpdateWorkflowExecutionScope: {operation: "UpdateWorkflowExecution"},
PersistenceDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceCreateTaskScope: {operation: "CreateTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetTasksScope: {operation: "GetTasks", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteTaskScope: {operation: "CompleteTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceAppendHistoryEventsScope: {operation: "AppendHistoryEvents", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetWorkflowExecutionHistoryScope: {operation: "GetWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteWorkflowExecutionHistoryScope: {operation: "DeleteWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateDomainScope: {operation: "CreateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetDomainScope: {operation: "GetDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateDomainScope: {operation: "UpdateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainScope: {operation: "DeleteDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainByNameScope: {operation: "DeleteDomainByName", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceRecordWorkflowExecutionStartedScope: {operation: "RecordWorkflowExecutionStarted"},
PersistenceRecordWorkflowExecutionClosedScope: {operation: "RecordWorkflowExecutionClosed"},
PersistenceListOpenWorkflowExecutionsScope: {operation: "ListOpenWorkflowExecutions"},
PersistenceListClosedWorkflowExecutionsScope: {operation: "ListClosedWorkflowExecutions"},
PersistenceListOpenWorkflowExecutionsByTypeScope: {operation: "ListOpenWorkflowExecutionsByType"},
PersistenceListClosedWorkflowExecutionsByTypeScope: {operation: "ListClosedWorkflowExecutionsByType"},
PersistenceListOpenWorkflowExecutionsByWorkflowIDScope: {operation: "ListOpenWorkflowExecutionsByWorkflowID"},
PersistenceListClosedWorkflowExecutionsByWorkflowIDScope: {operation: "ListClosedWorkflowExecutionsByWorkflowID"},
PersistenceListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"},
PersistenceGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"},

HistoryClientStartWorkflowExecutionScope: {operation: "HistoryClientStartWorkflowExecution"},
HistoryClientRecordActivityTaskHeartbeatScope: {operation: "HistoryClientRecordActivityTaskHeartbeat"},
Expand Down
177 changes: 176 additions & 1 deletion common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,19 @@ type (
metricClient metrics.Client
persistence MetadataManager
}

visibilityPersistenceClient struct {
metricClient metrics.Client
persistence VisibilityManager
}
)

var _ ShardManager = (*shardPersistenceClient)(nil)
var _ ExecutionManager = (*workflowExecutionPersistenceClient)(nil)
var _ TaskManager = (*taskPersistenceClient)(nil)
var _ HistoryManager = (*historyPersistenceClient)(nil)
var _ MetadataManager = (*metadataPersistenceClient)(nil)
var _ VisibilityManager = (*visibilityPersistenceClient)(nil)

// NewShardPersistenceClient creates a client to manage shards
func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client) ShardManager {
Expand Down Expand Up @@ -90,14 +96,22 @@ func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metric
}
}

// NewMetadataPersistenceClient creates a HistoryManager client to manage workflow execution history
// NewMetadataPersistenceClient creates a MetadataManager client to manage metadata
func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client) MetadataManager {
return &metadataPersistenceClient{
persistence: persistence,
metricClient: metricClient,
}
}

// NewVisibilityPersistenceClient creates a client to manage visibility
func NewVisibilityPersistenceClient(persistence VisibilityManager, metricClient metrics.Client) VisibilityManager {
return &visibilityPersistenceClient{
persistence: persistence,
metricClient: metricClient,
}
}

func (p *shardPersistenceClient) CreateShard(request *CreateShardRequest) error {
p.metricClient.IncCounter(metrics.PersistenceCreateShardScope, metrics.PersistenceRequests)

Expand Down Expand Up @@ -541,3 +555,164 @@ func (p *metadataPersistenceClient) updateErrorMetric(scope int, err error) {
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
}
}

func (p *visibilityPersistenceClient) RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error {
p.metricClient.IncCounter(metrics.PersistenceRecordWorkflowExecutionStartedScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceRecordWorkflowExecutionStartedScope, metrics.PersistenceLatency)
err := p.persistence.RecordWorkflowExecutionStarted(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceRecordWorkflowExecutionStartedScope, err)
}

return err
}

func (p *visibilityPersistenceClient) RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error {
p.metricClient.IncCounter(metrics.PersistenceRecordWorkflowExecutionClosedScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceRecordWorkflowExecutionClosedScope, metrics.PersistenceLatency)
err := p.persistence.RecordWorkflowExecutionClosed(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceRecordWorkflowExecutionClosedScope, err)
}

return err
}

func (p *visibilityPersistenceClient) ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListOpenWorkflowExecutionsScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListOpenWorkflowExecutionsScope, metrics.PersistenceLatency)
response, err := p.persistence.ListOpenWorkflowExecutions(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListOpenWorkflowExecutionsScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListClosedWorkflowExecutionsScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListClosedWorkflowExecutionsScope, metrics.PersistenceLatency)
response, err := p.persistence.ListClosedWorkflowExecutions(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListClosedWorkflowExecutionsScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListOpenWorkflowExecutionsByTypeScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListOpenWorkflowExecutionsByTypeScope, metrics.PersistenceLatency)
response, err := p.persistence.ListOpenWorkflowExecutionsByType(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListOpenWorkflowExecutionsByTypeScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListClosedWorkflowExecutionsByTypeScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListClosedWorkflowExecutionsByTypeScope, metrics.PersistenceLatency)
response, err := p.persistence.ListClosedWorkflowExecutionsByType(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListClosedWorkflowExecutionsByTypeScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListOpenWorkflowExecutionsByWorkflowIDScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListOpenWorkflowExecutionsByWorkflowIDScope, metrics.PersistenceLatency)
response, err := p.persistence.ListOpenWorkflowExecutionsByWorkflowID(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListOpenWorkflowExecutionsByWorkflowIDScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListClosedWorkflowExecutionsByWorkflowIDScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListClosedWorkflowExecutionsByWorkflowIDScope, metrics.PersistenceLatency)
response, err := p.persistence.ListClosedWorkflowExecutionsByWorkflowID(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListClosedWorkflowExecutionsByWorkflowIDScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListClosedWorkflowExecutionsByStatusScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListClosedWorkflowExecutionsByStatusScope, metrics.PersistenceLatency)
response, err := p.persistence.ListClosedWorkflowExecutionsByStatus(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListClosedWorkflowExecutionsByStatusScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) GetClosedWorkflowExecution(request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetClosedWorkflowExecutionScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetClosedWorkflowExecutionScope, metrics.PersistenceLatency)
response, err := p.persistence.GetClosedWorkflowExecution(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetClosedWorkflowExecutionScope, err)
}

return response, err
}

func (p *visibilityPersistenceClient) updateErrorMetric(scope int, err error) {
switch err.(type) {
case *ConditionFailedError:
p.metricClient.IncCounter(scope, metrics.PersistenceErrConditionFailedCounter)
case *TimeoutError:
p.metricClient.IncCounter(scope, metrics.PersistenceErrTimeoutCounter)
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
case *workflow.EntityNotExistsError:
p.metricClient.IncCounter(scope, metrics.CadenceErrEntityNotExistsCounter)
case *workflow.ServiceBusyError:
p.metricClient.IncCounter(scope, metrics.PersistenceErrBusyCounter)
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
default:
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
}
}

func (p *visibilityPersistenceClient) Close() {
p.persistence.Close()
}
1 change: 1 addition & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (s *Service) Start() {
if err != nil {
log.Fatalf("failed to create visiblity manager: %v", err)
}
visibility = persistence.NewVisibilityPersistenceClient(visibility, base.GetMetricsClient())

history, err := persistence.NewCassandraHistoryPersistence(p.CassandraConfig.Hosts,
p.CassandraConfig.Port,
Expand Down
1 change: 1 addition & 0 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (s *Service) Start() {
if err != nil {
log.Fatalf("failed to create visiblity manager: %v", err)
}
visibility = persistence.NewVisibilityPersistenceClient(visibility, base.GetMetricsClient())

history, err := persistence.NewCassandraHistoryPersistence(p.CassandraConfig.Hosts,
p.CassandraConfig.Port,
Expand Down

0 comments on commit bb03ce5

Please sign in to comment.