Skip to content

Commit

Permalink
buffer events when decision task is inflight (#386)
Browse files Browse the repository at this point in the history
* buffer events when decision task is inflight

* update based on review comments
  • Loading branch information
yiminc authored Oct 26, 2017
1 parent 1fb0a3d commit d79841b
Show file tree
Hide file tree
Showing 18 changed files with 429 additions and 70 deletions.
1 change: 1 addition & 0 deletions common/logging/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
HistoryEngineShutdown = 2003
PersistentStoreErrorEventID = 2010
HistorySerializationErrorEventID = 2020
HistoryDeserializationErrorEventID = 2021
DuplicateTaskEventID = 2030
MultipleCompletionDecisionsEventID = 2040
DuplicateTransferTaskEventID = 2050
Expand Down
8 changes: 8 additions & 0 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func LogHistorySerializationErrorEvent(logger bark.Logger, err error, msg string
}).Errorf("Error serializing workflow execution history. Msg: %v", msg)
}

// LogHistoryDeserializationErrorEvent is used to log errors deserializing execution history
func LogHistoryDeserializationErrorEvent(logger bark.Logger, err error, msg string) {
logger.WithFields(bark.Fields{
TagWorkflowEventID: HistoryDeserializationErrorEventID,
TagWorkflowErr: err,
}).Errorf("Error deserializing workflow execution history. Msg: %v", msg)
}

