Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add emitWorkflowTypeCountMetricsPinot in ESAnalyzer #6177

Merged
merged 8 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions common/pinot/generic_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/pinot/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type (
// Search API is only for supporting various List[Open/Closed]WorkflowExecutions(ByXyz).
// Use SearchByQuery or ScanByQuery for generic purpose searching.
Search(request *SearchRequest) (*SearchResponse, error)
SearchAggr(request *SearchRequest) (AggrResponse, error)
// CountByQuery is for returning the count of workflow executions that match the query
CountByQuery(query string) (int64, error)
GetTableName() string
Expand All @@ -51,4 +52,5 @@ type (

// SearchResponse is a response to Search, SearchByQuery and ScanByQuery
SearchResponse = p.InternalListWorkflowExecutionsResponse
AggrResponse [][]interface{}
)
12 changes: 12 additions & 0 deletions common/pinot/pinot_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ func (c *PinotClient) Search(request *SearchRequest) (*SearchResponse, error) {
return c.getInternalListWorkflowExecutionsResponse(resp, request.Filter, token, request.ListRequest.PageSize, request.MaxResultWindow)
}

func (c *PinotClient) SearchAggr(request *SearchRequest) (AggrResponse, error) {
bowenxia marked this conversation as resolved.
Show resolved Hide resolved
resp, err := c.client.ExecuteSQL(c.tableName, request.Query)

if err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("Pinot SearchAggr failed, %v", err),
}
}

return resp.ResultTable.Rows, nil
}

func (c *PinotClient) CountByQuery(query string) (int64, error) {
resp, err := c.client.ExecuteSQL(c.tableName, query)
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions common/pinot/pinot_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,76 @@ func TestCountByQuery(t *testing.T) {
}
}

func TestSearchAggr(t *testing.T) {
query := "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10"

request := &SearchRequest{
Query: query,
ListRequest: &p.InternalListWorkflowExecutionsRequest{
NextPageToken: nil,
},
}

errorRequest := &SearchRequest{
Query: "error",
ListRequest: &p.InternalListWorkflowExecutionsRequest{
NextPageToken: []byte("ha-ha"),
},
}

tests := map[string]struct {
inputRequest *SearchRequest
expectedError error
server *httptest.Server
}{
"Case1-1: error internal server case": {
inputRequest: errorRequest,
expectedError: &types.InternalServiceError{
Message: fmt.Sprintf("Pinot SearchAggr failed, caught http exception when querying Pinot: 400 Bad Request"),
},
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
})),
},
"Case2: normal case": {
inputRequest: request,
expectedError: nil,
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
assert.Equal(t, "POST", r.Method)
assert.True(t, strings.HasSuffix(r.RequestURI, "/query/sql"))
fmt.Fprintln(w, "{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"LONG\"],\"columnNames\":[\"cnt\"]},\"rows\":[[\"test-domain\", 10]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}")
})),
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
ts := test.server
defer ts.Close()
pinotConnection, err := pinot.NewFromBrokerList([]string{ts.URL})
assert.NotNil(t, pinotConnection)
assert.Nil(t, err)

pinotClient := NewPinotClient(pinotConnection, testlogger.New(t), &config.PinotVisibilityConfig{
Table: "",
ServiceName: "",
})

actualOutput, err := pinotClient.SearchAggr(test.inputRequest)

if test.expectedError != nil {
assert.Nil(t, actualOutput)
assert.Equal(t, test.expectedError.Error(), err.Error())
} else {
assert.NotNil(t, actualOutput)
assert.Nil(t, err)
}
})
}
}

