Skip to content

Commit

Permalink
buffer events when decision task is inflight
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc-zz committed Oct 21, 2017
1 parent cbc769d commit a44aeeb
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 39 deletions.
1 change: 1 addition & 0 deletions common/logging/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
HistoryEngineShutdown = 2003
PersistentStoreErrorEventID = 2010
HistorySerializationErrorEventID = 2020
HistoryDeserializationErrorEventID = 2021
DuplicateTaskEventID = 2030
MultipleCompletionDecisionsEventID = 2040
DuplicateTransferTaskEventID = 2050
Expand Down
8 changes: 8 additions & 0 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func LogHistorySerializationErrorEvent(logger bark.Logger, err error, msg string
}).Errorf("Error serializing workflow execution history. Msg: %v", msg)
}

// LogHistoryDeserializationErrorEvent writes an error-level log entry for a failure to
// deserialize workflow execution history. The entry is tagged with
// HistoryDeserializationErrorEventID and the causing err; msg is included in the log line.
func LogHistoryDeserializationErrorEvent(logger bark.Logger, err error, msg string) {
	fields := bark.Fields{
		TagWorkflowEventID: HistoryDeserializationErrorEventID,
		TagWorkflowErr:     err,
	}
	tagged := logger.WithFields(fields)
	tagged.Errorf("Error deserializing workflow execution history. Msg: %v", msg)
}

