Skip to content

Commit

Permalink
Add tests for es v6 bulk processor (cadence-workflow#5758)
Browse files Browse the repository at this point in the history
* Add tests for es v6 bulk processor
  • Loading branch information
neil-xie authored Mar 7, 2024
1 parent 856145b commit d1e00cb
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 2 deletions.
3 changes: 1 addition & 2 deletions common/elasticsearch/client/v6/client_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,11 @@ func (c *ElasticV6) RunBulkProcessor(ctx context.Context, parameters *bulk.BulkP
}

afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
gerr := convertV6ErrorToGenericError(err)
parameters.AfterFunc(
executionId,
fromV6ToGenericBulkableRequests(requests),
fromV6ToGenericBulkResponse(response),
gerr)
convertV6ErrorToGenericError(err))
}

return c.runBulkProcessor(ctx, &bulkProcessorParametersV6{
Expand Down
137 changes: 137 additions & 0 deletions common/elasticsearch/client/v6/client_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
package v6

import (
"context"
"errors"
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/elasticsearch/bulk"
"github.com/uber/cadence/common/log/testlogger"
)

func TestConvertV6ErrorToGenericError(t *testing.T) {
Expand Down Expand Up @@ -288,3 +291,137 @@ func TestFromV6ToGenericBulkableRequests(t *testing.T) {
genericRequests := fromV6ToGenericBulkableRequests(mockRequests)
assert.Len(t, genericRequests, len(mockRequests))
}

func getV6BulkProcessor(t *testing.T) *v6BulkProcessor {
return &v6BulkProcessor{
processor: &elastic.BulkProcessor{},
logger: testlogger.New(t),
}
}

func TestProcessorFunc(t *testing.T) {
processor := getV6BulkProcessor(t)
err := processor.Start(context.Background())
assert.NoError(t, err)

err = processor.Flush()
assert.NoError(t, err)

err = processor.Stop()
assert.NoError(t, err)

err = processor.Close()
assert.NoError(t, err)
}

func TestProcessorAdd(t *testing.T) {
tests := []struct {
name string
request *bulk.GenericBulkableAddRequest
expectErr bool
}{
{
name: "bulk add nil case",
request: nil,
expectErr: true,
},
{
name: "bulk add normal case",
request: &bulk.GenericBulkableAddRequest{
Index: "test-index",
RequestType: bulk.BulkableIndexRequest,
ID: "test-id",
VersionType: "internal",
Type: "test-type",
Version: int64(1),
Doc: "",
},
expectErr: false,
},
{
name: "bulk create normal case",
request: &bulk.GenericBulkableAddRequest{
Index: "test-index",
RequestType: bulk.BulkableCreateRequest,
ID: "test-id",
VersionType: "internal",
Type: "test-type",
Version: int64(1),
Doc: "",
},
expectErr: false,
},
{
name: "bulk add normal case",
request: &bulk.GenericBulkableAddRequest{
Index: "test-index",
RequestType: bulk.BulkableDeleteRequest,
ID: "test-id",
VersionType: "internal",
Type: "test-type",
Version: int64(1),
Doc: "",
},
expectErr: false,
},
}

elasticV6, testServer := getMockClient(t, nil)
defer testServer.Close()
svc := elasticV6.client.BulkProcessor().
Name("FlushInterval-1").
Workers(2).
BulkActions(-1).
BulkSize(-1)

p, err := svc.Do(context.Background())
if err != nil {
t.Fatal(err)
}

processor := v6BulkProcessor{
processor: p,
logger: testlogger.New(t),
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.expectErr {
assert.Panics(t, func() { processor.Add(tt.request) }, "The method should panic")
} else {
assert.NotPanics(t, func() { processor.Add(tt.request) }, "The method should not panic")
}
})
}
}

func TestRunBulkProcessor(t *testing.T) {
tests := []struct {
name string
params *bulk.BulkProcessorParameters
}{
{
name: "successful initialization",
params: &bulk.BulkProcessorParameters{
Name: "test-processor",
NumOfWorkers: 5,
BulkActions: 1000,
BulkSize: 2 * 1024 * 1024, // 2MB
FlushInterval: 30 * time.Second,
Backoff: elastic.NewExponentialBackoff(10*time.Millisecond, 8*time.Second),
BeforeFunc: func(executionId int64, requests []bulk.GenericBulkableRequest) {
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
elasticV6, testServer := getMockClient(t, nil)
defer testServer.Close()
result, err := elasticV6.RunBulkProcessor(context.Background(), tt.params)
assert.NoError(t, err)
assert.NotNil(t, result)
})
}
}

0 comments on commit d1e00cb

Please sign in to comment.