-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle more edge case when apply events #836
Conversation
e236d8f
to
e19bc84
Compare
service/history/historyReplicator.go
Outdated
@@ -37,6 +38,9 @@ import ( | |||
"github.com/uber/cadence/common/persistence" | |||
) | |||
|
|||
var errRetry = &shared.RetryTaskError{Message: "History replicator retry error"} | |||
var errDLQ = errors.New("History replicator DQL error") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Fix "DQL" on error message.
service/history/historyReplicator.go
Outdated
@@ -37,6 +38,9 @@ import ( | |||
"github.com/uber/cadence/common/persistence" | |||
) | |||
|
|||
var errRetry = &shared.RetryTaskError{Message: "History replicator retry error"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you define all errors within the same var section. There is already one error for RetryTaskError.
service/history/historyReplicator.go
Outdated
@@ -77,32 +81,39 @@ func newHistoryReplicator(shard ShardContext, historyEngine *historyEngineImpl, | |||
} | |||
|
|||
func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retError error) { | |||
defer func() { | |||
if _, ok := retError.(*shared.EntityNotExistsError); ok { | |||
retError = errRetry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This catch all error could make debugging little harder. I think we should also include the actual message here. Or atleast log a warning before returning the catch all error.
service/history/historyReplicator.go
Outdated
|
||
// GetWorkflowExecution failed with some transient error. Return err so we can retry the task later | ||
if _, ok := err.(*shared.EntityNotExistsError); !ok { | ||
} else if _, ok := err.(*shared.EntityNotExistsError); !ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why converting to else-if?
if _, ok := err.(*shared.EntityNotExistsError); ok { | ||
return ErrRetryEntityNotExists | ||
if _, ok := err.(*shared.EntityNotExistsError); !ok { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrRetryEntityNotExists error may not be needed if you remove this.
service/history/historyReplicator.go
Outdated
msBuilder.executionInfo.LastFirstEventID = firstEvent.GetEventId() | ||
msBuilder.executionInfo.NextEventID = lastEvent.GetEventId() + 1 | ||
incomingVersion := firstEvent.GetVersion() | ||
replicationState := &persistence.ReplicationState{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should get rid of this and just call:
msBuilder.updateReplicationStateLastEventID
service/history/historyReplicator.go
Outdated
executionInfo.LastFirstEventID = firstEvent.GetEventId() | ||
executionInfo.NextEventID = lastEvent.GetEventId() + 1 | ||
incomingVersion := firstEvent.GetVersion() | ||
msBuilder.UpdateReplicationStateLastEventID("", incomingVersion, lastEvent.GetEventId()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use the sourceCluster from replication task instead of empty string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch
service/history/historyReplicator.go
Outdated
_, err = createWorkflow(isBrandNew, errExist.RunID) | ||
} | ||
|
||
requestID := uuid.New() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move all this logic into ApplyStartEvent? We can simplify this logic to deal with only other events and have an assertion that it should not be called for WorkflowExecutionStartedEvent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, i think we should keep this logic like this.
all other functions, other than ApplyReplicationTask should prepare the mutable state.
ApplyReplicationTask should apply events to mutable state. and accordingly, create workflow or update workflow.
this can be useful if say, we want to apply events from a remote cluster to recover from a disaster, or simple reply a workflow using existing events for debugging reason
|
||
// we can also use the start version | ||
if currentLastWriteVersion > incomingVersion { | ||
logger.Infof("Dropping replication task. Current RunID: %v, Current LastWriteVersion: %v, Incoming Version: %v.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increment the 'StaleReplicationEventsCounter' counter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added all 3 types:
HistoryConflictsCounter
StaleReplicationEventsCounter
BufferedReplicationTaskCounter
if rState.LastWriteVersion > incomingVersion { | ||
// Replication state is already on a higher version, we can drop this event | ||
// TODO: We need to replay external events like signal to the new version | ||
logger.Warnf("Dropping stale replication task. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v, IncomingV: %v.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increment the 'StaleReplicationEventsCounter' counter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
service/history/historyReplicator.go
Outdated
logger.Warnf("Dropping stale replication task. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v, IncomingV: %v.", | ||
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, incomingVersion) | ||
return nil, nil | ||
} else if rState.LastWriteVersion < incomingVersion { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change 'else if' to 'if'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fine
service/history/historyReplicator.go
Outdated
logger.Errorf("No ReplicationInfo Found For Previous Active Cluster. Previous Active Cluster: %v, Request Source Cluster: %v, Request ReplicationInfo: %v.", | ||
previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo) | ||
// TODO: Handle missing replication information, #840 | ||
return nil, ErrMissingReplicationInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also include the existing comment for this part of the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy pasted the existing one: "// Returning BadRequestError to force the message to land into DLQ"
is this the comment you are talking about?
service/history/historyReplicator.go
Outdated
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.BufferedReplicationTaskCounter) | ||
// Detect conflict | ||
if ri.GetLastEventId() < rState.LastWriteEventID { | ||
logger.Infof("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increment 'HistoryConflictsCounter' counter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
service/history/historyReplicator.go
Outdated
msBuilder.GetNextEventID(), firstEvent.GetEventId()) | ||
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.BufferedReplicationTaskCounter) | ||
// Detect conflict | ||
if ri.GetLastEventId() < rState.LastWriteEventID { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you changed '!=' to '<'. Whats the reasoning? Can we capture that as a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
service/history/historyReplicator.go
Outdated
} | ||
} else if ri.GetLastEventId() > rState.LastWriteEventID { | ||
// TODO handle this case | ||
logger.Errorf("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I get this. Can you explain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
say if we have a bug, this case can happen, we should return some unique error and do logging
service/history/historyReplicator.go
Outdated
logger.Errorf("Conflict detected. State: {CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevActiveCluster: %v, V: %v, LastEventID: %v}", | ||
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID, | ||
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId()) | ||
return nil, ErrMissingReplicationInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use a different error here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// ErrCorruptedReplicationInfo is returned when replication task has corrupted replication information from source cluster
ErrCorruptedReplicationInfo = &shared.BadRequestError{Message: "replication task is has corrupted cluster replication info"}
firstEventID := request.GetFirstEventId() | ||
if firstEventID < msBuilder.GetNextEventID() { | ||
// duplicate replication task | ||
replicationState := msBuilder.GetReplicationState() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increment 'StaleReplicationEventsCounter' metric counter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all done
service/history/historyReplicator.go
Outdated
logger.Debugf("Dropping replication task. State: {NextEvent: %v, Version: %v, LastWriteV: %v, LastWriteEvent: %v}", | ||
msBuilder.GetNextEventID(), replicationState.CurrentVersion, replicationState.LastWriteVersion, replicationState.LastWriteEventID) | ||
return nil | ||
} else if firstEventID > msBuilder.GetNextEventID() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change 'else if' to 'if'
service/history/historyReplicator.go
Outdated
logger.Debugf("Buffer out of order replication task. NextEvent: %v, FirstEvent: %v", | ||
msBuilder.GetNextEventID(), firstEventID) | ||
|
||
err = msBuilder.BufferReplicationTask(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
increment 'BufferedReplicationTaskCounter' metric counter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good. Can you address my minor comments? It is good to land once you address those.
solve #675, #814, #791