// LogHistoryEngineStartingEvent is used to log history engine starting
func LogHistoryEngineStartingEvent(logger bark.Logger) {
logger.WithFields(bark.Fields{
Expand Down
87 changes: 86 additions & 1 deletion common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ const (
`and task_id = ? ` +
`IF range_id = ?`

templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map ` +
templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map, buffered_events_list ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -343,6 +343,28 @@ const (
`and task_id = ? ` +
`IF next_event_id = ?`

// templateAppendBufferedEventsQuery appends the bound list value to the
// buffered_events_list column of a single executions row. The row is addressed by
// its full primary key, and the update is conditional: it is applied only when the
// row's next_event_id equals the last bound value.
// Bind order: new events list, shard_id, type, domain_id, workflow_id, run_id,
// visibility_ts, task_id, expected next_event_id.
templateAppendBufferedEventsQuery = `UPDATE executions ` +
`SET buffered_events_list = buffered_events_list + ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

// templateDeleteBufferedEventsQuery resets the buffered_events_list column of a
// single executions row to an empty list. The row is addressed by its full primary
// key, and the update is conditional: it is applied only when the row's
// next_event_id equals the last bound value.
// Bind order: shard_id, type, domain_id, workflow_id, run_id, visibility_ts,
// task_id, expected next_event_id.
templateDeleteBufferedEventsQuery = `UPDATE executions ` +
`SET buffered_events_list = [] ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteActivityInfoQuery = `DELETE activity_map[ ? ] ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -942,6 +964,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
}
state.RequestCancelInfos = requestCancelInfos

bufferedEvents := []*SerializedHistoryEventBatch{}
eList := result["buffered_events_list"].([]map[string]interface{})
for _, v := range eList {
eventBatch := createSerializedHistoryEventBatch(v)
bufferedEvents = append(bufferedEvents, eventBatch)
}
state.BufferedEvents = bufferedEvents

return &GetWorkflowExecutionResponse{State: state}, nil
}

Expand Down Expand Up @@ -1005,6 +1035,9 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
d.updateRequestCancelInfos(batch, request.UpsertRequestCancelInfos, request.DeleteRequestCancelInfo,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

d.updateBufferedEvents(batch, request.NewBufferedEvents, request.ClearBufferedEvents,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

if request.ContinueAsNew != nil {
startReq := request.ContinueAsNew
d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp)
Expand Down Expand Up @@ -1837,6 +1870,43 @@ func (d *cassandraPersistence) updateRequestCancelInfos(batch *gocql.Batch, requ
}
}

// updateBufferedEvents queues the buffered-event statements for one execution row
// onto batch.
//
// If clearBufferedEvents is true, a statement that resets buffered_events_list to an
// empty list is queued first. If newBufferedEvents is non-empty, every batch is
// converted to its column form (encoding_type, version, data) and one append
// statement carrying all of them is queued. Both statements are conditional on the
// row's next_event_id matching condition.
//
// rangeID is part of the signature but is not used by this function.
func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBufferedEvents []*SerializedHistoryEventBatch,
	clearBufferedEvents bool, domainID, workflowID, runID string, condition int64, rangeID int64) {

	if clearBufferedEvents {
		batch.Query(templateDeleteBufferedEventsQuery,
			d.shardID, rowTypeExecution,
			domainID, workflowID, runID,
			defaultVisibilityTimestamp, rowTypeExecutionTaskID,
			condition)
	}

	// Nothing to append: do not queue an append statement at all.
	if len(newBufferedEvents) == 0 {
		return
	}

	serialized := make([]map[string]interface{}, len(newBufferedEvents))
	for i, eb := range newBufferedEvents {
		serialized[i] = map[string]interface{}{
			"encoding_type": eb.EncodingType,
			"version":       eb.Version,
			"data":          eb.Data,
		}
	}

	batch.Query(templateAppendBufferedEventsQuery,
		serialized,
		d.shardID, rowTypeExecution,
		domainID, workflowID, runID,
		defaultVisibilityTimestamp, rowTypeExecutionTaskID,
		condition)
}

func createShardInfo(result map[string]interface{}) *ShardInfo {
info := &ShardInfo{}
for k, v := range result {
Expand Down Expand Up @@ -2048,6 +2118,21 @@ func createRequestCancelInfo(result map[string]interface{}) *RequestCancelInfo {
return info
}

// createSerializedHistoryEventBatch builds a SerializedHistoryEventBatch from one
// element of the buffered_events_list column, given as a field-name -> value map.
// Only the "version" and "data" fields are read; a field that is present but holds
// an unexpected type causes a panic from the type assertion.
func createSerializedHistoryEventBatch(result map[string]interface{}) *SerializedHistoryEventBatch {
	// TODO: default to JSON, update this when we support different encoding types.
	decoded := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON}

	if version, found := result["version"]; found {
		decoded.Version = version.(int)
	}
	if data, found := result["data"]; found {
		decoded.Data = data.([]byte)
	}

	return decoded
}

func createTaskInfo(result map[string]interface{}) *TaskInfo {
info := &TaskInfo{}
for k, v := range result {
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type (
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
ExecutionInfo *WorkflowExecutionInfo
BufferedEvents []*SerializedHistoryEventBatch
}

// ActivityInfo details.
Expand Down Expand Up @@ -409,6 +410,8 @@ type (
DeleteChildExecutionInfo *int64
UpsertRequestCancelInfos []*RequestCancelInfo
DeleteRequestCancelInfo *int64
NewBufferedEvents []*SerializedHistoryEventBatch
ClearBufferedEvents bool
}

// DeleteWorkflowExecutionRequest is used to delete a workflow execution
Expand Down
122 changes: 112 additions & 10 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
package host

import (
"bytes"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"os"
"strconv"
"testing"
"time"

Expand All @@ -34,12 +38,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"

"bytes"
"encoding/binary"
"strconv"

"errors"

wsc "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/frontend"
Expand Down Expand Up @@ -1382,6 +1380,113 @@ func (s *integrationSuite) TestSignalWorkflow() {
s.IsType(&workflow.EntityNotExistsError{}, err)
}

// TestBufferedEvents verifies that an event created while a decision task is in
// flight is buffered and appended to history only after that decision task closes.
//
// Flow:
//  1. Start a workflow and poll its first decision task.
//  2. From inside the decision handler, signal the workflow (the decision is still
//     in flight, so the signal event must be buffered) and complete the decision
//     with no decisions.
//  3. Check that history shows DecisionTaskCompleted followed by the signal event.
//  4. Poll the follow-up decision task, locate the signal event in the history
//     delivered to the decider, and complete the workflow.
func (s *integrationSuite) TestBufferedEvents() {
	id := "interation-buffered-events-test"
	wt := "interation-buffered-events-test-type"
	tl := "interation-buffered-events-test-tasklist"
	identity := "worker1"
	signalName := "buffered-signal"

	workflowType := &workflow.WorkflowType{Name: &wt}
	taskList := &workflow.TaskList{Name: &tl}

	// Start workflow execution
	request := &workflow.StartWorkflowExecutionRequest{
		RequestId:                           common.StringPtr(uuid.New()),
		Domain:                              common.StringPtr(s.domainName),
		WorkflowId:                          common.StringPtr(id),
		WorkflowType:                        workflowType,
		TaskList:                            taskList,
		Input:                               nil,
		ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
		TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(1),
		Identity:                            common.StringPtr(identity),
	}

	we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
	s.Nil(err0)

	s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

	// decider logic
	workflowComplete := false
	signalSent := false
	var signalEvent *workflow.HistoryEvent
	dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
		previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision) {
		if !signalSent {
			signalSent = true

			// Signalling while this decision task is in flight creates a new event
			// that must be buffered rather than appended to history immediately.
			// Use signalName so the request and the final assertion cannot drift apart.
			err := s.engine.SignalWorkflowExecution(createContext(),
				&workflow.SignalWorkflowExecutionRequest{
					Domain: common.StringPtr(s.domainName),
					WorkflowExecution: &workflow.WorkflowExecution{
						WorkflowId: common.StringPtr(id),
					},
					SignalName: common.StringPtr(signalName),
					Input:      []byte("buffered-signal-input"),
					Identity:   common.StringPtr(identity),
				})
			s.NoError(err)
			// Completing the decision task with no decisions: because an event is
			// buffered, a new decision task is expected to be scheduled.
			return nil, []*workflow.Decision{}
		}

		if previousStartedEventID > 0 && signalEvent == nil {
			// Only look at events delivered after the previous decision task started.
			for _, event := range history.Events[previousStartedEventID:] {
				if *event.EventType == workflow.EventTypeWorkflowExecutionSignaled {
					signalEvent = event
				}
			}
		}

		workflowComplete = true
		return nil, []*workflow.Decision{{
			DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
			CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
				Result: []byte("Done."),
			},
		}}
	}

	poller := &taskPoller{
		engine:          s.engine,
		domain:          s.domainName,
		taskList:        taskList,
		identity:        identity,
		decisionHandler: dtHandler,
		activityHandler: nil,
		logger:          s.logger,
	}

	// First decision: sends the signal; the signal event should be buffered and
	// appended only after the first decision task is closed.
	err := poller.pollAndProcessDecisionTask(false, false)
	s.logger.Infof("pollAndProcessDecisionTask: %v", err)
	s.Nil(err)

	// Check history: the signal event should come after the completed decision task.
	histResp, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
		Domain: common.StringPtr(s.domainName),
		Execution: &workflow.WorkflowExecution{
			WorkflowId: common.StringPtr(id),
			RunId:      we.RunId,
		},
	})
	s.NoError(err)
	s.NotNil(histResp.History.Events)
	s.True(len(histResp.History.Events) >= 5)
	// testify's Equal takes (expected, actual); keep that order so failure
	// messages label the two values correctly.
	s.Equal(workflow.EventTypeDecisionTaskCompleted, histResp.History.Events[3].GetEventType())
	s.Equal(workflow.EventTypeWorkflowExecutionSignaled, histResp.History.Events[4].GetEventType())

	// Process signal in decider
	err = poller.pollAndProcessDecisionTask(true, false)
	s.logger.Infof("pollAndProcessDecisionTask: %v", err)
	s.Nil(err)
	s.NotNil(signalEvent)
	s.Equal(signalName, *signalEvent.WorkflowExecutionSignaledEventAttributes.SignalName)
	s.Equal(identity, *signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
	s.True(workflowComplete)
}

func (s *integrationSuite) TestQueryWorkflow() {
id := "interation-query-workflow-test"
wt := "interation-query-workflow-test-type"
Expand Down Expand Up @@ -2398,7 +2503,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
workflowComplete := false
childComplete := false
childExecutionStarted := false
childData := int32(1)
var startedEvent *workflow.HistoryEvent
var completedEvent *workflow.HistoryEvent
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
Expand All @@ -2421,8 +2525,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
if !childExecutionStarted {
s.logger.Info("Starting child execution.")
childExecutionStarted = true
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, childData))

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartChildWorkflowExecution),
Expand All @@ -2431,7 +2533,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
WorkflowId: common.StringPtr(childID),
WorkflowType: childWorkflowType,
TaskList: taskList,
Input: buf.Bytes(),
Input: []byte("child-workflow-input"),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyTerminate),
Expand Down
7 changes: 7 additions & 0 deletions schema/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ CREATE TYPE domain_config (
emit_metric boolean
);

-- serialized_event_batch is one serialized batch of history events: the encoding
-- used, a version number, and the serialized bytes. It is the element type of
-- executions.buffered_events_list.
CREATE TYPE serialized_event_batch (
encoding_type text,
version int,
data blob,
);

CREATE TABLE executions (
shard_id int,
type int, -- enum RowType { Shard, Execution, TransferTask, TimerTask}
Expand All @@ -157,6 +163,7 @@ CREATE TABLE executions (
timer_map map<text, frozen<timer_info>>,
child_executions_map map<bigint, frozen<child_execution_info>>,
request_cancel_map map<bigint, frozen<request_cancel_info>>,
buffered_events_list list<frozen<serialized_event_batch>>,
PRIMARY KEY (shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
Expand Down
7 changes: 7 additions & 0 deletions schema/cadence/versioned/v0.2/add_buffered_events.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Schema update: defines the serialized_event_batch type and adds the
-- buffered_events_list column (a list of frozen serialized_event_batch values)
-- to the executions table.
CREATE TYPE serialized_event_batch (
encoding_type text,
version int,
data blob,
);

ALTER TABLE executions ADD buffered_events_list list<frozen<serialized_event_batch>>;
5 changes: 3 additions & 2 deletions schema/cadence/versioned/v0.2/manifest.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"CurrVersion": "0.2",
"MinCompatibleVersion": "0.2",
"Description": "add workflow timeout to mutable state",
"Description": "add workflow_timeout and buffered_events_list to mutable state",
"SchemaUpdateCqlFiles": [
"add_wf_timeout.cql"
"add_wf_timeout.cql",
"add_buffered_events.cql"
]
}
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ Update_History_Loop:
var failCause workflow.DecisionTaskFailedCause
var err error
completedID := *completedEvent.EventId
hasUnhandledEvents := ((completedID - startedID) > 1)
hasUnhandledEvents := (msBuilder.GetNextEventID() - completedID) > 1
isComplete := false
transferTasks := []persistence.Task{}
timerTasks := []persistence.Task{}
Expand Down
Loading

0 comments on commit a44aeeb

Please sign in to comment.