diff --git a/service/history/handler.go b/service/history/handler.go index d8e0e293ffb..689bf6e0eee 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -310,34 +310,34 @@ func (h *handlerImpl) RecordActivityTaskHeartbeat( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } heartbeatRequest := wrappedRequest.HeartbeatRequest token, err0 := h.tokenSerializer.Deserialize(heartbeatRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return nil, h.error(err0, scope, domainID, "", "") + return nil, h.error(err0, scope, domainID, "") } err0 = validateTaskToken(token) if err0 != nil { - return nil, h.error(err0, scope, domainID, "", "") + return nil, h.error(err0, scope, domainID, "") } workflowID := token.WorkflowID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, "") + return nil, h.error(err1, scope, domainID, workflowID) } response, err2 := engine.RecordActivityTaskHeartbeat(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, "") + return nil, h.error(err2, scope, domainID, workflowID) } return response, nil @@ -369,21 +369,21 @@ func (h *handlerImpl) RecordActivityTaskStarted( ) if recordRequest.GetDomainUUID() == "" { - return nil, h.error(errDomainNotSet, scope, domainID, workflowID, "") + return nil, h.error(errDomainNotSet, scope, domainID, workflowID) } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID) } engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, "") + return nil, h.error(err1, scope, domainID, workflowID) } response, err2 := engine.RecordActivityTaskStarted(ctx, recordRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, "") + return nil, h.error(err2, scope, domainID, workflowID) } return response, nil @@ -404,7 +404,6 @@ func (h *handlerImpl) RecordDecisionTaskStarted( domainID := recordRequest.GetDomainUUID() workflowExecution := recordRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() h.emitInfoOrDebugLog( domainID, @@ -416,15 +415,15 @@ func (h *handlerImpl) RecordDecisionTaskStarted( ) if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, workflowID, runID) + return nil, h.error(errDomainNotSet, scope, domainID, workflowID) } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, runID) + return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID) } if recordRequest.PollRequest == nil || recordRequest.PollRequest.TaskList.GetName() == "" { - return nil, h.error(errTaskListNotSet, scope, domainID, workflowID, runID) + return nil, h.error(errTaskListNotSet, scope, domainID, workflowID) } engine, err1 := h.controller.GetEngine(workflowID) @@ -432,16 +431,15 @@ func (h *handlerImpl) RecordDecisionTaskStarted( h.GetLogger().Error("RecordDecisionTaskStarted failed.", tag.Error(err1), tag.WorkflowID(recordRequest.WorkflowExecution.GetWorkflowID()), - tag.WorkflowRunID(runID), tag.WorkflowRunID(recordRequest.WorkflowExecution.GetRunID()), tag.WorkflowScheduleID(recordRequest.GetScheduleID()), ) - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } response, err2 := engine.RecordDecisionTaskStarted(ctx, recordRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return response, nil @@ -461,35 +459,34 @@ func (h *handlerImpl) RespondActivityTaskCompleted( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } completeRequest := wrappedRequest.CompleteRequest token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } workflowID := token.WorkflowID - runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RespondActivityTaskCompleted(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -509,35 +506,34 @@ func (h *handlerImpl) RespondActivityTaskFailed( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } failRequest := wrappedRequest.FailedRequest token, err0 := h.tokenSerializer.Deserialize(failRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } workflowID := token.WorkflowID - runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RespondActivityTaskFailed(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -557,35 +553,34 @@ func (h *handlerImpl) RespondActivityTaskCanceled( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } cancelRequest := wrappedRequest.CancelRequest token, err0 := h.tokenSerializer.Deserialize(cancelRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } workflowID := token.WorkflowID - runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RespondActivityTaskCanceled(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -605,11 +600,11 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } completeRequest := wrappedRequest.CompleteRequest @@ -619,7 +614,7 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return nil, h.error(err0, scope, domainID, "", "") + return nil, h.error(err0, scope, domainID, "") } h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskCompleted. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v", @@ -630,19 +625,18 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( err0 = validateTaskToken(token) if err0 != nil { - return nil, h.error(err0, scope, domainID, "", "") + return nil, h.error(err0, scope, domainID, "") } workflowID := token.WorkflowID - runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } response, err2 := engine.RespondDecisionTaskCompleted(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return response, nil @@ -662,18 +656,18 @@ func (h *handlerImpl) RespondDecisionTaskFailed( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } failedRequest := wrappedRequest.FailedRequest token, err0 := h.tokenSerializer.Deserialize(failedRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskFailed. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v", @@ -697,19 +691,18 @@ func (h *handlerImpl) RespondDecisionTaskFailed( } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "", "") + return h.error(err0, scope, domainID, "") } workflowID := token.WorkflowID - runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RespondDecisionTaskFailed(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -729,24 +722,23 @@ func (h *handlerImpl) StartWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } startRequest := wrappedRequest.StartRequest workflowID := startRequest.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, "") + return nil, h.error(err1, scope, domainID, workflowID) } response, err2 := engine.StartWorkflowExecution(ctx, wrappedRequest) - runID := response.GetRunID() if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return response, nil @@ -842,7 +834,7 @@ func (h *handlerImpl) ResetQueue( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return h.error(err, scope, "", "", "") + return h.error(err, scope, "", "") } switch taskType := common.TaskType(request.GetType()); taskType { @@ -857,7 +849,7 @@ func (h *handlerImpl) ResetQueue( } if err != nil { - return h.error(err, scope, "", "", "") + return h.error(err, scope, "", "") } return nil } @@ -876,7 +868,7 @@ func (h *handlerImpl) DescribeQueue( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } switch taskType := common.TaskType(request.GetType()); taskType { @@ -891,7 +883,7 @@ func (h *handlerImpl) DescribeQueue( } if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } return resp, nil } @@ -910,20 +902,19 @@ func (h *handlerImpl) DescribeMutableState( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } workflowExecution := request.Execution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.DescribeMutableState(ctx, request) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil } @@ -942,24 +933,23 @@ func (h *handlerImpl) GetMutableState( domainID := getRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := getRequest.Execution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.GetMutableState(ctx, getRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil } @@ -978,24 +968,23 @@ func (h *handlerImpl) PollMutableState( domainID := getRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := getRequest.Execution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.PollMutableState(ctx, getRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil } @@ -1014,24 +1003,23 @@ func (h *handlerImpl) DescribeWorkflowExecution( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := request.Request.Execution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.DescribeWorkflowExecution(ctx, request) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil } @@ -1054,11 +1042,11 @@ func (h *handlerImpl) RequestCancelWorkflowExecution( domainID := request.GetDomainUUID() if domainID == "" || request.CancelRequest.GetDomain() == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } cancelRequest := request.CancelRequest @@ -1069,15 +1057,14 @@ func (h *handlerImpl) RequestCancelWorkflowExecution( cancelRequest.WorkflowExecution.GetRunID())) workflowID := cancelRequest.WorkflowExecution.GetWorkflowID() - runID := cancelRequest.WorkflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RequestCancelWorkflowExecution(ctx, request) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1102,24 +1089,23 @@ func (h *handlerImpl) SignalWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := wrappedRequest.SignalRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.SignalWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1147,18 +1133,18 @@ func (h *handlerImpl) SignalWithStartWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } signalWithStartRequest := wrappedRequest.SignalWithStartRequest workflowID := signalWithStartRequest.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, "") + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest) @@ -1175,12 +1161,12 @@ func (h *handlerImpl) SignalWithStartWorkflowExecution( var e1 *persistence.WorkflowExecutionAlreadyStartedError var e2 *persistence.CurrentWorkflowConditionFailedError if !errors.As(err2, &e1) && !errors.As(err2, &e2) { - return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID()) + return nil, h.error(err2, scope, domainID, workflowID) } resp, err2 = engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID()) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil } @@ -1204,24 +1190,23 @@ func (h *handlerImpl) RemoveSignalMutableState( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := wrappedRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RemoveSignalMutableState(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1246,24 +1231,23 @@ func (h *handlerImpl) TerminateWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := wrappedRequest.TerminateRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.TerminateWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1288,24 +1272,23 @@ func (h *handlerImpl) ResetWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := wrappedRequest.ResetRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.ResetWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil @@ -1328,23 +1311,22 @@ func (h *handlerImpl) QueryWorkflow( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } workflowID := request.GetRequest().GetExecution().GetWorkflowID() - runID := request.GetRequest().GetExecution().GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID, runID) + return nil, h.error(err1, scope, domainID, workflowID) } resp, err2 := engine.QueryWorkflow(ctx, request) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID, runID) + return nil, h.error(err2, scope, domainID, workflowID) } return resp, nil @@ -1371,28 +1353,27 @@ func (h *handlerImpl) ScheduleDecisionTask( domainID := request.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } if request.WorkflowExecution == nil { - return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "") + return h.error(errWorkflowExecutionNotSet, scope, domainID, "") } workflowExecution := request.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.ScheduleDecisionTask(ctx, request) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1417,28 +1398,27 @@ func (h *handlerImpl) RecordChildExecutionCompleted( domainID := request.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } if request.WorkflowExecution == nil { - return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "") + return h.error(errWorkflowExecutionNotSet, scope, domainID, "") } workflowExecution := request.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.RecordChildExecutionCompleted(ctx, request) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1468,23 +1448,22 @@ func (h *handlerImpl) ResetStickyTaskList( domainID := resetRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "", "") + return nil, h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "") } workflowID := resetRequest.Execution.GetWorkflowID() - runID := resetRequest.Execution.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return nil, h.error(err, scope, domainID, workflowID, runID) + return nil, h.error(err, scope, domainID, workflowID) } resp, err = engine.ResetStickyTaskList(ctx, resetRequest) if err != nil { - return nil, h.error(err, scope, domainID, workflowID, runID) + return nil, h.error(err, scope, domainID, workflowID) } return resp, nil @@ -1508,24 +1487,23 @@ func (h *handlerImpl) ReplicateEventsV2( domainID := replicateRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } workflowExecution := replicateRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() - runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID, runID) + return h.error(err1, scope, domainID, workflowID) } err2 := engine.ReplicateEventsV2(ctx, replicateRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID, runID) + return h.error(err2, scope, domainID, workflowID) } return nil @@ -1548,26 +1526,26 @@ func (h *handlerImpl) SyncShardStatus( } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, "", "", "") + return h.error(errHistoryHostThrottle, scope, "", "") } if syncShardStatusRequest.SourceCluster == "" { - return h.error(errSourceClusterNotSet, scope, "", "", "") + return h.error(errSourceClusterNotSet, scope, "", "") } if syncShardStatusRequest.Timestamp == nil { - return h.error(errTimestampNotSet, scope, "", "", "") + return h.error(errTimestampNotSet, scope, "", "") } // shard ID is already provided in the request engine, err := h.controller.GetEngineForShard(int(syncShardStatusRequest.GetShardID())) if err != nil { - return h.error(err, scope, "", "", "") + return h.error(err, scope, "", "") } err = engine.SyncShardStatus(ctx, syncShardStatusRequest) if err != nil { - return h.error(err, scope, "", "", "") + return h.error(err, scope, "", "") } return nil @@ -1591,31 +1569,30 @@ func (h *handlerImpl) SyncActivity( domainID := syncActivityRequest.GetDomainID() if syncActivityRequest.DomainID == "" || uuid.Parse(syncActivityRequest.GetDomainID()) == nil { - return h.error(errDomainNotSet, scope, domainID, "", "") + return h.error(errDomainNotSet, scope, domainID, "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "", "") + return h.error(errHistoryHostThrottle, scope, domainID, "") } if syncActivityRequest.WorkflowID == "" { - return h.error(errWorkflowIDNotSet, scope, domainID, "", "") + return h.error(errWorkflowIDNotSet, scope, domainID, "") } if syncActivityRequest.RunID == "" || uuid.Parse(syncActivityRequest.GetRunID()) == nil { - return h.error(errRunIDNotValid, scope, domainID, "", "") + return h.error(errRunIDNotValid, scope, domainID, "") } workflowID := syncActivityRequest.GetWorkflowID() - runID := syncActivityRequest.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } err = engine.SyncActivity(ctx, syncActivityRequest) if err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } return nil @@ -1790,10 +1767,9 @@ func (h *handlerImpl) ReapplyEvents( domainID := request.GetDomainUUID() workflowID := request.GetRequest().GetWorkflowExecution().GetWorkflowID() - runID := request.GetRequest().GetWorkflowExecution().GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } // deserialize history event object historyEvents, err := h.GetPayloadSerializer().DeserializeBatchEvents(&persistence.DataBlob{ @@ -1801,7 +1777,7 @@ func (h *handlerImpl) ReapplyEvents( Data: request.GetRequest().GetEvents().GetData(), }) if err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } execution := request.GetRequest().GetWorkflowExecution() @@ -1812,7 +1788,7 @@ func (h *handlerImpl) ReapplyEvents( execution.GetRunID(), historyEvents, ); err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } return nil } @@ -1860,7 +1836,7 @@ func (h *handlerImpl) CountDLQMessages( } err := g.Wait() - return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "", "") + return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "") } // ReadDLQMessages reads replication DLQ messages @@ -1881,7 +1857,7 @@ func (h *handlerImpl) ReadDLQMessages( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } return engine.ReadDLQMessages(ctx, request) @@ -1905,7 +1881,7 @@ func (h *handlerImpl) PurgeDLQMessages( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return h.error(err, scope, "", "", "") + return h.error(err, scope, "", "") } return engine.PurgeDLQMessages(ctx, request) @@ -1929,7 +1905,7 @@ func (h *handlerImpl) MergeDLQMessages( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } return engine.MergeDLQMessages(ctx, request) @@ -1950,10 +1926,9 @@ func (h *handlerImpl) RefreshWorkflowTasks( domainID := request.DomainUIID execution := request.GetRequest().GetExecution() workflowID := execution.GetWorkflowID() - runID := execution.GetWorkflowID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } err = engine.RefreshWorkflowTasks( @@ -1966,7 +1941,7 @@ func (h *handlerImpl) RefreshWorkflowTasks( ) if err != nil { - return h.error(err, scope, domainID, workflowID, runID) + return h.error(err, scope, domainID, workflowID) } return nil @@ -2068,12 +2043,12 @@ func (h *handlerImpl) RespondCrossClusterTasksCompleted( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } err = engine.RespondCrossClusterTasksCompleted(ctx, request.TargetCluster, request.TaskResponses) if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } response := &types.RespondCrossClusterTasksCompletedResponse{} @@ -2083,7 +2058,7 @@ func (h *handlerImpl) RespondCrossClusterTasksCompleted( response.Tasks, err = engine.GetCrossClusterTasks(fetchTaskCtx, request.TargetCluster) if err != nil { - return nil, h.error(err, scope, "", "", "") + return nil, h.error(err, scope, "", "") } } return response, nil @@ -2105,7 +2080,7 @@ func (h *handlerImpl) GetFailoverInfo( resp, err := h.failoverCoordinator.GetFailoverInfo(request.GetDomainID()) if err != nil { - return nil, h.error(err, scope, request.GetDomainID(), "", "") + return nil, h.error(err, scope, request.GetDomainID(), "") } return resp, nil } @@ -2137,74 +2112,51 @@ func (h *handlerImpl) updateErrorMetric( scope metrics.Scope, domainID string, workflowID string, - runID string, err error, ) { - var yarpcE *yarpcerrors.Status - - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + if err == context.DeadlineExceeded || err == context.Canceled { scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) return } - if errors.Is(err, types.ShardOwnershipLostError{}) { + switch err := err.(type) { + case *types.ShardOwnershipLostError: scope.IncCounter(metrics.CadenceErrShardOwnershipLostCounter) - - } else if errors.Is(err, types.EventAlreadyStartedError{}) { + case *types.EventAlreadyStartedError: scope.IncCounter(metrics.CadenceErrEventAlreadyStartedCounter) - - } else if errors.Is(err, types.BadRequestError{}) { + case *types.BadRequestError: scope.IncCounter(metrics.CadenceErrBadRequestCounter) - - } else if errors.Is(err, types.DomainNotActiveError{}) { + case *types.DomainNotActiveError: scope.IncCounter(metrics.CadenceErrBadRequestCounter) - - } else if errors.Is(err, types.WorkflowExecutionAlreadyStartedError{}) { + case *types.WorkflowExecutionAlreadyStartedError: scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter) - - } else if errors.Is(err, types.EntityNotExistsError{}) { + case *types.EntityNotExistsError: scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter) - - } else if errors.Is(err, types.WorkflowExecutionAlreadyCompletedError{}) { + case *types.WorkflowExecutionAlreadyCompletedError: scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter) - - } else if errors.Is(err, types.CancellationAlreadyRequestedError{}) { + case *types.CancellationAlreadyRequestedError: scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter) - - } else if errors.Is(err, types.LimitExceededError{}) { + case *types.LimitExceededError: scope.IncCounter(metrics.CadenceErrLimitExceededCounter) - - } else if errors.Is(err, types.RetryTaskV2Error{}) { + case *types.RetryTaskV2Error: scope.IncCounter(metrics.CadenceErrRetryTaskCounter) - - } else if errors.Is(err, types.ServiceBusyError{}) { + case *types.ServiceBusyError: scope.IncCounter(metrics.CadenceErrServiceBusyCounter) - - } else if errors.As(err, &yarpcE) { - - if yarpcE.Code() == yarpcerrors.CodeDeadlineExceeded { + case *yarpcerrors.Status: + if err.Code() == yarpcerrors.CodeDeadlineExceeded { scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) } scope.IncCounter(metrics.CadenceFailures) - - } else if errors.Is(err, types.InternalServiceError{}) { + case *types.InternalServiceError: scope.IncCounter(metrics.CadenceFailures) - h.GetLogger().Error("Internal service error", tag.Error(err), tag.WorkflowID(workflowID), - tag.WorkflowRunID(runID), tag.WorkflowDomainID(domainID)) - - } else { - // Default / unknown error fallback + default: scope.IncCounter(metrics.CadenceFailures) - h.GetLogger().Error("Uncategorized error", - tag.Error(err), - tag.WorkflowID(workflowID), - tag.WorkflowRunID(runID), - tag.WorkflowDomainID(domainID)) + h.getLoggerWithTags(domainID, workflowID).Error("Uncategorized error", tag.Error(err)) } } @@ -2213,11 +2165,10 @@ func (h *handlerImpl) error( scope metrics.Scope, domainID string, workflowID string, - runID string, ) error { - err = h.convertError(err) - h.updateErrorMetric(scope, domainID, workflowID, runID, err) + err = h.convertError(err) + h.updateErrorMetric(scope, domainID, workflowID, err) if errors.Is(err, workflow.ErrMaxAttemptsExceeded) { // Calling the dummy Workflow Check from task Validator. This is an ongoing project where we plan to do some validations on // the following workflow. Based on the validations (is the workflow stale? does the workflow come from a deprecated domain?) @@ -2229,6 +2180,23 @@ func (h *handlerImpl) error( return err } +func (h *handlerImpl) getLoggerWithTags( + domainID string, + workflowID string, +) log.Logger { + + logger := h.GetLogger() + if domainID != "" { + logger = logger.WithTags(tag.WorkflowDomainID(domainID)) + } + + if workflowID != "" { + logger = logger.WithTags(tag.WorkflowID(workflowID)) + } + + return logger +} + func (h *handlerImpl) emitInfoOrDebugLog( domainID string, msg string,