Skip to content

Commit

Permalink
Fix context leak in pinot integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie committed Feb 22, 2024
1 parent d7479ca commit 5a06b73
Showing 1 changed file with 70 additions and 54 deletions.
124 changes: 70 additions & 54 deletions host/pinot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:build pinotintegration
// +build pinotintegration

// to run locally, make sure kafka and pinot is running,
// then run cmd `go test -v ./host -run TestPinotIntegrationSuite -tags pinotintegration`
// currently we have to manually add test table and delete the table for cleaning
Expand Down Expand Up @@ -165,15 +162,17 @@ func (s *PinotIntegrationSuite) TestListOpenWorkflow() {
request.SearchAttributes = searchAttr

startTime := time.Now().UnixNano()
we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

startFilter := &types.StartTimeFilter{}
startFilter.EarliestTime = common.Int64Ptr(startTime)
var openExecution *types.WorkflowExecutionInfo
for i := 0; i < numberOfRetry; i++ {
startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())
resp, err := s.engine.ListOpenWorkflowExecutions(createContext(), &types.ListOpenWorkflowExecutionsRequest{
resp, err := s.engine.ListOpenWorkflowExecutions(ctx, &types.ListOpenWorkflowExecutionsRequest{
Domain: s.domainName,
MaximumPageSize: defaultTestValueOfESIndexMaxResultWindow,
StartTimeFilter: startFilter,
Expand All @@ -199,8 +198,9 @@ func (s *PinotIntegrationSuite) TestListWorkflow() {
wt := "pinot-integration-list-workflow-test-type"
tl := "pinot-integration-list-workflow-test-tasklist"
request := s.createStartWorkflowExecutionRequest(id, wt, tl)

we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)
query := fmt.Sprintf(`WorkflowID = "%s"`, id)
s.testHelperForReadOnce(we.GetRunID(), query, false, false)
Expand Down Expand Up @@ -239,15 +239,16 @@ func (s *PinotIntegrationSuite) testHelperForReadOnceWithDomain(domainName strin
PageSize: defaultTestValueOfESIndexMaxResultWindow,
Query: query,
}
ctx, cancel := createContext()
defer cancel()
Retry:
for i := 0; i < numberOfRetry; i++ {
var resp *types.ListWorkflowExecutionsResponse
var err error

if isScan {
resp, err = s.engine.ScanWorkflowExecutions(createContext(), listRequest)
resp, err = s.engine.ScanWorkflowExecutions(ctx, listRequest)
} else {
resp, err = s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err = s.engine.ListWorkflowExecutions(ctx, listRequest)
}

s.Nil(err)
Expand Down Expand Up @@ -296,8 +297,9 @@ func (s *PinotIntegrationSuite) startWorkflow(
if is_cron {
request.CronSchedule = "*/5 * * * *" // every 5 minutes
}

we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

query := fmt.Sprintf(`WorkflowID = "%s"`, id)
Expand Down Expand Up @@ -328,15 +330,16 @@ func (s *PinotIntegrationSuite) TestListWorkflow_ExecutionTime() {
wt := "pinot-integration-list-workflow-execution-time-test-type"
tl := "pinot-integration-list-workflow-execution-time-test-tasklist"
request := s.createStartWorkflowExecutionRequest(id, wt, tl)

we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

cronID := id + "-cron"
request.CronSchedule = "@every 1m"
request.WorkflowID = cronID

weCron, err := s.engine.StartWorkflowExecution(createContext(), request)
weCron, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

query := fmt.Sprintf(`(WorkflowID = '%s' or WorkflowID = '%s') and ExecutionTime < %v and ExecutionTime > 0`, id, cronID, time.Now().UnixNano()+int64(time.Minute))
Expand All @@ -359,8 +362,9 @@ func (s *PinotIntegrationSuite) TestListWorkflow_SearchAttribute() {
},
}
request.SearchAttributes = searchAttr

we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)
query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)
s.testHelperForReadOnce(we.GetRunID(), query, false, false)
Expand Down Expand Up @@ -418,7 +422,8 @@ func (s *PinotIntegrationSuite) TestListWorkflow_SearchAttribute() {
WorkflowID: id,
},
}
descResp, err := s.engine.DescribeWorkflowExecution(createContext(), descRequest)

descResp, err := s.engine.DescribeWorkflowExecution(ctx, descRequest)
s.Nil(err)
expectedSearchAttributes := getPinotUpsertSearchAttributes()
s.Equal(expectedSearchAttributes, descResp.WorkflowExecutionInfo.GetSearchAttributes())
Expand Down Expand Up @@ -453,7 +458,8 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() {
wt := "pinot-integration-list-workflow-or-query-test-type"
tl := "pinot-integration-list-workflow-or-query-test-tasklist"
request := s.createStartWorkflowExecutionRequest(id, wt, tl)

ctx, cancel := createContext()
defer cancel()
// start 3 workflows
key := definition.CustomIntField
attrValBytes, _ := json.Marshal(1)
Expand All @@ -463,21 +469,21 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() {
},
}
request.SearchAttributes = searchAttr
we1, err := s.engine.StartWorkflowExecution(createContext(), request)
we1, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

request.RequestID = uuid.New()
request.WorkflowID = id + "-2"
attrValBytes, _ = json.Marshal(2)
searchAttr.IndexedFields[key] = attrValBytes
we2, err := s.engine.StartWorkflowExecution(createContext(), request)
we2, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

request.RequestID = uuid.New()
request.WorkflowID = id + "-3"
attrValBytes, _ = json.Marshal(3)
searchAttr.IndexedFields[key] = attrValBytes
we3, err := s.engine.StartWorkflowExecution(createContext(), request)
we3, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

time.Sleep(waitForPinotToSettle)
Expand All @@ -491,7 +497,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() {
Query: query1,
}
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
if len(resp.GetExecutions()) == 1 {
openExecution = resp.GetExecutions()[0]
Expand All @@ -512,7 +518,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() {
listRequest.Query = query2
var openExecutions []*types.WorkflowExecutionInfo
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
if len(resp.GetExecutions()) == 2 {
openExecutions = resp.GetExecutions()
Expand All @@ -539,7 +545,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrQuery() {
query3 := fmt.Sprintf(`(CustomIntField = %d or CustomIntField = %d) and CloseTime = missing`, 2, 3)
listRequest.Query = query3
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
if len(resp.GetExecutions()) == 2 {
openExecutions = resp.GetExecutions()
Expand All @@ -563,11 +569,12 @@ func (s *PinotIntegrationSuite) TestListWorkflow_MaxWindowSize() {
wt := "pinot-integration-list-workflow-max-window-size-test-type"
tl := "pinot-integration-list-workflow-max-window-size-test-tasklist"
startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl)

ctx, cancel := createContext()
defer cancel()
for i := 0; i < defaultTestValueOfESIndexMaxResultWindow; i++ {
startRequest.RequestID = uuid.New()
startRequest.WorkflowID = id + strconv.Itoa(i)
_, err := s.engine.StartWorkflowExecution(createContext(), startRequest)
_, err := s.engine.StartWorkflowExecution(ctx, startRequest)
s.Nil(err)
}

Expand All @@ -584,7 +591,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_MaxWindowSize() {
}
// get first page
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
if len(resp.GetExecutions()) == defaultTestValueOfESIndexMaxResultWindow {
listResp = resp
Expand All @@ -597,7 +604,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_MaxWindowSize() {

// the last request
listRequest.NextPageToken = listResp.GetNextPageToken()
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
s.True(len(resp.GetExecutions()) == 0)
s.True(len(resp.GetNextPageToken()) == 0)
Expand All @@ -608,7 +615,8 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() {
wt := "pinot-integration-list-workflow-order-by-test-type"
tl := "pinot-integration-list-workflow-order-by-test-tasklist"
startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl)

ctx, cancel := createContext()
defer cancel()
for i := 0; i < defaultTestValueOfESIndexMaxResultWindow+1; i++ { // start 6
startRequest.RequestID = uuid.New()
startRequest.WorkflowID = id + strconv.Itoa(i)
Expand All @@ -631,7 +639,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() {
startRequest.SearchAttributes = &types.SearchAttributes{}
}

_, err := s.engine.StartWorkflowExecution(createContext(), startRequest)
_, err := s.engine.StartWorkflowExecution(ctx, startRequest)
s.Nil(err)
}

Expand All @@ -651,7 +659,7 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() {
Query: query1,
}
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
if int32(len(resp.GetExecutions())) == listRequest.GetPageSize() {
openExecutions = resp.GetExecutions()
Expand Down Expand Up @@ -740,12 +748,13 @@ func (s *PinotIntegrationSuite) TestListWorkflow_OrderBy() {

func (s *PinotIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize int,
startRequest *types.StartWorkflowExecutionRequest, wid, wType string, isScan bool) {

ctx, cancel := createContext()
defer cancel()
// start enough number of workflows
for i := 0; i < numOfWorkflows; i++ {
startRequest.RequestID = uuid.New()
startRequest.WorkflowID = wid + strconv.Itoa(i)
_, err := s.engine.StartWorkflowExecution(createContext(), startRequest)
_, err := s.engine.StartWorkflowExecution(ctx, startRequest)
s.Nil(err)
}

Expand All @@ -766,9 +775,9 @@ func (s *PinotIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize
var err error

if isScan {
resp, err = s.engine.ScanWorkflowExecutions(createContext(), listRequest)
resp, err = s.engine.ScanWorkflowExecutions(ctx, listRequest)
} else {
resp, err = s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err = s.engine.ListWorkflowExecutions(ctx, listRequest)
}
s.Nil(err)
if len(resp.GetExecutions()) == pageSize {
Expand All @@ -791,9 +800,9 @@ func (s *PinotIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize
var err error

if isScan {
resp, err = s.engine.ScanWorkflowExecutions(createContext(), listRequest)
resp, err = s.engine.ScanWorkflowExecutions(ctx, listRequest)
} else {
resp, err = s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err = s.engine.ListWorkflowExecutions(ctx, listRequest)
}
s.Nil(err)

Expand Down Expand Up @@ -836,8 +845,9 @@ func (s *PinotIntegrationSuite) TestScanWorkflow() {
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: identity,
}

we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)
query := fmt.Sprintf(`WorkflowID = "%s"`, id)
s.testHelperForReadOnce(we.GetRunID(), query, true, false)
Expand All @@ -856,8 +866,9 @@ func (s *PinotIntegrationSuite) TestScanWorkflow_SearchAttribute() {
},
}
request.SearchAttributes = searchAttr

we, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)
query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)
s.testHelperForReadOnce(we.GetRunID(), query, true, false)
Expand Down Expand Up @@ -904,8 +915,9 @@ func (s *PinotIntegrationSuite) TestCountWorkflow() {
},
}
request.SearchAttributes = searchAttr

_, err := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
_, err := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err)

query := fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal)
Expand All @@ -915,7 +927,7 @@ func (s *PinotIntegrationSuite) TestCountWorkflow() {
}
var resp *types.CountWorkflowExecutionsResponse
for i := 0; i < numberOfRetry; i++ {
resp, err = s.engine.CountWorkflowExecutions(createContext(), countRequest)
resp, err = s.engine.CountWorkflowExecutions(ctx, countRequest)
s.Nil(err)
if resp.GetCount() == int64(1) {
break
Expand All @@ -926,7 +938,7 @@ func (s *PinotIntegrationSuite) TestCountWorkflow() {

query = fmt.Sprintf(`WorkflowID = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, "noMatch")
countRequest.Query = query
resp, err = s.engine.CountWorkflowExecutions(createContext(), countRequest)
resp, err = s.engine.CountWorkflowExecutions(ctx, countRequest)
s.Nil(err)
s.Equal(int64(0), resp.GetCount())
}
Expand Down Expand Up @@ -954,8 +966,9 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution() {
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: identity,
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err0 := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunID))
Expand Down Expand Up @@ -1039,7 +1052,7 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution() {
}
verified := false
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)
if len(resp.GetExecutions()) == 1 {
execution := resp.GetExecutions()[0]
Expand Down Expand Up @@ -1084,8 +1097,10 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution() {

func (s *PinotIntegrationSuite) testListResultForUpsertSearchAttributes(listRequest *types.ListWorkflowExecutionsRequest) {
verified := false
ctx, cancel := createContext()
defer cancel()
for i := 0; i < numberOfRetry; i++ {
resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
resp, err := s.engine.ListWorkflowExecutions(ctx, listRequest)
s.Nil(err)

//res2B, _ := json.Marshal(resp.GetExecutions())
Expand Down Expand Up @@ -1161,8 +1176,9 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey() {
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: identity,
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
ctx, cancel := createContext()
defer cancel()
we, err0 := s.engine.StartWorkflowExecution(ctx, request)
s.Nil(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunID))
Expand Down Expand Up @@ -1195,8 +1211,8 @@ func (s *PinotIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey() {

_, err := poller.PollAndProcessDecisionTask(false, false)
s.Nil(err)

historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &types.GetWorkflowExecutionHistoryRequest{
defer cancel()
historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Domain: s.domainName,
Execution: &types.WorkflowExecution{
WorkflowID: id,
Expand Down

0 comments on commit 5a06b73

Please sign in to comment.