From 5a06b73a89cff7d34f006d89bbb8cf2947efeab5 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Thu, 22 Feb 2024 00:26:04 -0800 Subject: [PATCH] Fix context leak in pinot integration test --- host/pinot_test.go | 124 +++++++++++++++++++++++++-------------------- 1 file changed, 70 insertions(+), 54 deletions(-) diff --git a/host/pinot_test.go b/host/pinot_test.go index 8705a4addb7..1a8eb3f341d 100644 --- a/host/pinot_test.go +++ b/host/pinot_test.go @@ -18,9 +18,6 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:build pinotintegration -// +build pinotintegration - // to run locally, make sure kafka and pinot is running, // then run cmd `go test -v ./host -run TestPinotIntegrationSuite -tags pinotintegration` // currently we have to manually add test table and delete the table for cleaning @@ -165,7 +162,9 @@ func (s *PinotIntegrationSuite) TestListOpenWorkflow() { request.SearchAttributes = searchAttr startTime := time.Now().UnixNano() - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) startFilter := &types.StartTimeFilter{} @@ -173,7 +172,7 @@ func (s *PinotIntegrationSuite) TestListOpenWorkflow() { var openExecution *types.WorkflowExecutionInfo for i := 0; i < numberOfRetry; i++ { startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano()) - resp, err := s.engine.ListOpenWorkflowExecutions(createContext(), &types.ListOpenWorkflowExecutionsRequest{ + resp, err := s.engine.ListOpenWorkflowExecutions(ctx, &types.ListOpenWorkflowExecutionsRequest{ Domain: s.domainName, MaximumPageSize: defaultTestValueOfESIndexMaxResultWindow, StartTimeFilter: startFilter, @@ -199,8 +198,9 @@ func (s *PinotIntegrationSuite) TestListWorkflow() { wt := "pinot-integration-list-workflow-test-type" tl := "pinot-integration-list-workflow-test-tasklist" request := s.createStartWorkflowExecutionRequest(id, wt, tl) - - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s"`, id) s.testHelperForReadOnce(we.GetRunID(), query, false, false) @@ -239,15 +239,16 @@ func (s *PinotIntegrationSuite) testHelperForReadOnceWithDomain(domainName strin PageSize: defaultTestValueOfESIndexMaxResultWindow, Query: query, } + ctx, cancel := createContext() + defer cancel() Retry: for i := 0; i < numberOfRetry; i++ { var resp *types.ListWorkflowExecutionsResponse var err error - if isScan { - resp, err = s.engine.ScanWorkflowExecutions(createContext(), listRequest) + resp, err = s.engine.ScanWorkflowExecutions(ctx, listRequest) } else { - resp, err = s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err = s.engine.ListWorkflowExecutions(ctx, listRequest) } s.Nil(err) @@ -296,8 +297,9 @@ func (s *PinotIntegrationSuite) startWorkflow( if is_cron { request.CronSchedule = "*/5 * * * *" // every 5 minutes } - - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s"`, id) @@ -328,15 +330,16 @@ func (s *PinotIntegrationSuite) TestListWorkflow_ExecutionTime() { wt := "pinot-integration-list-workflow-execution-time-test-type" tl := "pinot-integration-list-workflow-execution-time-test-tasklist" request := s.createStartWorkflowExecutionRequest(id, wt, tl) - - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) cronID := id + "-cron" request.CronSchedule = "@every 1m" request.WorkflowID = cronID - weCron, err := s.engine.StartWorkflowExecution(createContext(), request) + weCron, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`(WorkflowID = '%s' or WorkflowID = '%s') and ExecutionTime < %v and ExecutionTime > 0`, id, cronID, time.Now().UnixNano()+int64(time.Minute)) @@ -359,8 +362,9 @@ func (s *PinotIntegrationSuite) TestListWorkflow_SearchAttribute() { }, } request.SearchAttributes = searchAttr - - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal) s.testHelperForReadOnce(we.GetRunID(), query, false, false) @@ -418,7 +422,8 @@ func (s *PinotIntegrationSuite) TestListWorkflow_SearchAttribute() { WorkflowID: id, }, } - descResp, err := s.engine.DescribeWorkflowExecution(createContext(), descRequest) + + descResp, err := s.engine.DescribeWorkflowExecution(ctx, descRequest) s.Nil(err) expectedSearchAttributes := getPinotUpsertSearchAttributes() s.Equal(expectedSearchAttributes, descResp.WorkflowExecutionInfo.GetSearchAttributes()) @@ -453,7 +458,8 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() { wt := "pinot-integration-list-workflow-or-query-test-type" tl := "pinot-integration-list-workflow-or-query-test-tasklist" request := s.createStartWorkflowExecutionRequest(id, wt, tl) - + ctx, cancel := createContext() + defer cancel() // start 3 workflows key := definition.CustomIntField attrValBytes, _ := json.Marshal(1) @@ -463,21 +469,21 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() { }, } request.SearchAttributes = searchAttr - we1, err := s.engine.StartWorkflowExecution(createContext(), request) + we1, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) request.RequestID = uuid.New() request.WorkflowID = id + "-2" attrValBytes, _ = json.Marshal(2) searchAttr.IndexedFields[key] = attrValBytes - we2, err := s.engine.StartWorkflowExecution(createContext(), request) + we2, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) request.RequestID = uuid.New() request.WorkflowID = id + "-3" attrValBytes, _ = json.Marshal(3) searchAttr.IndexedFields[key] = attrValBytes - we3, err := s.engine.StartWorkflowExecution(createContext(), request) + we3, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) time.Sleep(waitForPinotToSettle) @@ -491,7 +497,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() { Query: query1, } for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) if len(resp.GetExecutions()) == 1 { openExecution = resp.GetExecutions()[0] @@ -512,7 +518,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() { listRequest.Query = query2 var openExecutions []*types.WorkflowExecutionInfo for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) if len(resp.GetExecutions()) == 2 { openExecutions = resp.GetExecutions() @@ -539,7 +545,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() { query3 := fmt.Sprintf(`(CustomIntField = %d or CustomIntField = %d) and CloseTime = missing`, 2, 3) listRequest.Query = query3 for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) if len(resp.GetExecutions()) == 2 { openExecutions = resp.GetExecutions() @@ -563,11 +569,12 @@ func (s *PinotIntegrationSuite) TestListWorkflow_MaxWindowSize() { wt := "pinot-integration-list-workflow-max-window-size-test-type" tl := "pinot-integration-list-workflow-max-window-size-test-tasklist" startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl) - + ctx, cancel := createContext() + defer cancel() for i := 0; i < defaultTestValueOfESIndexMaxResultWindow; i++ { startRequest.RequestID = uuid.New() startRequest.WorkflowID = id + strconv.Itoa(i) - _, err := s.engine.StartWorkflowExecution(createContext(), startRequest) + _, err := s.engine.StartWorkflowExecution(ctx, startRequest) s.Nil(err) } @@ -584,7 +591,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_MaxWindowSize() { } // get first page for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) if len(resp.GetExecutions()) == defaultTestValueOfESIndexMaxResultWindow { listResp = resp @@ -597,7 +604,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_MaxWindowSize() { // the last request listRequest.NextPageToken = listResp.GetNextPageToken() - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) s.True(len(resp.GetExecutions()) == 0) s.True(len(resp.GetNextPageToken()) == 0) @@ -608,7 +615,8 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() { wt := "pinot-integration-list-workflow-order-by-test-type" tl := "pinot-integration-list-workflow-order-by-test-tasklist" startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl) - + ctx, cancel := createContext() + defer cancel() for i := 0; i < defaultTestValueOfESIndexMaxResultWindow+1; i++ { // start 6 startRequest.RequestID = uuid.New() startRequest.WorkflowID = id + strconv.Itoa(i) @@ -631,7 +639,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() { startRequest.SearchAttributes = &types.SearchAttributes{} } - _, err := s.engine.StartWorkflowExecution(createContext(), startRequest) + _, err := s.engine.StartWorkflowExecution(ctx, startRequest) s.Nil(err) } @@ -651,7 +659,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() { Query: query1, } for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) if int32(len(resp.GetExecutions())) == listRequest.GetPageSize() { openExecutions = resp.GetExecutions() @@ -740,12 +748,13 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() { func (s *PinotIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize int, startRequest *types.StartWorkflowExecutionRequest, wid, wType string, isScan bool) { - + ctx, cancel := createContext() + defer cancel() // start enough number of workflows for i := 0; i < numOfWorkflows; i++ { startRequest.RequestID = uuid.New() startRequest.WorkflowID = wid + strconv.Itoa(i) - _, err := s.engine.StartWorkflowExecution(createContext(), startRequest) + _, err := s.engine.StartWorkflowExecution(ctx, startRequest) s.Nil(err) } @@ -766,9 +775,9 @@ func (s *PinotIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize var err error if isScan { - resp, err = s.engine.ScanWorkflowExecutions(createContext(), listRequest) + resp, err = s.engine.ScanWorkflowExecutions(ctx, listRequest) } else { - resp, err = s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err = s.engine.ListWorkflowExecutions(ctx, listRequest) } s.Nil(err) if len(resp.GetExecutions()) == pageSize { @@ -791,9 +800,9 @@ func (s *PinotIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize var err error if isScan { - resp, err = s.engine.ScanWorkflowExecutions(createContext(), listRequest) + resp, err = s.engine.ScanWorkflowExecutions(ctx, listRequest) } else { - resp, err = s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err = s.engine.ListWorkflowExecutions(ctx, listRequest) } s.Nil(err) @@ -836,8 +845,9 @@ func (s *PinotIntegrationSuite) TestScanWorkflow() { TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), Identity: identity, } - - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s"`, id) s.testHelperForReadOnce(we.GetRunID(), query, true, false) @@ -856,8 +866,9 @@ func (s *PinotIntegrationSuite) TestScanWorkflow_SearchAttribute() { }, } request.SearchAttributes = searchAttr - - we, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal) s.testHelperForReadOnce(we.GetRunID(), query, true, false) @@ -904,8 +915,9 @@ func (s *PinotIntegrationSuite) TestCountWorkflow() { }, } request.SearchAttributes = searchAttr - - _, err := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + _, err := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err) query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal) @@ -915,7 +927,7 @@ func (s *PinotIntegrationSuite) TestCountWorkflow() { } var resp *types.CountWorkflowExecutionsResponse for i := 0; i < numberOfRetry; i++ { - resp, err = s.engine.CountWorkflowExecutions(createContext(), countRequest) + resp, err = s.engine.CountWorkflowExecutions(ctx, countRequest) s.Nil(err) if resp.GetCount() == int64(1) { break @@ -926,7 +938,7 @@ func (s *PinotIntegrationSuite) TestCountWorkflow() { query = fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, "noMatch") countRequest.Query = query - resp, err = s.engine.CountWorkflowExecutions(createContext(), countRequest) + resp, err = s.engine.CountWorkflowExecutions(ctx, countRequest) s.Nil(err) s.Equal(int64(0), resp.GetCount()) } @@ -954,8 +966,9 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution() { TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), Identity: identity, } - - we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err0 := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err0) s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunID)) @@ -1039,7 +1052,7 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution() { } verified := false for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) if len(resp.GetExecutions()) == 1 { execution := resp.GetExecutions()[0] @@ -1084,8 +1097,10 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution() { func (s *PinotIntegrationSuite) testListResultForUpsertSearchAttributes(listRequest *types.ListWorkflowExecutionsRequest) { verified := false + ctx, cancel := createContext() + defer cancel() for i := 0; i < numberOfRetry; i++ { - resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest) + resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest) s.Nil(err) //res2B, _ := json.Marshal(resp.GetExecutions()) @@ -1161,8 +1176,9 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey() { TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), Identity: identity, } - - we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + ctx, cancel := createContext() + defer cancel() + we, err0 := s.engine.StartWorkflowExecution(ctx, request) s.Nil(err0) s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunID)) @@ -1195,8 +1211,8 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey() { _, err := poller.PollAndProcessDecisionTask(false, false) s.Nil(err) - - historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &types.GetWorkflowExecutionHistoryRequest{ + defer cancel() + historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{ Domain: s.domainName, Execution: &types.WorkflowExecution{ WorkflowID: id,