From 03ec83cc475db38d925b3774bb41d058a85f2955 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Fri, 30 Mar 2018 17:00:41 -0700 Subject: [PATCH] addressed comments --- client/history/client.go | 2 +- service/history/historyReplicator.go | 37 +++++++++++++++++++-- service/history/replicatorQueueProcessor.go | 1 - service/worker/processor.go | 1 - 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/client/history/client.go b/client/history/client.go index fb8f0947d4f..0ab4ed606a4 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -456,7 +456,7 @@ func (c *clientImpl) ReplicateEvents( ctx context.Context, request *h.ReplicateEventsRequest, opts ...yarpc.CallOption) error { - client, err := c.getHostForRequest(*request.WorkflowExecution.WorkflowId) + client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId()) if err != nil { return err } diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index a598d4a84cb..d009d20ad75 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -95,12 +95,45 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error decisionScheduleID := emptyEventID decisionStartID := emptyEventID decisionTimeout := int32(0) + var requestID string + // TODO: Add handling for following events: + // WorkflowExecutionFailed, + // WorkflowExecutionTimedOut, + // ActivityTaskFailed, + // ActivityTaskTimedOut, + // ActivityTaskCancelRequested, + // RequestCancelActivityTaskFailed, + // ActivityTaskCanceled, + // TimerStarted, + // TimerFired, + // CancelTimerFailed, + // TimerCanceled, + // WorkflowExecutionCancelRequested, + // WorkflowExecutionCanceled, + // RequestCancelExternalWorkflowExecutionInitiated, + // RequestCancelExternalWorkflowExecutionFailed, + // ExternalWorkflowExecutionCancelRequested, + // MarkerRecorded, + // WorkflowExecutionSignaled, + // WorkflowExecutionTerminated, + // WorkflowExecutionContinuedAsNew, + // StartChildWorkflowExecutionInitiated, + // StartChildWorkflowExecutionFailed, + // ChildWorkflowExecutionStarted, + // ChildWorkflowExecutionCompleted, + // ChildWorkflowExecutionFailed, + // ChildWorkflowExecutionCanceled, + // ChildWorkflowExecutionTimedOut, + // ChildWorkflowExecutionTerminated, + // SignalExternalWorkflowExecutionInitiated, + // SignalExternalWorkflowExecutionFailed, + // ExternalWorkflowExecutionSignaled, for _, event := range request.History.Events { lastEvent = event switch event.GetEventType() { case shared.EventTypeWorkflowExecutionStarted: attributes := event.WorkflowExecutionStartedEventAttributes - requestID := uuid.New() + requestID = uuid.New() msBuilder.ReplicateWorkflowExecutionStartedEvent(domainID, execution, requestID, attributes) case shared.EventTypeDecisionTaskScheduled: @@ -199,7 +232,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { _, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ - RequestID: uuid.New(), + RequestID: requestID, DomainID: domainID, Execution: execution, ParentDomainID: parentDomainID, diff --git a/service/history/replicatorQueueProcessor.go b/service/history/replicatorQueueProcessor.go index 48556003e2f..4129006d880 100644 --- a/service/history/replicatorQueueProcessor.go +++ b/service/history/replicatorQueueProcessor.go @@ -156,7 +156,6 @@ func (p *replicatorQueueProcessorImpl) CompleteTask(taskID int64) error { func (p *replicatorQueueProcessorImpl) getHistory(task *persistence.ReplicationTaskInfo) (*shared.History, error) { - p.logger.Warnf("Received replication task: %v", task) var nextPageToken []byte historyEvents := []*shared.HistoryEvent{} for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 { diff --git a/service/worker/processor.go b/service/worker/processor.go index 74a897cb75d..d77399bba93 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -184,7 +184,6 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { p.logger.Debugf("Recieved domain replication task %v.", task.DomainTaskAttributes) err = p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes) case replicator.ReplicationTaskTypeHistory: - p.logger.Warn("Recieved history replication task %v.", task.HistoryTaskAttributes) err = p.historyClient.ReplicateEvents(context.Background(), &h.ReplicateEventsRequest{ DomainUUID: task.HistoryTaskAttributes.DomainId, WorkflowExecution: &shared.WorkflowExecution{