func TestGetTableName(t *testing.T) {
assert.Equal(t, "", client.GetTableName())
}
Expand Down
3 changes: 3 additions & 0 deletions service/worker/esanalyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type (
logger log.Logger
tallyScope tally.Scope
visibilityIndexName string
pinotTableName string
resource resource.Resource
domainCache cache.DomainCache
config *Config
Expand Down Expand Up @@ -110,6 +111,7 @@ func New(
esClient es.GenericClient,
pinotClient pinot.GenericClient,
esConfig *config.ElasticSearchConfig,
pinotConfig *config.PinotVisibilityConfig,
logger log.Logger,
tallyScope tally.Scope,
resource resource.Resource,
Expand All @@ -133,6 +135,7 @@ func New(
logger: logger,
tallyScope: tallyScope,
visibilityIndexName: esConfig.Indices[common.VisibilityAppName],
pinotTableName: pinotConfig.Table,
resource: resource,
domainCache: domainCache,
config: config,
Expand Down
109 changes: 105 additions & 4 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,15 @@ func TestNewAnalyzer(t *testing.T) {
common.VisibilityAppName: "test",
},
}
mockPinotConfig := &config.PinotVisibilityConfig{
Table: "test",
}

mockESClient := &esMocks.GenericClient{}
testAnalyzer1 := New(nil, nil, nil, mockESClient, nil, mockESConfig, nil, nil, nil, nil, nil)
testAnalyzer1 := New(nil, nil, nil, mockESClient, nil, mockESConfig, mockPinotConfig, nil, nil, nil, nil, nil)

mockPinotClient := &pinot.MockGenericClient{}
testAnalyzer2 := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, nil, nil, nil, nil, nil)
testAnalyzer2 := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, mockPinotConfig, nil, nil, nil, nil, nil)

assert.Equal(t, testAnalyzer1.readMode, ES)
assert.Equal(t, testAnalyzer2.readMode, Pinot)
Expand All @@ -369,11 +372,14 @@ func TestEmitWorkflowTypeCountMetricsESErrorCases(t *testing.T) {
common.VisibilityAppName: "test",
},
}
mockPinotConfig := &config.PinotVisibilityConfig{
Table: "test",
}

ctrl := gomock.NewController(t)
mockESClient := &esMocks.GenericClient{}
mockDomainCache := cache.NewMockDomainCache(ctrl)
testAnalyzer := New(nil, nil, nil, mockESClient, nil, mockESConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
testAnalyzer := New(nil, nil, nil, mockESClient, nil, mockESConfig, mockPinotConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
testWorkflow := &Workflow{analyzer: testAnalyzer}

tests := map[string]struct {
Expand Down Expand Up @@ -451,11 +457,14 @@ func TestEmitWorkflowVersionMetricsESErrorCases(t *testing.T) {
common.VisibilityAppName: "test",
},
}
mockPinotConfig := &config.PinotVisibilityConfig{
Table: "test",
}

ctrl := gomock.NewController(t)
mockESClient := &esMocks.GenericClient{}
mockDomainCache := cache.NewMockDomainCache(ctrl)
testAnalyzer := New(nil, nil, nil, mockESClient, nil, mockESConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
testAnalyzer := New(nil, nil, nil, mockESClient, nil, mockESConfig, mockPinotConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
testWorkflow := &Workflow{analyzer: testAnalyzer}

tests := map[string]struct {
Expand Down Expand Up @@ -526,3 +535,95 @@ func TestEmitWorkflowVersionMetricsESErrorCases(t *testing.T) {
})
}
}

func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
mockPinotConfig := &config.PinotVisibilityConfig{
Table: "test",
}
mockESConfig := &config.ElasticSearchConfig{
Indices: map[string]string{
common.VisibilityAppName: "test",
},
}

ctrl := gomock.NewController(t)

mockPinotClient := pinot.NewMockGenericClient(ctrl)
mockDomainCache := cache.NewMockDomainCache(ctrl)
testAnalyzer := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, mockPinotConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
testWorkflow := &Workflow{analyzer: testAnalyzer}

tests := map[string]struct {
domainCacheAffordance func(mockDomainCache *cache.MockDomainCache)
PinotClientAffordance func(mockPinotClient *pinot.MockGenericClient)
expectedErr error
}{
"Case0: success": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test0", 1},
{"test1", 2},
}, nil).Times(1)
},
expectedErr: nil,
},
"Case1: error getting domain": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1)
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {},
expectedErr: fmt.Errorf("domain error"),
},
"Case2: error Pinot query": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
},
expectedErr: fmt.Errorf("pinot error"),
},
"Case3: Aggregation is empty": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1)
},
expectedErr: nil,
},
"Case4: error parsing workflow count": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
},
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
{"test", "invalid"},
}, nil).Times(1)
},
expectedErr: fmt.Errorf("error parsing workflow count for workflow type test"),
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
// Set up mocks
test.domainCacheAffordance(mockDomainCache)
test.PinotClientAffordance(mockPinotClient)

err := testWorkflow.emitWorkflowTypeCountMetricsPinot("test-domain", zap.NewNop())
if err == nil {
assert.Equal(t, test.expectedErr, err)
} else {
assert.Equal(t, test.expectedErr.Error(), err.Error())
}
})
}
}
Loading
Loading