Skip to content

Commit

Permalink
[Wf-Diagnostics] Introduce Diagnostics starter workflow as parent wor…
Browse files Browse the repository at this point in the history
…kflow to run diagnostics (#6310)

* wf setup

* [Wf-Diagnostics] Introduce Diagnostics starter workflow as parent workflow to run diagnostics

* Update parent_workflow.go

* Update parent_workflow.go
  • Loading branch information
sankari165 authored Sep 27, 2024
1 parent 6594452 commit 4424df7
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 22 deletions.
4 changes: 2 additions & 2 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (wh *WorkflowHandler) DiagnoseWorkflowExecution(ctx context.Context, reques
diagnosticWorkflowID := fmt.Sprintf("%s-%s-%s", request.GetDomain(), wfExecution.GetWorkflowID(), wfExecution.GetRunID())
diagnosticWorkflowDomain := "cadence-system"

diagnosticWorkflowInput := diagnostics.DiagnosticsWorkflowInput{
diagnosticWorkflowInput := diagnostics.DiagnosticsStarterWorkflowInput{
Domain: request.GetDomain(),
WorkflowID: request.GetWorkflowExecution().GetWorkflowID(),
RunID: request.GetWorkflowExecution().GetRunID(),
Expand All @@ -235,7 +235,7 @@ func (wh *WorkflowHandler) DiagnoseWorkflowExecution(ctx context.Context, reques
Domain: diagnosticWorkflowDomain,
WorkflowID: diagnosticWorkflowID,
WorkflowType: &types.WorkflowType{
Name: "diagnostics-workflow",
Name: "diagnostics-starter-workflow",
},
TaskList: &types.TaskList{
Name: "wf-diagnostics",
Expand Down
1 change: 1 addition & 0 deletions service/worker/diagnostics/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (w *dw) Start() error {
}
newWorker := worker.New(w.svcClient, common.SystemLocalDomainName, tasklist, workerOpts)
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsStarterWorkflow, workflow.RegisterOptions{Name: diagnosticsStarterWorkflow})
newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
newWorker.RegisterActivityWithOptions(w.rootCauseTimeouts, activity.RegisterOptions{Name: rootCauseTimeoutsActivity})
Expand Down
65 changes: 65 additions & 0 deletions service/worker/diagnostics/parent_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package diagnostics

import (
"fmt"

"go.uber.org/cadence/workflow"
)

const (
diagnosticsStarterWorkflow = "diagnostics-starter-workflow"
queryDiagnosticsReport = "query-diagnostics-report"
)

type DiagnosticsStarterWorkflowInput struct {
Domain string
WorkflowID string
RunID string
}

type DiagnosticsStarterWorkflowResult struct {
DiagnosticsResult *DiagnosticsWorkflowResult
}

func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (*DiagnosticsStarterWorkflowResult, error) {
var result DiagnosticsWorkflowResult
err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsStarterWorkflowResult, error) {
return DiagnosticsStarterWorkflowResult{DiagnosticsResult: &result}, nil
})
if err != nil {
return nil, err
}

err = workflow.ExecuteChildWorkflow(ctx, w.DiagnosticsWorkflow, DiagnosticsWorkflowInput{
Domain: params.Domain,
WorkflowID: params.WorkflowID,
RunID: params.RunID,
}).Get(ctx, &result)
if err != nil {
return nil, fmt.Errorf("Workflow Diagnostics: %w", err)
}

return &DiagnosticsStarterWorkflowResult{DiagnosticsResult: &result}, nil
}
14 changes: 3 additions & 11 deletions service/worker/diagnostics/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ import (
)

const (
diagnosticsWorkflow = "diagnostics-workflow"
tasklist = "diagnostics-wf-tasklist"
queryDiagnosticsReport = "query-diagnostics-report"
diagnosticsWorkflow = "diagnostics-workflow"
tasklist = "diagnostics-wf-tasklist"

retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory"
identifyTimeoutsActivity = "identifyTimeouts"
Expand Down Expand Up @@ -82,13 +81,6 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo
defer sw.Stop()

var timeoutsResult timeoutDiagnostics
err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsWorkflowResult, error) {
return DiagnosticsWorkflowResult{Timeouts: &timeoutsResult}, nil
})
if err != nil {
return nil, err
}

activityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second * 10,
ScheduleToStartTimeout: time.Second * 5,
Expand All @@ -97,7 +89,7 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo
activityCtx := workflow.WithActivityOptions(ctx, activityOptions)

var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse
err = workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
Domain: params.Domain,
Execution: &types.WorkflowExecution{
WorkflowID: params.WorkflowID,
Expand Down
19 changes: 10 additions & 9 deletions service/worker/diagnostics/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (s *diagnosticsWorkflowTestSuite) SetupTest() {
mockResource.Finish(s.T())
})

s.workflowEnv.RegisterWorkflowWithOptions(s.dw.DiagnosticsStarterWorkflow, workflow.RegisterOptions{Name: diagnosticsStarterWorkflow})
s.workflowEnv.RegisterWorkflowWithOptions(s.dw.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
s.workflowEnv.RegisterActivityWithOptions(s.dw.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
s.workflowEnv.RegisterActivityWithOptions(s.dw.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
Expand All @@ -80,7 +81,7 @@ func (s *diagnosticsWorkflowTestSuite) TearDownTest() {
}

func (s *diagnosticsWorkflowTestSuite) TestWorkflow() {
params := &DiagnosticsWorkflowInput{
params := &DiagnosticsStarterWorkflowInput{
Domain: "test",
WorkflowID: "123",
RunID: "abc",
Expand Down Expand Up @@ -130,16 +131,16 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow() {
s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil)
s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(issues, nil)
s.workflowEnv.OnActivity(rootCauseTimeoutsActivity, mock.Anything, mock.Anything).Return(rootCause, nil)
s.workflowEnv.ExecuteWorkflow(diagnosticsWorkflow, params)
s.workflowEnv.ExecuteWorkflow(diagnosticsStarterWorkflow, params)
s.True(s.workflowEnv.IsWorkflowCompleted())
var result DiagnosticsWorkflowResult
var result DiagnosticsStarterWorkflowResult
s.NoError(s.workflowEnv.GetWorkflowResult(&result))
s.ElementsMatch(timeoutIssues, result.Timeouts.Issues)
s.ElementsMatch(timeoutRootCause, result.Timeouts.RootCause)
s.ElementsMatch(timeoutIssues, result.DiagnosticsResult.Timeouts.Issues)
s.ElementsMatch(timeoutRootCause, result.DiagnosticsResult.Timeouts.RootCause)

queriedResult := s.queryDiagnostics()
s.ElementsMatch(queriedResult.Timeouts.Issues, result.Timeouts.Issues)
s.ElementsMatch(queriedResult.Timeouts.RootCause, result.Timeouts.RootCause)
s.ElementsMatch(queriedResult.DiagnosticsResult.Timeouts.Issues, result.DiagnosticsResult.Timeouts.Issues)
s.ElementsMatch(queriedResult.DiagnosticsResult.Timeouts.RootCause, result.DiagnosticsResult.Timeouts.RootCause)
}

func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() {
Expand All @@ -158,11 +159,11 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() {
s.EqualError(s.workflowEnv.GetWorkflowError(), errExpected.Error())
}

func (s *diagnosticsWorkflowTestSuite) queryDiagnostics() DiagnosticsWorkflowResult {
func (s *diagnosticsWorkflowTestSuite) queryDiagnostics() DiagnosticsStarterWorkflowResult {
queryFuture, err := s.workflowEnv.QueryWorkflow(queryDiagnosticsReport)
s.NoError(err)

var result DiagnosticsWorkflowResult
var result DiagnosticsStarterWorkflowResult
err = queryFuture.Get(&result)
s.NoError(err)
return result
Expand Down

0 comments on commit 4424df7

Please sign in to comment.