Skip to content

Commit

Permalink
Optimize replication task generation (#869)
Browse files Browse the repository at this point in the history
* do not replicate history if target cluster is not self
* do not generate history replication task if target cluster is self
  • Loading branch information
wxing1292 authored Jun 21, 2018
1 parent 36c617d commit b501bf1
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 42 deletions.
4 changes: 2 additions & 2 deletions .gen/go/replicator/idl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 85 additions & 2 deletions .gen/go/replicator/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,12 +539,11 @@ func (entry *DomainCacheEntry) IsDomainActive() bool {
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName
}

// ShouldReplicateEvent return whether the workflows within this domain should be replicated
func (entry *DomainCacheEntry) ShouldReplicateEvent() bool {
// CanReplicateEvent return whether the workflows within this domain should be replicated
func (entry *DomainCacheEntry) CanReplicateEvent() bool {
// frontend guarantee that the clusters always contains the active domain, so if the # of clusters is 1
// then we do not need to send out any events for replication
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName &&
entry.isGlobalDomain && len(entry.replicationConfig.Clusters) > 1
return entry.isGlobalDomain && len(entry.replicationConfig.Clusters) > 1
}

// GetDomainNotActiveErr return err if domain is not active, nil otherwise
Expand All @@ -555,6 +554,8 @@ func (entry *DomainCacheEntry) GetDomainNotActiveErr() error {
}
return errors.NewDomainNotActiveError(entry.info.Name, entry.clusterMetadata.GetCurrentClusterName(), entry.replicationConfig.ActiveClusterName)
}

// Len return length
func (t DomainCacheEntries) Len() int {
return len(t)
}
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/replicator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct DomainTaskAttributes {
}

struct HistoryTaskAttributes {
05: optional list<string> targetClusters
10: optional string domainId
20: optional string workflowId
30: optional string runId
Expand Down
38 changes: 22 additions & 16 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,22 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
return nil, err
}
msBuilder.GetExecutionInfo().LastFirstEventID = startedEvent.GetEventId()
createReplicationTask := msBuilder.GetReplicationState() != nil

var replicationState *persistence.ReplicationState
var replicationTasks []persistence.Task
if createReplicationTask {
if msBuilder.GetReplicationState() != nil {
msBuilder.UpdateReplicationStateLastEventID("", msBuilder.GetCurrentVersion(), msBuilder.GetNextEventID()-1)
replicationState = msBuilder.GetReplicationState()
replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: common.FirstEventID,
NextEventID: msBuilder.GetNextEventID(),
Version: msBuilder.GetCurrentVersion(),
LastReplicationInfo: nil,
// this is a hack, only create replication task if have # target cluster > 1, for more see #868
if domainEntry.CanReplicateEvent() {
replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: common.FirstEventID,
NextEventID: msBuilder.GetNextEventID(),
Version: msBuilder.GetCurrentVersion(),
LastReplicationInfo: nil,
}
replicationTasks = append(replicationTasks, replicationTask)
}
replicationTasks = append(replicationTasks, replicationTask)
}
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)

Expand Down Expand Up @@ -1952,19 +1955,22 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context
return nil, err
}
msBuilder.GetExecutionInfo().LastFirstEventID = startedEvent.GetEventId()
createReplicationTask := msBuilder.GetReplicationState() != nil

var replicationState *persistence.ReplicationState
var replicationTasks []persistence.Task
if createReplicationTask {
if msBuilder.GetReplicationState() != nil {
msBuilder.UpdateReplicationStateLastEventID("", msBuilder.GetCurrentVersion(), msBuilder.GetNextEventID()-1)
replicationState = msBuilder.GetReplicationState()
replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: common.FirstEventID,
NextEventID: msBuilder.GetNextEventID(),
Version: msBuilder.GetCurrentVersion(),
LastReplicationInfo: nil,
// this is a hack, only create replication task if have # target cluster > 1, for more see #868
if domainEntry.CanReplicateEvent() {
replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: common.FirstEventID,
NextEventID: msBuilder.GetNextEventID(),
Version: msBuilder.GetCurrentVersion(),
LastReplicationInfo: nil,
}
replicationTasks = append(replicationTasks, replicationTask)
}
replicationTasks = append(replicationTasks, replicationTask)
}
setTaskVersion(msBuilder.GetCurrentVersion(), transferTasks, timerTasks)

Expand Down
10 changes: 10 additions & 0 deletions service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi
sw := p.metricsClient.StartTimer(metrics.ReplicatorTaskHistoryScope, metrics.TaskLatency)
defer sw.Stop()

