Skip to content

Commit

Permalink
Close shard if get workflow execution failed on shardownershiplost error
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Mar 16, 2024
1 parent 786d6e7 commit 7ffd197
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
1 change: 1 addition & 0 deletions common/persistence/executionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
internalRequest := &InternalGetWorkflowExecutionRequest{
DomainID: request.DomainID,
Execution: request.Execution,
RangeID: request.RangeID,

Check warning on line 76 in common/persistence/executionManager.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/executionManager.go#L76

Added line #L76 was not covered by tests
}
response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion service/history/engine/engineimpl/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2835,7 +2835,12 @@ func (e *historyEngineImpl) ReplicateEventsV2(
replicateRequest *types.ReplicateEventsV2Request,
) error {

return e.nDCReplicator.ApplyEvents(ctx, replicateRequest)
err := e.nDCReplicator.ApplyEvents(ctx, replicateRequest)
if err != nil {
e.logger.Error("Failed to replicate events", tag.Error(err))
return err

Check warning on line 2841 in service/history/engine/engineimpl/historyEngine.go

View check run for this annotation

Codecov / codecov/patch

service/history/engine/engineimpl/historyEngine.go#L2838-L2841

Added lines #L2838 - L2841 were not covered by tests
}
return nil

Check warning on line 2843 in service/history/engine/engineimpl/historyEngine.go

View check run for this annotation

Codecov / codecov/patch

service/history/engine/engineimpl/historyEngine.go#L2843

Added line #L2843 was not covered by tests
}

func (e *historyEngineImpl) SyncShardStatus(
Expand Down
19 changes: 18 additions & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,24 @@ func (s *contextImpl) GetWorkflowExecution(
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
return s.executionManager.GetWorkflowExecution(ctx, request)
response, err := s.executionManager.GetWorkflowExecution(ctx, request)
switch err.(type) {
case nil:
return response, nil
case *persistence.ShardOwnershipLostError:

Check warning on line 597 in service/history/shard/context.go

View check run for this annotation

Codecov / codecov/patch

service/history/shard/context.go#L597

Added line #L597 was not covered by tests
{
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: GetWorkflowExecution failed due to stolen shard.",
tag.Error(err),
tag.ShardRangeID(currentRangeID),
)
s.closeShard()
return nil, err

Check warning on line 606 in service/history/shard/context.go

View check run for this annotation

Codecov / codecov/patch

service/history/shard/context.go#L600-L606

Added lines #L600 - L606 were not covered by tests
}
default:
return nil, err
}
}

func (s *contextImpl) CreateWorkflowExecution(
Expand Down

0 comments on commit 7ffd197

Please sign in to comment.