Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer events when decision task is inflight #386

Merged
merged 3 commits into from
Oct 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/logging/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
HistoryEngineShutdown = 2003
PersistentStoreErrorEventID = 2010
HistorySerializationErrorEventID = 2020
HistoryDeserializationErrorEventID = 2021
DuplicateTaskEventID = 2030
MultipleCompletionDecisionsEventID = 2040
DuplicateTransferTaskEventID = 2050
Expand Down
8 changes: 8 additions & 0 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func LogHistorySerializationErrorEvent(logger bark.Logger, err error, msg string
}).Errorf("Error serializing workflow execution history. Msg: %v", msg)
}

// LogHistoryDeserializationErrorEvent is used to log errors deserializing execution history
func LogHistoryDeserializationErrorEvent(logger bark.Logger, err error, msg string) {
	// Tag the entry with the dedicated deserialization event ID so it can be
	// filtered/alerted on independently of the serialization error event.
	fields := bark.Fields{
		TagWorkflowEventID: HistoryDeserializationErrorEventID,
		TagWorkflowErr:     err,
	}
	logger.WithFields(fields).Errorf("Error deserializing workflow execution history. Msg: %v", msg)
}

// LogHistoryEngineStartingEvent is used to log history engine starting
func LogHistoryEngineStartingEvent(logger bark.Logger) {
logger.WithFields(bark.Fields{
Expand Down
31 changes: 16 additions & 15 deletions common/metrics/tally/statsd/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@ package statsd

import (
"bytes"
"sort"
"time"

"github.com/cactus/go-statsd-client/statsd"
"github.com/uber-go/tally"
tallystatsdreporter "github.com/uber-go/tally/statsd"
"sort"
"time"
)

// cadenceTallyStatsdReporter is a wrapper on top of "github.com/uber-go/tally/statsd"
// that supports tags by encoding them into the metric name before delegating.
type cadenceTallyStatsdReporter struct {
	// tallystatsd is the underlying tally statsd reporter being wrapped.
	tallystatsd tally.StatsReporter
}

func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, tags map[string]string) string {
func (r *cadenceTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string {
var keys []string
for k := range tags {
keys = append(keys, k)
}
sort.Strings(keys)

var buffer bytes.Buffer
buffer.WriteString(original_name)
buffer.WriteString(originalName)

for _, tk := range keys {
// adding "." as delimiter so that it will show as different parts in Graphite/Grafana
Expand All @@ -52,7 +53,7 @@ func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, ta
return buffer.String()
}

// This is a wrapper on top of "github.com/uber-go/tally/statsd"
// NewReporter is a wrapper on top of "github.com/uber-go/tally/statsd"
// The purpose is to support tagging
// The implementation is to append tags as metric name suffixes
func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter {
Expand All @@ -62,18 +63,18 @@ func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.
}

// ReportCounter reports a counter value. Tags are folded into the metric name
// (statsd has no native tag support) and an empty tag map is passed downstream.
func (r *cadenceTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) {
	newName := r.metricNameWithTags(name, tags)
	r.tallystatsd.ReportCounter(newName, map[string]string{}, value)
}

// ReportGauge reports a gauge value. Tags are folded into the metric name
// (statsd has no native tag support) and an empty tag map is passed downstream.
func (r *cadenceTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) {
	newName := r.metricNameWithTags(name, tags)
	r.tallystatsd.ReportGauge(newName, map[string]string{}, value)
}

// ReportTimer reports a timer interval. Tags are folded into the metric name
// (statsd has no native tag support) and an empty tag map is passed downstream.
func (r *cadenceTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) {
	newName := r.metricNameWithTags(name, tags)
	r.tallystatsd.ReportTimer(newName, map[string]string{}, interval)
}

func (r *cadenceTallyStatsdReporter) ReportHistogramValueSamples(
Expand All @@ -84,8 +85,8 @@ func (r *cadenceTallyStatsdReporter) ReportHistogramValueSamples(
bucketUpperBound float64,
samples int64,
) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramValueSamples(new_name, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramValueSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
}

func (r *cadenceTallyStatsdReporter) ReportHistogramDurationSamples(
Expand All @@ -96,8 +97,8 @@ func (r *cadenceTallyStatsdReporter) ReportHistogramDurationSamples(
bucketUpperBound time.Duration,
samples int64,
) {
new_name := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramDurationSamples(new_name, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
newName := r.metricNameWithTags(name, tags)
r.tallystatsd.ReportHistogramDurationSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples)
}

func (r *cadenceTallyStatsdReporter) Capabilities() tally.Capabilities {
Expand Down
82 changes: 81 additions & 1 deletion common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ const (
`and task_id = ? ` +
`IF range_id = ?`

templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map ` +
templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map, buffered_events_list ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -343,6 +343,28 @@ const (
`and task_id = ? ` +
`IF next_event_id = ?`

// templateAppendBufferedEventsQuery appends one serialized event batch to the
// buffered_events_list column; the IF clause makes it a lightweight transaction
// conditioned on next_event_id (optimistic concurrency check).
templateAppendBufferedEventsQuery = `UPDATE executions ` +
	`SET buffered_events_list = buffered_events_list + ? ` +
	`WHERE shard_id = ? ` +
	`and type = ? ` +
	`and domain_id = ? ` +
	`and workflow_id = ? ` +
	`and run_id = ? ` +
	`and visibility_ts = ? ` +
	`and task_id = ? ` +
	`IF next_event_id = ?`

// templateDeleteBufferedEventsQuery resets buffered_events_list to an empty
// list; the IF clause makes it a lightweight transaction conditioned on
// next_event_id (optimistic concurrency check).
templateDeleteBufferedEventsQuery = `UPDATE executions ` +
	`SET buffered_events_list = [] ` +
	`WHERE shard_id = ? ` +
	`and type = ? ` +
	`and domain_id = ? ` +
	`and workflow_id = ? ` +
	`and run_id = ? ` +
	`and visibility_ts = ? ` +
	`and task_id = ? ` +
	`IF next_event_id = ?`

templateDeleteActivityInfoQuery = `DELETE activity_map[ ? ] ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -942,6 +964,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
}
state.RequestCancelInfos = requestCancelInfos

eList := result["buffered_events_list"].([]map[string]interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you refactor this logic in a helper method. For example look at 'createTimerTaskInfo'. We create a helper method to deserialize each of these custom types.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic is already factored into a helper function, createSerializedHistoryEventBatch().

bufferedEvents := make([]*SerializedHistoryEventBatch, 0, len(eList))
for _, v := range eList {
eventBatch := createSerializedHistoryEventBatch(v)
bufferedEvents = append(bufferedEvents, eventBatch)
}
state.BufferedEvents = bufferedEvents

return &GetWorkflowExecutionResponse{State: state}, nil
}

Expand Down Expand Up @@ -1005,6 +1035,9 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
d.updateRequestCancelInfos(batch, request.UpsertRequestCancelInfos, request.DeleteRequestCancelInfo,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

d.updateBufferedEvents(batch, request.NewBufferedEvents, request.ClearBufferedEvents,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

if request.ContinueAsNew != nil {
startReq := request.ContinueAsNew
d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp)
Expand Down Expand Up @@ -1837,6 +1870,38 @@ func (d *cassandraPersistence) updateRequestCancelInfos(batch *gocql.Batch, requ
}
}

// updateBufferedEvents adds the buffered-events mutation for this execution to
// the batch: either clearing the whole buffered_events_list or appending one
// serialized batch. Both statements are guarded by an `IF next_event_id = ?`
// condition using the supplied condition value. If clearBufferedEvents is
// false and newBufferedEvents is nil, no statement is added.
// NOTE(review): rangeID is currently unused here — presumably kept for
// signature symmetry with the sibling update* helpers; confirm before removal.
func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBufferedEvents *SerializedHistoryEventBatch,
	clearBufferedEvents bool, domainID, workflowID, runID string, condition int64, rangeID int64) {

	if clearBufferedEvents {
		batch.Query(templateDeleteBufferedEventsQuery,
			d.shardID,
			rowTypeExecution,
			domainID,
			workflowID,
			runID,
			defaultVisibilityTimestamp,
			rowTypeExecutionTaskID,
			condition)
		return
	}

	if newBufferedEvents == nil {
		return
	}

	// The list-append parameter must be a single-element list of column maps.
	newEventValues := []map[string]interface{}{{
		"encoding_type": newBufferedEvents.EncodingType,
		"version":       newBufferedEvents.Version,
		"data":          newBufferedEvents.Data,
	}}
	batch.Query(templateAppendBufferedEventsQuery,
		newEventValues,
		d.shardID,
		rowTypeExecution,
		domainID,
		workflowID,
		runID,
		defaultVisibilityTimestamp,
		rowTypeExecutionTaskID,
		condition)
}

func createShardInfo(result map[string]interface{}) *ShardInfo {
info := &ShardInfo{}
for k, v := range result {
Expand Down Expand Up @@ -2048,6 +2113,21 @@ func createRequestCancelInfo(result map[string]interface{}) *RequestCancelInfo {
return info
}

func createSerializedHistoryEventBatch(result map[string]interface{}) *SerializedHistoryEventBatch {
// TODO: default to JSON, update this when we support different encoding types.
eventBatch := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON}
for k, v := range result {
switch k {
case "version":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with the default but why are you not reading the encoding_type?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the value is already set to JSON when create the struct in this method, so there is no need to read the encoding_type, unless we support other encoding type.

eventBatch.Version = v.(int)
case "data":
eventBatch.Data = v.([]byte)
}
}

return eventBatch
}

func createTaskInfo(result map[string]interface{}) *TaskInfo {
info := &TaskInfo{}
for k, v := range result {
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type (
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
ExecutionInfo *WorkflowExecutionInfo
BufferedEvents []*SerializedHistoryEventBatch
}

// ActivityInfo details.
Expand Down Expand Up @@ -409,6 +410,8 @@ type (
DeleteChildExecutionInfo *int64
UpsertRequestCancelInfos []*RequestCancelInfo
DeleteRequestCancelInfo *int64
NewBufferedEvents *SerializedHistoryEventBatch
ClearBufferedEvents bool
}

// DeleteWorkflowExecutionRequest is used to delete a workflow execution
Expand Down
5 changes: 1 addition & 4 deletions common/service/config/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ func (rpConfig *Ringpop) validate() error {
if len(rpConfig.Name) == 0 {
return fmt.Errorf("ringpop config missing `name` param")
}
if err := validateBootstrapMode(rpConfig); err != nil {
return err
}
return nil
return validateBootstrapMode(rpConfig)
}

// UnmarshalYAML is called by the yaml package to convert
Expand Down
Loading