diff --git a/common/logging/events.go b/common/logging/events.go index dd6f8660c2a..50ab2c5a00f 100644 --- a/common/logging/events.go +++ b/common/logging/events.go @@ -34,6 +34,7 @@ const ( HistoryEngineShutdown = 2003 PersistentStoreErrorEventID = 2010 HistorySerializationErrorEventID = 2020 + HistoryDeserializationErrorEventID = 2021 DuplicateTaskEventID = 2030 MultipleCompletionDecisionsEventID = 2040 DuplicateTransferTaskEventID = 2050 diff --git a/common/logging/helpers.go b/common/logging/helpers.go index 6d85b129ac1..9e98d17cc73 100644 --- a/common/logging/helpers.go +++ b/common/logging/helpers.go @@ -84,6 +84,14 @@ func LogHistorySerializationErrorEvent(logger bark.Logger, err error, msg string }).Errorf("Error serializing workflow execution history. Msg: %v", msg) } +// LogHistoryDeserializationErrorEvent is used to log errors deserializing execution history +func LogHistoryDeserializationErrorEvent(logger bark.Logger, err error, msg string) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: HistoryDeserializationErrorEventID, + TagWorkflowErr: err, + }).Errorf("Error deserializing workflow execution history. Msg: %v", msg) +} + // LogHistoryEngineStartingEvent is used to log history engine starting func LogHistoryEngineStartingEvent(logger bark.Logger) { logger.WithFields(bark.Fields{ diff --git a/common/metrics/tally/statsd/reporter.go b/common/metrics/tally/statsd/reporter.go index 612269f414c..a0b34a593fc 100644 --- a/common/metrics/tally/statsd/reporter.go +++ b/common/metrics/tally/statsd/reporter.go @@ -22,11 +22,12 @@ package statsd import ( "bytes" + "sort" + "time" + "github.com/cactus/go-statsd-client/statsd" "github.com/uber-go/tally" tallystatsdreporter "github.com/uber-go/tally/statsd" - "sort" - "time" ) type cadenceTallyStatsdReporter struct { @@ -34,7 +35,7 @@ type cadenceTallyStatsdReporter struct { tallystatsd tally.StatsReporter } -func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, tags map[string]string) string { +func (r *cadenceTallyStatsdReporter) metricNameWithTags(originalName string, tags map[string]string) string { var keys []string for k := range tags { keys = append(keys, k) @@ -42,7 +43,7 @@ func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, ta sort.Strings(keys) var buffer bytes.Buffer - buffer.WriteString(original_name) + buffer.WriteString(originalName) for _, tk := range keys { // adding "." as delimiter so that it will show as different parts in Graphite/Grafana @@ -52,7 +53,7 @@ func (r *cadenceTallyStatsdReporter) metricNameWithTags(original_name string, ta return buffer.String() } -// This is a wrapper on top of "github.com/uber-go/tally/statsd" +// NewReporter is a wrapper on top of "github.com/uber-go/tally/statsd" // The purpose is to support tagging // The implementation is to append tags as metric name suffixes func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally.StatsReporter { @@ -62,18 +63,18 @@ func NewReporter(statsd statsd.Statter, opts tallystatsdreporter.Options) tally. } func (r *cadenceTallyStatsdReporter) ReportCounter(name string, tags map[string]string, value int64) { - new_name := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportCounter(new_name, map[string]string{}, value) + newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportCounter(newName, map[string]string{}, value) } func (r *cadenceTallyStatsdReporter) ReportGauge(name string, tags map[string]string, value float64) { - new_name := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportGauge(new_name, map[string]string{}, value) + newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportGauge(newName, map[string]string{}, value) } func (r *cadenceTallyStatsdReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) { - new_name := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportTimer(new_name, map[string]string{}, interval) + newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportTimer(newName, map[string]string{}, interval) } func (r *cadenceTallyStatsdReporter) ReportHistogramValueSamples( @@ -84,8 +85,8 @@ func (r *cadenceTallyStatsdReporter) ReportHistogramValueSamples( bucketUpperBound float64, samples int64, ) { - new_name := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportHistogramValueSamples(new_name, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) + newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportHistogramValueSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) } func (r *cadenceTallyStatsdReporter) ReportHistogramDurationSamples( @@ -96,8 +97,8 @@ func (r *cadenceTallyStatsdReporter) ReportHistogramDurationSamples( bucketUpperBound time.Duration, samples int64, ) { - new_name := r.metricNameWithTags(name, tags) - r.tallystatsd.ReportHistogramDurationSamples(new_name, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) + newName := r.metricNameWithTags(name, tags) + r.tallystatsd.ReportHistogramDurationSamples(newName, map[string]string{}, buckets, bucketLowerBound, bucketUpperBound, samples) } func (r *cadenceTallyStatsdReporter) Capabilities() tally.Capabilities { diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 8547da45d8b..80161c6fc50 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -268,7 +268,7 @@ const ( `and task_id = ? ` + `IF range_id = ?` - templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map ` + + templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map, buffered_events_list ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -343,6 +343,28 @@ const ( `and task_id = ? ` + `IF next_event_id = ?` + templateAppendBufferedEventsQuery = `UPDATE executions ` + + `SET buffered_events_list = buffered_events_list + ? ` + + `WHERE shard_id = ? ` + + `and type = ? ` + + `and domain_id = ? ` + + `and workflow_id = ? ` + + `and run_id = ? ` + + `and visibility_ts = ? ` + + `and task_id = ? ` + + `IF next_event_id = ?` + + templateDeleteBufferedEventsQuery = `UPDATE executions ` + + `SET buffered_events_list = [] ` + + `WHERE shard_id = ? ` + + `and type = ? ` + + `and domain_id = ? ` + + `and workflow_id = ? ` + + `and run_id = ? ` + + `and visibility_ts = ? ` + + `and task_id = ? ` + + `IF next_event_id = ?` + templateDeleteActivityInfoQuery = `DELETE activity_map[ ? ] ` + `FROM executions ` + `WHERE shard_id = ? ` + @@ -942,6 +964,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio } state.RequestCancelInfos = requestCancelInfos + eList := result["buffered_events_list"].([]map[string]interface{}) + bufferedEvents := make([]*SerializedHistoryEventBatch, 0, len(eList)) + for _, v := range eList { + eventBatch := createSerializedHistoryEventBatch(v) + bufferedEvents = append(bufferedEvents, eventBatch) + } + state.BufferedEvents = bufferedEvents + return &GetWorkflowExecutionResponse{State: state}, nil } @@ -1005,6 +1035,9 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx d.updateRequestCancelInfos(batch, request.UpsertRequestCancelInfos, request.DeleteRequestCancelInfo, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID) + d.updateBufferedEvents(batch, request.NewBufferedEvents, request.ClearBufferedEvents, + executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID) + if request.ContinueAsNew != nil { startReq := request.ContinueAsNew d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp) @@ -1837,6 +1870,38 @@ func (d *cassandraPersistence) updateRequestCancelInfos(batch *gocql.Batch, requ } } +func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBufferedEvents *SerializedHistoryEventBatch, + clearBufferedEvents bool, domainID, workflowID, runID string, condition int64, rangeID int64) { + + if clearBufferedEvents { + batch.Query(templateDeleteBufferedEventsQuery, + d.shardID, + rowTypeExecution, + domainID, + workflowID, + runID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + condition) + } else if newBufferedEvents != nil { + values := make(map[string]interface{}) + values["encoding_type"] = newBufferedEvents.EncodingType + values["version"] = newBufferedEvents.Version + values["data"] = newBufferedEvents.Data + newEventValues := []map[string]interface{}{values} + batch.Query(templateAppendBufferedEventsQuery, + newEventValues, + d.shardID, + rowTypeExecution, + domainID, + workflowID, + runID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + condition) + } +} + func createShardInfo(result map[string]interface{}) *ShardInfo { info := &ShardInfo{} for k, v := range result { @@ -2048,6 +2113,21 @@ func createRequestCancelInfo(result map[string]interface{}) *RequestCancelInfo { return info } +func createSerializedHistoryEventBatch(result map[string]interface{}) *SerializedHistoryEventBatch { + // TODO: default to JSON, update this when we support different encoding types. + eventBatch := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON} + for k, v := range result { + switch k { + case "version": + eventBatch.Version = v.(int) + case "data": + eventBatch.Data = v.([]byte) + } + } + + return eventBatch +} + func createTaskInfo(result map[string]interface{}) *TaskInfo { info := &TaskInfo{} for k, v := range result { diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 5b4541a9431..22926dce7a8 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -271,6 +271,7 @@ type ( ChildExecutionInfos map[int64]*ChildExecutionInfo RequestCancelInfos map[int64]*RequestCancelInfo ExecutionInfo *WorkflowExecutionInfo + BufferedEvents []*SerializedHistoryEventBatch } // ActivityInfo details. @@ -409,6 +410,8 @@ type ( DeleteChildExecutionInfo *int64 UpsertRequestCancelInfos []*RequestCancelInfo DeleteRequestCancelInfo *int64 + NewBufferedEvents *SerializedHistoryEventBatch + ClearBufferedEvents bool } // DeleteWorkflowExecutionRequest is used to delete a workflow execution diff --git a/common/service/config/ringpop.go b/common/service/config/ringpop.go index 6f00e94532b..51b23e20ac3 100644 --- a/common/service/config/ringpop.go +++ b/common/service/config/ringpop.go @@ -67,10 +67,7 @@ func (rpConfig *Ringpop) validate() error { if len(rpConfig.Name) == 0 { return fmt.Errorf("ringpop config missing `name` param") } - if err := validateBootstrapMode(rpConfig); err != nil { - return err - } - return nil + return validateBootstrapMode(rpConfig) } // UnmarshalYAML is called by the yaml package to convert diff --git a/host/integration_test.go b/host/integration_test.go index 745eef69606..5f6f1f5afbd 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -21,10 +21,14 @@ package host import ( + "bytes" "context" + "encoding/binary" + "errors" "flag" "fmt" "os" + "strconv" "testing" "time" @@ -34,12 +38,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/uber-common/bark" - "bytes" - "encoding/binary" - "strconv" - - "errors" - wsc "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/client/frontend" @@ -1382,6 +1380,125 @@ func (s *integrationSuite) TestSignalWorkflow() { s.IsType(&workflow.EntityNotExistsError{}, err) } +func (s *integrationSuite) TestBufferedEvents() { + id := "interation-buffered-events-test" + wt := "interation-buffered-events-test-type" + tl := "interation-buffered-events-test-tasklist" + identity := "worker1" + signalName := "buffered-signal" + + workflowType := &workflow.WorkflowType{Name: &wt} + taskList := &workflow.TaskList{Name: &tl} + + // Start workflow execution + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: common.StringPtr(identity), + } + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + // decider logic + workflowComplete := false + signalSent := false + var signalEvent *workflow.HistoryEvent + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision) { + if !signalSent { + signalSent = true + + // this will create new event when there is in-flight decision task, and the new event will be buffered + err := s.engine.SignalWorkflowExecution(createContext(), + &workflow.SignalWorkflowExecutionRequest{ + Domain: common.StringPtr(s.domainName), + WorkflowExecution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + }, + SignalName: common.StringPtr("buffered-signal"), + Input: []byte("buffered-signal-input"), + Identity: common.StringPtr(identity), + }) + s.NoError(err) + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("1"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr("test-activity-type")}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("test-input"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(2), + StartToCloseTimeoutSeconds: common.Int32Ptr(50), + HeartbeatTimeoutSeconds: common.Int32Ptr(5), + }, + }} + } else if previousStartedEventID > 0 && signalEvent == nil { + for _, event := range history.Events[previousStartedEventID:] { + if *event.EventType == workflow.EventTypeWorkflowExecutionSignaled { + signalEvent = event + } + } + } + + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }} + } + + poller := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskList, + identity: identity, + decisionHandler: dtHandler, + activityHandler: nil, + logger: s.logger, + } + + // first decision, which sends signal and the signal event should be buffered to append after first decision closed + err := poller.pollAndProcessDecisionTask(false, false) + s.logger.Infof("pollAndProcessDecisionTask: %v", err) + s.Nil(err) + + // check history, the signal event should be after the complete decision task + histResp, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{ + Domain: common.StringPtr(s.domainName), + Execution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: we.RunId, + }, + }) + s.NoError(err) + s.NotNil(histResp.History.Events) + s.True(len(histResp.History.Events) >= 6) + s.Equal(histResp.History.Events[3].GetEventType(), workflow.EventTypeDecisionTaskCompleted) + s.Equal(histResp.History.Events[4].GetEventType(), workflow.EventTypeActivityTaskScheduled) + s.Equal(histResp.History.Events[5].GetEventType(), workflow.EventTypeWorkflowExecutionSignaled) + + // Process signal in decider + err = poller.pollAndProcessDecisionTask(true, false) + s.logger.Infof("pollAndProcessDecisionTask: %v", err) + s.Nil(err) + s.NotNil(signalEvent) + s.Equal(signalName, *signalEvent.WorkflowExecutionSignaledEventAttributes.SignalName) + s.Equal(identity, *signalEvent.WorkflowExecutionSignaledEventAttributes.Identity) + s.True(workflowComplete) +} + func (s *integrationSuite) TestQueryWorkflow() { id := "interation-query-workflow-test" wt := "interation-query-workflow-test-type" @@ -2398,7 +2515,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() { workflowComplete := false childComplete := false childExecutionStarted := false - childData := int32(1) var startedEvent *workflow.HistoryEvent var completedEvent *workflow.HistoryEvent dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, @@ -2421,8 +2537,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() { if !childExecutionStarted { s.logger.Info("Starting child execution.") childExecutionStarted = true - buf := new(bytes.Buffer) - s.Nil(binary.Write(buf, binary.LittleEndian, childData)) return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartChildWorkflowExecution), @@ -2431,7 +2545,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() { WorkflowId: common.StringPtr(childID), WorkflowType: childWorkflowType, TaskList: taskList, - Input: buf.Bytes(), + Input: []byte("child-workflow-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyTerminate), @@ -2477,16 +2591,15 @@ func (s *integrationSuite) TestChildWorkflowExecution() { s.Nil(err) s.True(childExecutionStarted) - // Process ChildExecution Started event + // Process ChildExecution Started event and Process Child Execution and complete it err = poller.pollAndProcessDecisionTask(false, false) s.logger.Infof("pollAndProcessDecisionTask: %v", err) s.Nil(err) - s.NotNil(startedEvent) - - // Process Child Execution and complete it err = poller.pollAndProcessDecisionTask(false, false) s.logger.Infof("pollAndProcessDecisionTask: %v", err) s.Nil(err) + + s.NotNil(startedEvent) s.True(childComplete) // Process ChildExecution completed event and complete parent execution diff --git a/schema/cadence/schema.cql b/schema/cadence/schema.cql index 9487d7aa0a8..67a14346a1e 100644 --- a/schema/cadence/schema.cql +++ b/schema/cadence/schema.cql @@ -138,6 +138,12 @@ CREATE TYPE domain_config ( emit_metric boolean ); +CREATE TYPE serialized_event_batch ( + encoding_type text, + version int, + data blob, +); + CREATE TABLE executions ( shard_id int, type int, -- enum RowType { Shard, Execution, TransferTask, TimerTask} @@ -157,6 +163,7 @@ CREATE TABLE executions ( timer_map map>, child_executions_map map>, request_cancel_map map>, + buffered_events_list list>, PRIMARY KEY (shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id) ) WITH COMPACTION = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' diff --git a/schema/cadence/versioned/v0.2/add_buffered_events.cql b/schema/cadence/versioned/v0.2/add_buffered_events.cql new file mode 100644 index 00000000000..e4b75eb40ce --- /dev/null +++ b/schema/cadence/versioned/v0.2/add_buffered_events.cql @@ -0,0 +1,7 @@ +CREATE TYPE serialized_event_batch ( + encoding_type text, + version int, + data blob, +); + +ALTER TABLE executions ADD buffered_events_list list>; diff --git a/schema/cadence/versioned/v0.2/manifest.json b/schema/cadence/versioned/v0.2/manifest.json index 424e721dd2e..6f36b3af556 100644 --- a/schema/cadence/versioned/v0.2/manifest.json +++ b/schema/cadence/versioned/v0.2/manifest.json @@ -1,8 +1,9 @@ { "CurrVersion": "0.2", "MinCompatibleVersion": "0.2", - "Description": "add workflow timeout to mutable state", + "Description": "add workflow_timeout and buffered_events_list to mutable state", "SchemaUpdateCqlFiles": [ - "add_wf_timeout.cql" + "add_wf_timeout.cql", + "add_buffered_events.cql" ] } diff --git a/service/history/historyBuilder.go b/service/history/historyBuilder.go index fd0afa91a9c..260b4d628ec 100644 --- a/service/history/historyBuilder.go +++ b/service/history/historyBuilder.go @@ -30,8 +30,9 @@ import ( ) const ( - firstEventID int64 = 1 - emptyEventID int64 = -23 + firstEventID int64 = 1 + emptyEventID int64 = -23 + bufferedEventID int64 = -123 ) type ( diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 7fdd6cca4c1..6cc80cd7785 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -573,7 +573,7 @@ Update_History_Loop: var failCause workflow.DecisionTaskFailedCause var err error completedID := *completedEvent.EventId - hasUnhandledEvents := ((completedID - startedID) > 1) + hasUnhandledEvents := msBuilder.HasBufferedEvents() isComplete := false transferTasks := []persistence.Task{} timerTasks := []persistence.Task{} @@ -858,6 +858,9 @@ Update_History_Loop: continueAsNewBuilder = nil } + // flush event if needed after processing decisions + msBuilder.FlushBufferedEvents() + if tt := tBuilder.GetUserTimerTaskIfNeeded(msBuilder); tt != nil { timerTasks = append(timerTasks, tt) } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index d1223293e9c..97d325a17e4 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -32,7 +32,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-common/bark" - "github.com/uber-go/tally" "github.com/uber/cadence/.gen/go/history" workflow "github.com/uber/cadence/.gen/go/shared" @@ -398,10 +397,10 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedConflictOnUpdate() { s.Equal(context, ms2.ExecutionInfo.ExecutionContext) executionBuilder := s.getBuilder(domainID, we) - activity3Attributes := s.getActivityScheduledEvent(executionBuilder, 14).ActivityTaskScheduledEventAttributes + activity3Attributes := s.getActivityScheduledEvent(executionBuilder, 13).ActivityTaskScheduledEventAttributes s.Equal(activity3ID, *activity3Attributes.ActivityId) s.Equal(activity3Type, *activity3Attributes.ActivityType.Name) - s.Equal(int64(13), *activity3Attributes.DecisionTaskCompletedEventId) + s.Equal(int64(12), *activity3Attributes.DecisionTaskCompletedEventId) s.Equal(tl, *activity3Attributes.TaskList.Name) s.Equal(activity3Input, activity3Attributes.Input) s.Equal(int32(100), *activity3Attributes.ScheduleToCloseTimeoutSeconds) @@ -2444,6 +2443,8 @@ func addDecisionTaskCompletedEvent(builder *mutableStateBuilder, scheduleID, sta Identity: common.StringPtr(identity), }) + builder.FlushBufferedEvents() + return e } @@ -2550,10 +2551,20 @@ func createMutableState(builder *mutableStateBuilder) *persistence.WorkflowMutab for id, info := range builder.pendingTimerInfoIDs { timerInfos[id] = copyTimerInfo(info) } + builder.FlushBufferedEvents() + var bufferedEvents []*persistence.SerializedHistoryEventBatch + if len(builder.bufferedEvents) > 0 { + bufferedEvents = append(bufferedEvents, builder.bufferedEvents...) + } + if builder.updateBufferedEvents != nil { + bufferedEvents = append(bufferedEvents, builder.updateBufferedEvents) + } + return &persistence.WorkflowMutableState{ - ExecutionInfo: info, - ActivitInfos: activityInfos, - TimerInfos: timerInfos, + ExecutionInfo: info, + ActivitInfos: activityInfos, + TimerInfos: timerInfos, + BufferedEvents: bufferedEvents, } } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 57a9177beb7..59461e3b816 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -58,6 +58,10 @@ type ( updateRequestCancelInfos []*persistence.RequestCancelInfo // Modified RequestCancel Infos since last update deleteRequestCancelInfo *int64 // Deleted RequestCancel Info since last update + bufferedEvents []*persistence.SerializedHistoryEventBatch // buffered history events that are already persisted + updateBufferedEvents *persistence.SerializedHistoryEventBatch // buffered history events that needs to be persisted + clearBufferedEvents bool // delete buffered events from persistence + executionInfo *persistence.WorkflowExecutionInfo // Workflow mutable state info. continueAsNew *persistence.CreateWorkflowExecutionRequest hBuilder *historyBuilder @@ -75,6 +79,8 @@ type ( updateChildExecutionInfos []*persistence.ChildExecutionInfo deleteChildExecutionInfo *int64 continueAsNew *persistence.CreateWorkflowExecutionRequest + newBufferedEvents *persistence.SerializedHistoryEventBatch + clearBufferedEvents bool } // TODO: This should be part of persistence layer @@ -119,12 +125,95 @@ func (e *mutableStateBuilder) Load(state *persistence.WorkflowMutableState) { e.pendingChildExecutionInfoIDs = state.ChildExecutionInfos e.pendingRequestCancelInfoIDs = state.RequestCancelInfos e.executionInfo = state.ExecutionInfo + e.bufferedEvents = state.BufferedEvents for _, ai := range state.ActivitInfos { e.pendingActivityInfoByActivityID[ai.ActivityID] = ai.ScheduleID } } -func (e *mutableStateBuilder) CloseUpdateSession() *mutableStateSessionUpdates { +func (e *mutableStateBuilder) FlushBufferedEvents() error { + // put new events into 2 buckets: + // 1) if the event was added while there was in-flight decision, then put it in buffered bucket + // 2) otherwise, put it in committed bucket + var newBufferedEvents []*workflow.HistoryEvent + var newCommittedEvents []*workflow.HistoryEvent + for _, event := range e.hBuilder.history { + if event.GetEventId() == bufferedEventID { + newBufferedEvents = append(newBufferedEvents, event) + } else { + newCommittedEvents = append(newCommittedEvents, event) + } + } + + // no decision in-flight, flush all buffered events to committed bucket + if !e.HasInFlightDecisionTask() { + flush := func(bufferedEventBatch *persistence.SerializedHistoryEventBatch) error { + // TODO: get serializer based on eventBatch's EncodingType when we support multiple encoding + eventBatch, err := e.hBuilder.serializer.Deserialize(bufferedEventBatch) + if err != nil { + logging.LogHistoryDeserializationErrorEvent(e.logger, err, "Unable to serialize execution history for update.") + return err + } + for _, event := range eventBatch.Events { + newCommittedEvents = append(newCommittedEvents, event) + } + return nil + } + + // flush persisted buffered events + for _, bufferedEventBatch := range e.bufferedEvents { + if err := flush(bufferedEventBatch); err != nil { + return err + } + } + // flush pending buffered events + if e.updateBufferedEvents != nil { + if err := flush(e.updateBufferedEvents); err != nil { + return err + } + } + + // flush new buffered events that were not saved to persistence yet + newCommittedEvents = append(newCommittedEvents, newBufferedEvents...) + newBufferedEvents = nil + + // remove the persisted buffered events from persistence if there is any + e.clearBufferedEvents = e.clearBufferedEvents || len(e.bufferedEvents) > 0 + e.bufferedEvents = nil + // clear pending buffered events + e.updateBufferedEvents = nil + } + + // make sure all new committed events have correct EventID + for _, event := range newCommittedEvents { + if event.GetEventId() == bufferedEventID { + eventID := e.executionInfo.NextEventID + event.EventId = common.Int64Ptr(eventID) + e.executionInfo.NextEventID++ + } + } + e.hBuilder.history = newCommittedEvents + + // if decision is not closed yet, and there are new buffered events, then put those to the pending buffer + if e.HasInFlightDecisionTask() && len(newBufferedEvents) > 0 { + // decision in-flight, and some new events needs to be buffered + bufferedBatch := persistence.NewHistoryEventBatch(persistence.GetDefaultHistoryVersion(), newBufferedEvents) + serializedEvents, err := e.hBuilder.serializer.Serialize(bufferedBatch) + if err != nil { + logging.LogHistorySerializationErrorEvent(e.logger, err, "Unable to serialize execution history for update.") + return err + } + e.updateBufferedEvents = serializedEvents + } + + return nil +} + +func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates, error) { + if err := e.FlushBufferedEvents(); err != nil { + return nil, err + } + updates := &mutableStateSessionUpdates{ newEventsBuilder: e.hBuilder, updateActivityInfos: e.updateActivityInfos, @@ -134,6 +223,8 @@ func (e *mutableStateBuilder) CloseUpdateSession() *mutableStateSessionUpdates { updateChildExecutionInfos: e.updateChildExecutionInfos, deleteChildExecutionInfo: e.deleteChildExecutionInfo, continueAsNew: e.continueAsNew, + newBufferedEvents: e.updateBufferedEvents, + clearBufferedEvents: e.clearBufferedEvents, } // Clear all updates to prepare for the next session @@ -147,19 +238,32 @@ func (e *mutableStateBuilder) CloseUpdateSession() *mutableStateSessionUpdates { e.updateRequestCancelInfos = []*persistence.RequestCancelInfo{} e.deleteRequestCancelInfo = nil e.continueAsNew = nil + if e.updateBufferedEvents != nil { + e.bufferedEvents = append(e.bufferedEvents, e.updateBufferedEvents) + e.updateBufferedEvents = nil + } - return updates + return updates, nil } func (e *mutableStateBuilder) createNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent { eventID := e.executionInfo.NextEventID + if e.HasInFlightDecisionTask() && + eventType != workflow.EventTypeDecisionTaskCompleted && + eventType != workflow.EventTypeDecisionTaskFailed && + eventType != workflow.EventTypeDecisionTaskTimedOut { + eventID = bufferedEventID + } else { + // only increase NextEventID if there is no in-flight decision task + e.executionInfo.NextEventID++ + } + ts := common.Int64Ptr(time.Now().UnixNano()) historyEvent := &workflow.HistoryEvent{} historyEvent.EventId = common.Int64Ptr(eventID) historyEvent.Timestamp = ts historyEvent.EventType = common.EventTypePtr(eventType) - e.executionInfo.NextEventID++ return historyEvent } @@ -390,6 +494,24 @@ func (e *mutableStateBuilder) HasPendingDecisionTask() bool { return e.executionInfo.DecisionScheduleID != emptyEventID } +func (e *mutableStateBuilder) HasInFlightDecisionTask() bool { + return e.executionInfo.DecisionStartedID > 0 +} + +func (e *mutableStateBuilder) HasBufferedEvents() bool { + if len(e.bufferedEvents) > 0 || e.updateBufferedEvents != nil { + return true + } + + for _, event := range e.hBuilder.history { + if event.GetEventId() == bufferedEventID { + return true + } + } + + return false +} + // UpdateDecision updates a decision task. func (e *mutableStateBuilder) UpdateDecision(di *decisionInfo) { e.executionInfo.DecisionScheduleID = di.ScheduleID diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index 4eb38e3417f..6ad56288122 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -228,8 +228,9 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() { s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once() s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once() - s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) { + request := arguments.Get(0).(*persistence.UpdateWorkflowExecutionRequest) + s.NotNil(request.NewBufferedEvents) // Done. waitCh <- struct{}{} }).Once() diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go index 707e783e990..a63caa0567e 100644 --- a/service/history/timerQueueProcessor_test.go +++ b/service/history/timerQueueProcessor_test.go @@ -25,18 +25,17 @@ import ( "testing" "time" - "github.com/uber-go/tally" - "github.com/uber/cadence/common" - "github.com/uber/cadence/common/metrics" - "github.com/uber/cadence/common/mocks" - "github.com/uber/cadence/common/persistence" - "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" "github.com/uber-common/bark" + "github.com/uber-go/tally" workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" ) type ( @@ -173,6 +172,7 @@ func (s *timerQueueProcessorSuite) addDecisionTimer(domainID string, we workflow timerTasks := []persistence.Task{timeOutTask} s.updateTimerSeqNumbers(timerTasks) + err2 := s.UpdateWorkflowExecution(state.ExecutionInfo, nil, nil, condition, timerTasks, nil, nil, nil, nil, nil) s.Nil(err2, "No error expected.") return timerTasks diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index d8c156b6e11..bc3adaa2c8a 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -112,9 +112,20 @@ func (c *workflowExecutionContext) updateWorkflowExecutionWithDeleteTask(transfe } func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task, - timerTasks []persistence.Task, transactionID int64) error { + timerTasks []persistence.Task, transactionID int64) (errRet error) { + + defer func() { + if errRet != nil { + // Clear all cached state in case of error + c.clear() + } + }() + // Take a snapshot of all updates we have accumulated for this execution - updates := c.msBuilder.CloseUpdateSession() + updates, err := c.msBuilder.CloseUpdateSession() + if err != nil { + return err + } builder := updates.newEventsBuilder if builder.history != nil && len(builder.history) > 0 { @@ -133,9 +144,6 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi FirstEventID: *firstEvent.EventId, Events: serializedHistory, }); err0 != nil { - // Clear all cached state in case of error - c.clear() - switch err0.(type) { case *persistence.ConditionFailedError: return ErrConflict @@ -145,7 +153,6 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi fmt.Sprintf("{updateCondition: %v}", c.updateCondition)) return err0 } - } continueAsNew := updates.continueAsNew @@ -167,12 +174,11 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi DeleteTimerInfos: updates.deleteTimerInfos, UpsertChildExecutionInfos: updates.updateChildExecutionInfos, DeleteChildExecutionInfo: updates.deleteChildExecutionInfo, + NewBufferedEvents: updates.newBufferedEvents, + ClearBufferedEvents: updates.clearBufferedEvents, ContinueAsNew: continueAsNew, CloseExecution: deleteExecution, }); err1 != nil { - // Clear all cached state in case of error - c.clear() - switch err1.(type) { case *persistence.ConditionFailedError: return ErrConflict diff --git a/tools/cassandra/cqlclient.go b/tools/cassandra/cqlclient.go index f9db430152d..539c8468fd0 100644 --- a/tools/cassandra/cqlclient.go +++ b/tools/cassandra/cqlclient.go @@ -23,14 +23,14 @@ package cassandra import ( "bufio" "errors" + "fmt" "io" + "log" "os" "strings" "time" - "fmt" "github.com/gocql/gocql" - "log" ) type ( @@ -189,10 +189,7 @@ func (client *cqlClient) CreateSchemaVersionTables() error { if err := client.Exec(createSchemaVersionTableCQL); err != nil { return err } - if err := client.Exec(createSchemaUpdateHistoryTableCQL); err != nil { - return err - } - return nil + return client.Exec(createSchemaUpdateHistoryTableCQL) } // ReadSchemaVersion returns the current schema version for the keyspace