// LogHistoryEngineStartingEvent is used to log history engine starting
func LogHistoryEngineStartingEvent(logger bark.Logger) {
logger.WithFields(bark.Fields{
Expand Down
31 changes: 16 additions & 15 deletions common/metrics/tally/statsd/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@ package statsd

import (
"bytes"
"sort"
"time"

"github.com/cactus/go-statsd-client/statsd"
"github.com/uber-go/tally"
tallystatsdreporter "github.com/uber-go/tally/statsd"
"sort"
"time"
)

type cadenceTallyStatsdReporter struct {
//Wrapper on top of "github.com/uber-go/tally/statsd"
tallystatsd tally.StatsReporter
}

func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, tags map[string]string) string {
func (r *cadenceTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string {
var keys []string
for k := range tags {
keys = append(keys, k)
}
sort.Strings(keys)

var buffer bytes.Buffer
buffer.WriteString(original_name)
buffer.WriteString(originalName)

for _, tk := range keys {
// adding "." as delimiter so that it will show as different parts in Graphite/Grafana
Expand All @@ -52,7 +53,7 @@ func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, ta
return buffer.String()
}

// This is a wrapper on top of "github.com/uber-go/tally/statsd"
// NewReporter is a wrapper on top of "github.com/uber-go/tally/statsd"
// The purpose is to support tagging
// The implementation is to append tags as metric name suffixes
func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter {
Expand All @@ -62,18 +63,18 @@ func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.
}

func (r *cadenceTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportCounter(new_name, map[string]string{}, value)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportCounter(newName, map[string]string{}, value)
}

func (r *cadenceTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportGauge(new_name, map[string]string{}, value)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportGauge(newName, map[string]string{}, value)
}

func (r *cadenceTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportTimer(new_name, map[string]string{}, interval)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportTimer(newName, map[string]string{}, interval)
}

func (r *cadenceTallyStatsdReporter) ReportHistogramValueSamples(
Expand All @@ -84,8 +85,8 @@ func (r *cadenceTallyStatsdReporter) ReportHistogramValueSamples(
bucketUpperBound float64,
samples int64,
) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramValueSamples(new_name, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramValueSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
}

func (r *cadenceTallyStatsdReporter) ReportHistogramDurationSamples(
Expand All @@ -96,8 +97,8 @@ func (r *cadenceTallyStatsdReporter) ReportHistogramDurationSamples(
bucketUpperBound time.Duration,
samples int64,
) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramDurationSamples(new_name, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramDurationSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
}

func (r *cadenceTallyStatsdReporter) Capabilities() tally.Capabilities {
Expand Down
82 changes: 81 additions & 1 deletion common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ const (
`and task_id = ? ` +
`IF range_id = ?`

templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map ` +
templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map, buffered_events_list ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -343,6 +343,28 @@ const (
`and task_id = ? ` +
`IF next_event_id = ?`

templateAppendBufferedEventsQuery = `UPDATE executions ` +
`SET buffered_events_list = buffered_events_list + ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteBufferedEventsQuery = `UPDATE executions ` +
`SET buffered_events_list = [] ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteActivityInfoQuery = `DELETE activity_map[ ? ] ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -942,6 +964,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
}
state.RequestCancelInfos = requestCancelInfos

eList := result["buffered_events_list"].([]map[string]interface{})
bufferedEvents := make([]*SerializedHistoryEventBatch, 0, len(eList))
for _, v := range eList {
eventBatch := createSerializedHistoryEventBatch(v)
bufferedEvents = append(bufferedEvents, eventBatch)
}
state.BufferedEvents = bufferedEvents

return &GetWorkflowExecutionResponse{State: state}, nil
}

Expand Down Expand Up @@ -1005,6 +1035,9 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
d.updateRequestCancelInfos(batch, request.UpsertRequestCancelInfos, request.DeleteRequestCancelInfo,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

d.updateBufferedEvents(batch, request.NewBufferedEvents, request.ClearBufferedEvents,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

if request.ContinueAsNew != nil {
startReq := request.ContinueAsNew
d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp)
Expand Down Expand Up @@ -1837,6 +1870,38 @@ func (d *cassandraPersistence) updateRequestCancelInfos(batch *gocql.Batch, requ
}
}

func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBufferedEvents *SerializedHistoryEventBatch,
clearBufferedEvents bool, domainID, workflowID, runID string, condition int64, rangeID int64) {

if clearBufferedEvents {
batch.Query(templateDeleteBufferedEventsQuery,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
} else if newBufferedEvents != nil {
values := make(map[string]interface{})
values["encoding_type"] = newBufferedEvents.EncodingType
values["version"] = newBufferedEvents.Version
values["data"] = newBufferedEvents.Data
newEventValues := []map[string]interface{}{values}
batch.Query(templateAppendBufferedEventsQuery,
newEventValues,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
}
}

func createShardInfo(result map[string]interface{}) *ShardInfo {
info := &ShardInfo{}
for k, v := range result {
Expand Down Expand Up @@ -2048,6 +2113,21 @@ func createRequestCancelInfo(result map[string]interface{}) *RequestCancelInfo {
return info
}

func createSerializedHistoryEventBatch(result map[string]interface{}) *SerializedHistoryEventBatch {
// TODO: default to JSON, update this when we support different encoding types.
eventBatch := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON}
for k, v := range result {
switch k {
case "version":
eventBatch.Version = v.(int)
case "data":
eventBatch.Data = v.([]byte)
}
}

return eventBatch
}

func createTaskInfo(result map[string]interface{}) *TaskInfo {
info := &TaskInfo{}
for k, v := range result {
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type (
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
ExecutionInfo *WorkflowExecutionInfo
BufferedEvents []*SerializedHistoryEventBatch
}

// ActivityInfo details.
Expand Down Expand Up @@ -409,6 +410,8 @@ type (
DeleteChildExecutionInfo *int64
UpsertRequestCancelInfos []*RequestCancelInfo
DeleteRequestCancelInfo *int64
NewBufferedEvents *SerializedHistoryEventBatch
ClearBufferedEvents bool
}

// DeleteWorkflowExecutionRequest is used to delete a workflow execution
Expand Down
5 changes: 1 addition & 4 deletions common/service/config/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ func (rpConfig *Ringpop) validate() error {
if len(rpConfig.Name) == 0 {
return fmt.Errorf("ringpop config missing `name` param")
}
if err := validateBootstrapMode(rpConfig); err != nil {
return err
}
return nil
return validateBootstrapMode(rpConfig)
}

// UnmarshalYAML is called by the yaml package to convert
Expand Down
Loading

0 comments on commit d79841b

Please sign in to comment.