Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer events when decision task is inflight #386

Merged
merged 3 commits into from
Oct 26, 2017

Conversation

yiminc-zz
Copy link

No description provided.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.08%) to 66.48% when pulling a44aeeb on yiminc:buffer_events into cbc769d on uber:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.09%) to 66.492% when pulling d1256b3 on yiminc:buffer_events into cbc769d on uber:master.

emptyEventID int64 = -23
firstEventID int64 = 1
emptyEventID int64 = -23
bufferedEventID int64 = -123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why choose number -123? (also why emptyEventID -23)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just pick a random number that is unique and is less than 0.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.08%) to 66.48% when pulling abfe317 on yiminc:buffer_events into cbc769d on uber:master.

func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBufferedEvents []*SerializedHistoryEventBatch,
clearBufferedEvents bool, domainID, workflowID, runID string, condition int64, rangeID int64) {

if clearBufferedEvents {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newBufferedEvents and clearBufferedEvents should be mutually exclusive. Let's have some validation here to make sure only one is set and convert this piece of code into if-else block.

@@ -942,6 +964,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
}
state.RequestCancelInfos = requestCancelInfos

eList := result["buffered_events_list"].([]map[string]interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you refactor this logic in a helper method. For example look at 'createTimerTaskInfo'. We create a helper method to deserialize each of these custom types.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logic is already put in a help function createSerializedHistoryEventBatch().

eventBatch := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON}
for k, v := range result {
switch k {
case "version":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with the default but why are you not reading the encoding_type?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the value is already set to JSON when create the struct in this method, so there is no need to read the encoding_type, unless we support other encoding type.

@@ -573,7 +573,7 @@ Update_History_Loop:
var failCause workflow.DecisionTaskFailedCause
var err error
completedID := *completedEvent.EventId
hasUnhandledEvents := ((completedID - startedID) > 1)
hasUnhandledEvents := (msBuilder.GetNextEventID() - completedID) > 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. We should first append all the decisions as part of decision task completed before flushing buffer to history events.
  2. Then this check could just be msBuilder.HasBufferedEvents.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

@@ -58,6 +58,10 @@ type (
updateRequestCancelInfos []*persistence.RequestCancelInfo // Modified RequestCancel Infos since last update
deleteRequestCancelInfo *int64 // Deleted RequestCancel Info since last update

persistedBufferedEvents []*persistence.SerializedHistoryEventBatch // buffered history events that are already persisted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the same naming convention like for other mutable state items.
bufferedEvents and updateBufferedEvents

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

@@ -75,6 +79,8 @@ type (
updateChildExecutionInfos []*persistence.ChildExecutionInfo
deleteChildExecutionInfo *int64
continueAsNew *persistence.CreateWorkflowExecutionRequest
newBufferedEvents []*persistence.SerializedHistoryEventBatch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this needs to be an array? Every update should create a single batch of serialized event.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to make unit test pass

// put new events into 2 buckets:
// 1) if the event was added while there was in-flight decision, then put it in buffered bucket
// 2) otherwise, put it in committed bucket
var newBufferedEvents []*workflow.HistoryEvent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this could be simplified as we can never have partial buffered or committed events in the same update.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but you still need to check if this is buffered or committed batch.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to put the events into buffered or committed based on first event's event id, but it would break lots of unit tests, mostly because we have the tests that addDecisionTaskStarted, then add some other events, then add decisionTaskCompleted. I would spend more time to update those tests, but i feel it is not worth it.
All other comments have been addressed.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.07%) to 66.476% when pulling f1f67f0 on yiminc:buffer_events into cbc769d on uber:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.06%) to 66.46% when pulling 1b66aa0 on yiminc:buffer_events into cbc769d on uber:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 66.466% when pulling d8fc3c9 on yiminc:buffer_events into 1fb0a3d on uber:master.

@yiminc-zz yiminc-zz merged commit d79841b into cadence-workflow:master Oct 26, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants