Apply replication history events to passive cluster #643

Merged

Conversation

samarabbas
Contributor

Created a history replicator which is invoked by the replicator for
processing of history replication tasks. It processes the history
events from the replication task and makes mutable state updates for each
event. Once all events are processed it commits the entire update using
the workflowContext API used by the rest of the stack.

Also changed mutableStateBuilder to apply mutable state changes using
the actual event. This required some refactoring of mutableStateBuilder
to reuse as much code as possible between the replicator and the rest of the
stack.

The history service has a new API, ReplicateEvents, which is called by the
replicator to apply history events.

Currently this change only works for the happy case and is not guarded by
version updates on the domain.
The replicator does not support out-of-order processing of history
replication tasks.
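A rough sketch of the apply-then-commit flow described above (the function shape, the commit hook, and any msBuilder methods beyond the one shown in the diff excerpt just below are assumptions for illustration, not the exact code in this PR):

// Sketch: apply each replicated event to mutable state, then commit
// everything in one update via the workflowContext path.
func applyReplicationEvents(msBuilder *mutableStateBuilder, history *shared.History,
    commit func() error) error {
    for _, event := range history.Events {
        switch event.GetEventType() {
        case shared.EventTypeActivityTaskCompleted:
            if err := msBuilder.ReplicateActivityTaskCompletedEvent(event); err != nil {
                return err
            }
        // ... one Replicate*Event call per supported event type ...
        }
    }
    // Single commit once all events in the task have been applied.
    return commit()
}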

case shared.EventTypeActivityTaskCompleted:
    if err := msBuilder.ReplicateActivityTaskCompletedEvent(event); err != nil {
        return err
    }
@wxing1292 (Contributor) Mar 30, 2018

If this section is not complete, could you add the event types that are missing here in a comment?

samarabbas (Contributor, Author)

Done.
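For illustration only, one way to keep the unsupported cases visible is an explicit default branch in the switch shown above (this fragment and its error construction are assumptions, not necessarily what was actually added):

default:
    // Event types the replicator does not handle yet end up here; an
    // explicit error (or a comment listing them) keeps the gap obvious.
    return &shared.BadRequestError{
        Message: fmt.Sprintf("unsupported replication event type: %v", event.GetEventType()),
    }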

ctx context.Context,
request *h.ReplicateEventsRequest,
opts ...yarpc.CallOption) error {
client, err := c.getHostForRequest(*request.WorkflowExecution.WorkflowId)


use GetWorkflowId()

samarabbas (Contributor, Author)

Done.
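For reference, the nil-safe getter form being suggested looks roughly like this (standard Thrift-generated accessor; the surrounding code is abbreviated):

// GetWorkflowId() returns an empty string when the execution or the
// workflow ID pointer is nil, instead of panicking on a nil dereference.
client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId())
if err != nil {
    return err
}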

return err
}
execution := shared.WorkflowExecution{
    WorkflowId: request.WorkflowExecution.WorkflowId,


Do you need to check that request.WorkflowExecution is valid, or is it already validated? And why do you need to create a copy of the execution instead of just using request.WorkflowExecution?

samarabbas (Contributor, Author)

This is an actual event which needs to be replicated, so no validation is needed in this case. This is one of the main differences between historyEngine and historyReplicator.
As for the workflowExecution copy: request.WorkflowExecution is a pointer and the rest of the API takes a struct. This inconsistency exists in our implementation all over the place, and I don't want to change it in this PR.


But you could use *request.WorkflowExecution, right?

samarabbas (Contributor, Author)

You are right. Done.
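A minimal sketch of the agreed simplification, assuming request.WorkflowExecution has already been checked for nil upstream:

// Dereference the execution from the request instead of rebuilding it
// field by field.
execution := *request.WorkflowExecution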


createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {
    _, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
        RequestID: uuid.New(),


You already have a request ID created at line 103; are they different? Why do we need two request IDs?

samarabbas (Contributor, Author)

Good catch. Fixed. Using the same requestID now.
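Roughly what the fix amounts to, assuming a single requestID variable created earlier in the function (the variable name and the elided fields are assumptions):

requestID := uuid.New() // created once, earlier in the function

// Inside the createWorkflow closure: reuse that requestID instead of
// minting a second one with another uuid.New() call.
_, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
    RequestID: requestID,
    // ... remaining request fields unchanged ...
})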

@@ -230,14 +229,14 @@ func (e *mutableStateBuilder) FlushBufferedEvents() error {
return nil
}

-func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion int64) {
+func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion, lastEventID int64) {
    e.replicationState.CurrentVersion = failoverVersion


Where is the logic that prevents failover from a version that is lower than the current version?


Could you help answer this?

samarabbas (Contributor, Author)

This change just implements the happy path. I will be implementing that part of the replication protocol in a separate PR.
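Not part of this PR, but for context, the guard being asked about could look roughly like the following once the version checks land (a purely hypothetical sketch; the real follow-up may differ, including how the rejection is surfaced):

// Hypothetical guard: ignore or reject replicated updates whose failover
// version is lower than what this cluster has already applied.
if e.replicationState != nil && failoverVersion < e.replicationState.CurrentVersion {
    return fmt.Errorf("stale replication update: version %v is lower than current version %v",
        failoverVersion, e.replicationState.CurrentVersion)
}
e.replicationState.CurrentVersion = failoverVersion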

@@ -1076,7 +1112,7 @@ func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent(schedul

event := e.hBuilder.AddDecisionTaskTimedOutEvent(scheduleEventID, 0, workflow.TimeoutTypeScheduleToStart)

e.DeleteDecision()


This means the client might never see the decision task with attempt==0. Maybe the server needs to return an additional explicit flag so the client can decide whether to fail the decision with an error message or to silently not respond and let it time out.

samarabbas (Contributor, Author)

Filed issue: #645 to track this.

@@ -156,6 +156,7 @@ func (p *replicatorQueueProcessorImpl) CompleteTask(taskID int64) error {

func (p *replicatorQueueProcessorImpl) getHistory(task *persistence.ReplicationTaskInfo) (*shared.History, error) {

p.logger.Warnf("Received replication task: %v", task)


This should not be a warning.

samarabbas (Contributor, Author)

removed.
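For completeness, the debug-level alternative if the line were kept rather than removed (a sketch):

// Per-task logging stays at debug level; warnings are reserved for
// unexpected conditions.
p.logger.Debugf("Received replication task: %v", task)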

@@ -178,7 +184,19 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
p.logger.Debugf("Recieved domain replication task %v.", task.DomainTaskAttributes)
err = p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes)
case replicator.ReplicationTaskTypeHistory:
p.logger.Debugf("Recieved history replication task %v.", task.HistoryTaskAttributes)
p.logger.Warn("Recieved history replication task %v.", task.HistoryTaskAttributes)


This should not be a warning.

samarabbas (Contributor, Author)

removed.

@coveralls

coveralls commented Mar 31, 2018

Coverage Status

Coverage decreased (-0.7%) to 62.433% when pulling 7da5f3f on samarabbas:xdc-replication-protocol into 3e3e2fc on uber:master.

@samarabbas samarabbas merged commit fa74c62 into cadence-workflow:master Apr 2, 2018
@samarabbas samarabbas deleted the xdc-replication-protocol branch April 2, 2018 16:21