Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
samarabbas committed Mar 31, 2018
1 parent 958840c commit 03ec83c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
2 changes: 1 addition & 1 deletion client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (c *clientImpl) ReplicateEvents(
ctx context.Context,
request *h.ReplicateEventsRequest,
opts ...yarpc.CallOption) error {
client, err := c.getHostForRequest(*request.WorkflowExecution.WorkflowId)
client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId())
if err != nil {
return err
}
Expand Down
37 changes: 35 additions & 2 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,45 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error
decisionScheduleID := emptyEventID
decisionStartID := emptyEventID
decisionTimeout := int32(0)
var requestID string
// TODO: Add handling for following events:
// WorkflowExecutionFailed,
// WorkflowExecutionTimedOut,
// ActivityTaskFailed,
// ActivityTaskTimedOut,
// ActivityTaskCancelRequested,
// RequestCancelActivityTaskFailed,
// ActivityTaskCanceled,
// TimerStarted,
// TimerFired,
// CancelTimerFailed,
// TimerCanceled,
// WorkflowExecutionCancelRequested,
// WorkflowExecutionCanceled,
// RequestCancelExternalWorkflowExecutionInitiated,
// RequestCancelExternalWorkflowExecutionFailed,
// ExternalWorkflowExecutionCancelRequested,
// MarkerRecorded,
// WorkflowExecutionSignaled,
// WorkflowExecutionTerminated,
// WorkflowExecutionContinuedAsNew,
// StartChildWorkflowExecutionInitiated,
// StartChildWorkflowExecutionFailed,
// ChildWorkflowExecutionStarted,
// ChildWorkflowExecutionCompleted,
// ChildWorkflowExecutionFailed,
// ChildWorkflowExecutionCanceled,
// ChildWorkflowExecutionTimedOut,
// ChildWorkflowExecutionTerminated,
// SignalExternalWorkflowExecutionInitiated,
// SignalExternalWorkflowExecutionFailed,
// ExternalWorkflowExecutionSignaled,
for _, event := range request.History.Events {
lastEvent = event
switch event.GetEventType() {
case shared.EventTypeWorkflowExecutionStarted:
attributes := event.WorkflowExecutionStartedEventAttributes
requestID := uuid.New()
requestID = uuid.New()
msBuilder.ReplicateWorkflowExecutionStartedEvent(domainID, execution, requestID, attributes)

case shared.EventTypeDecisionTaskScheduled:
Expand Down Expand Up @@ -199,7 +232,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error

createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) {
_, err = r.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{
RequestID: uuid.New(),
RequestID: requestID,
DomainID: domainID,
Execution: execution,
ParentDomainID: parentDomainID,
Expand Down
1 change: 0 additions & 1 deletion service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func (p *replicatorQueueProcessorImpl) CompleteTask(taskID int64) error {

func (p *replicatorQueueProcessorImpl) getHistory(task *persistence.ReplicationTaskInfo) (*shared.History, error) {

p.logger.Warnf("Received replication task: %v", task)
var nextPageToken []byte
historyEvents := []*shared.HistoryEvent{}
for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
Expand Down
1 change: 0 additions & 1 deletion service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
p.logger.Debugf("Recieved domain replication task %v.", task.DomainTaskAttributes)
err = p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes)
case replicator.ReplicationTaskTypeHistory:
p.logger.Warn("Recieved history replication task %v.", task.HistoryTaskAttributes)
err = p.historyClient.ReplicateEvents(context.Background(), &h.ReplicateEventsRequest{
DomainUUID: task.HistoryTaskAttributes.DomainId,
WorkflowExecution: &shared.WorkflowExecution{
Expand Down

0 comments on commit 03ec83c

Please sign in to comment.