domainEntry, err := p.shard.GetDomainCache().GetDomainByID(task.DomainID)
if err != nil {
return err
}
targetClusters := []string{}
for _, cluster := range domainEntry.GetReplicationConfig().Clusters {
targetClusters = append(targetClusters, cluster.ClusterName)
}

history, err := p.getHistory(task.DomainID, task.WorkflowID, task.RunID, task.FirstEventID, task.NextEventID)
if err != nil {
return err
Expand All @@ -144,6 +153,7 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi
replicationTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeHistory),
HistoryTaskAttributes: &replicator.HistoryTaskAttributes{
TargetClusters: targetClusters,
DomainId: common.StringPtr(task.DomainID),
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
Expand Down
25 changes: 16 additions & 9 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type (
executionManager persistence.ExecutionManager
logger bark.Logger

locker common.Mutex
msBuilder mutableState
updateCondition int64
deleteTimerTask persistence.Task
locker common.Mutex
msBuilder mutableState
updateCondition int64
deleteTimerTask persistence.Task
createReplicationTask bool
}
)

Expand Down Expand Up @@ -161,26 +162,32 @@ func (c *workflowExecutionContext) updateVersion() error {
return err
}
c.msBuilder.UpdateReplicationStateVersion(domainEntry.GetFailoverVersion())

// this is a hack, only create replication task if have # target cluster > 1, for more see #868
c.createReplicationTask = domainEntry.CanReplicateEvent()
}
return nil
}

func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task,
timerTasks []persistence.Task, transactionID int64) error {

// Only generate replication task if this is a global domain
createReplicationTask := c.msBuilder.GetReplicationState() != nil

currentVersion := c.msBuilder.GetCurrentVersion()
if createReplicationTask {
if c.msBuilder.GetReplicationState() != nil {
activeCluster := c.clusterMetadata.ClusterNameForFailoverVersion(currentVersion)
currentCluster := c.clusterMetadata.GetCurrentClusterName()
if activeCluster != currentCluster {
return errors.NewDomainNotActiveError(c.msBuilder.GetExecutionInfo().DomainID, currentCluster, activeCluster)
}
}

return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", currentVersion, transactionID)
if !c.createReplicationTask {
c.logger.Debugf("Skipping replication task creation: %v, workflowID: %v, runID: %v, firstEventID: %v, nextEventID: %v.",
c.domainID, c.workflowExecution.GetWorkflowId(), c.workflowExecution.GetRunId(),
c.msBuilder.GetExecutionInfo().LastFirstEventID, c.msBuilder.GetExecutionInfo().NextEventID)
}

return c.updateHelper(nil, transferTasks, timerTasks, c.createReplicationTask, "", currentVersion, transactionID)
}

func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task,
Expand Down
33 changes: 24 additions & 9 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,19 +267,34 @@ func (p *replicationTaskProcessor) handleHistoryReplicationTask(task *replicator
sw := p.metricsClient.StartTimer(metrics.HistoryReplicationTaskScope, metrics.ReplicatorLatency)
defer sw.Stop()

attr := task.HistoryTaskAttributes
processTask := false
Loop:
for _, cluster := range attr.TargetClusters {
if p.currentCluster == cluster {
processTask = true
break Loop
}
}
if !processTask {
p.logger.Debugf("Dropping non-targeted history task with domainID: %v, workflowID: %v, runID: %v, firstEventID: %v, nextEventID: %v.",
attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), attr.GetFirstEventId(), attr.GetNextEventId())
return nil
}

return p.historyClient.ReplicateEvents(context.Background(), &h.ReplicateEventsRequest{
SourceCluster: common.StringPtr(p.sourceCluster),
DomainUUID: task.HistoryTaskAttributes.DomainId,
DomainUUID: attr.DomainId,
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: task.HistoryTaskAttributes.WorkflowId,
RunId: task.HistoryTaskAttributes.RunId,
WorkflowId: attr.WorkflowId,
RunId: attr.RunId,
},
FirstEventId: task.HistoryTaskAttributes.FirstEventId,
NextEventId: task.HistoryTaskAttributes.NextEventId,
Version: task.HistoryTaskAttributes.Version,
ReplicationInfo: task.HistoryTaskAttributes.ReplicationInfo,
History: task.HistoryTaskAttributes.History,
NewRunHistory: task.HistoryTaskAttributes.NewRunHistory,
FirstEventId: attr.FirstEventId,
NextEventId: attr.NextEventId,
Version: attr.Version,
ReplicationInfo: attr.ReplicationInfo,
History: attr.History,
NewRunHistory: attr.NewRunHistory,
})
}

Expand Down

0 comments on commit b501bf1

Please sign in to comment.