Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testcases to check upsert functionality #269

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Adds `Err()` function to Response for detailed errors ([#246](https://github.com/opensearch-project/opensearch-go/pull/246))
- Adds Point In Time API ([#253](https://github.com/opensearch-project/opensearch-go/pull/253))
- Adds InfoResp type ([#253](https://github.com/opensearch-project/opensearch-go/pull/253))
- Adds testcases to check upsert functionality ([#207](https://github.com/opensearch-project/opensearch-go/issues/207))

### Changed

Expand Down
336 changes: 214 additions & 122 deletions opensearchutil/bulk_indexer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build integration
// +build integration

package opensearchutil_test
Expand All @@ -34,7 +35,6 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -44,158 +44,250 @@ import (
)

func TestBulkIndexerIntegration(t *testing.T) {
body := `{"body":"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."}`
testRecordCount := uint64(10000)

testCases := []struct {
name string
CompressRequestBodyEnabled bool
compressRequestBodyEnabled bool
tests []struct {
name string
action string
body string
numItems uint64
numIndexed uint64
numCreated uint64
numUpdated uint64
numFailed uint64
}
}{
{
name: "Without body compression",
CompressRequestBodyEnabled: false,
name: "With body compression",
compressRequestBodyEnabled: true,
tests: []struct {
name string
action string
body string
numItems uint64
numIndexed uint64
numCreated uint64
numUpdated uint64
numFailed uint64
}{
{
name: "Index",
action: "index",
body: `{"title":"bar"}`,
numItems: testRecordCount,
numIndexed: testRecordCount,
numCreated: 0,
numUpdated: 0,
numFailed: 0,
},
{
name: "Upsert",
action: "update",
body: `{"doc":{"title":"qwe"}, "doc_as_upsert": true}`,
numItems: testRecordCount,
numIndexed: 0,
numCreated: 0,
numUpdated: testRecordCount,
numFailed: 0,
},
{
name: "Create",
action: "create",
body: `{"title":"bar"}`,
numItems: testRecordCount,
numIndexed: 0,
numCreated: 0,
numUpdated: 0,
numFailed: testRecordCount,
},
},
},
{
name: "With body compression",
CompressRequestBodyEnabled: true,
name: "Without body compression",
compressRequestBodyEnabled: false,
tests: []struct {
name string
action string
body string
numItems uint64
numIndexed uint64
numCreated uint64
numUpdated uint64
numFailed uint64
}{
{
name: "Index",
action: "index",
body: `{"title":"bar"}`,
numItems: testRecordCount,
numIndexed: testRecordCount,
numCreated: 0,
numUpdated: 0,
numFailed: 0,
},
{
name: "Upsert",
action: "update",
body: `{"doc":{"title":"qwe"}, "doc_as_upsert": true}`,
numItems: testRecordCount,
numIndexed: 0,
numCreated: 0,
numUpdated: testRecordCount,
numFailed: 0,
},
{
name: "Create",
action: "create",
body: `{"title":"bar"}`,
numItems: testRecordCount,
numIndexed: 0,
numCreated: 0,
numUpdated: 0,
numFailed: testRecordCount,
},
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
t.Run("Default", func(t *testing.T) {
var countSuccessful uint64
indexName := "test-bulk-integration"

client, _ := opensearch.NewClient(opensearch.Config{
CompressRequestBody: tt.CompressRequestBodyEnabled,
Logger: &opensearchtransport.ColorLogger{Output: os.Stdout},
})
for _, c := range testCases {
indexName := "test-bulk-integration"

client.Indices.Delete([]string{indexName}, client.Indices.Delete.WithIgnoreUnavailable(true))
client.Indices.Create(
indexName,
client.Indices.Create.WithBody(strings.NewReader(`{"settings": {"number_of_shards": 1, "number_of_replicas": 0, "refresh_interval":"5s"}}`)),
client.Indices.Create.WithWaitForActiveShards("1"))
client, _ := opensearch.NewClient(opensearch.Config{
CompressRequestBody: c.compressRequestBodyEnabled,
Logger: &opensearchtransport.ColorLogger{Output: os.Stdout},
})

bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: indexName,
Client: client,
// FlushBytes: 3e+6,
})
client.Indices.Delete([]string{indexName}, client.Indices.Delete.WithIgnoreUnavailable(true))
client.Indices.Create(
indexName,
client.Indices.Create.WithBody(strings.NewReader(`{"settings": {"number_of_shards": 1, "number_of_replicas": 0, "refresh_interval":"5s"}}`)),
client.Indices.Create.WithWaitForActiveShards("1"))

numItems := 100000
start := time.Now().UTC()

for i := 1; i <= numItems; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(body),
OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) {
atomic.AddUint64(&countSuccessful, 1)
},
for _, tt := range c.tests {
t.Run(tt.name, func(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: indexName,
Client: client,
ErrorTrace: true,
Human: true,
Pretty: true,
// FlushBytes: 3e+6,
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}

if err := bi.Close(context.Background()); err != nil {
t.Errorf("Unexpected error: %s", err)
}
start := time.Now().UTC()

stats := bi.Stats()
for i := 1; i <= int(tt.numItems); i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Index: indexName,
Action: tt.action,
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(tt.body),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}

if stats.NumAdded != uint64(numItems) {
t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded)
}
if err := bi.Close(context.Background()); err != nil {
t.Errorf("Unexpected error: %s", err)
}

if stats.NumIndexed != uint64(numItems) {
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", numItems, stats.NumIndexed)
}
stats := bi.Stats()

if stats.NumFailed != 0 {
t.Errorf("Unexpected NumFailed: want=0, got=%d", stats.NumFailed)
}
if stats.NumAdded != tt.numItems {
t.Errorf("Unexpected NumAdded: want=%d, got=%d", tt.numItems, stats.NumAdded)
}

if countSuccessful != uint64(numItems) {
t.Errorf("Unexpected countSuccessful: want=%d, got=%d", numItems, countSuccessful)
}
if stats.NumIndexed != tt.numIndexed {
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", tt.numItems, stats.NumIndexed)
}

fmt.Printf(" Added %d documents to indexer. Succeeded: %d. Failed: %d. Requests: %d. Duration: %s (%.0f docs/sec)\n",
stats.NumAdded,
stats.NumFlushed,
stats.NumFailed,
stats.NumRequests,
time.Since(start).Truncate(time.Millisecond),
1000.0/float64(time.Since(start)/time.Millisecond)*float64(stats.NumFlushed))
})
if stats.NumUpdated != tt.numUpdated {
t.Errorf("Unexpected NumUpdated: want=%d, got=%d", tt.numUpdated, stats.NumUpdated)
}

t.Run("Multiple indices", func(t *testing.T) {
client, _ := opensearch.NewClient(opensearch.Config{
CompressRequestBody: tt.CompressRequestBodyEnabled,
Logger: &opensearchtransport.ColorLogger{Output: os.Stdout},
})
if stats.NumCreated != tt.numCreated {
t.Errorf("Unexpected NumCreated: want=%d, got=%d", tt.numCreated, stats.NumCreated)
}

bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: "test-index-a",
Client: client,
if stats.NumFailed != tt.numFailed {
t.Errorf("Unexpected NumFailed: want=0, got=%d", stats.NumFailed)
}

fmt.Printf(" Added %d documents to indexer. Succeeded: %d. Failed: %d. Requests: %d. Duration: %s (%.0f docs/sec)\n",
stats.NumAdded,
stats.NumFlushed,
stats.NumFailed,
stats.NumRequests,
time.Since(start).Truncate(time.Millisecond),
1000.0/float64(time.Since(start)/time.Millisecond)*float64(stats.NumFlushed))
})

// Default index
for i := 1; i <= 10; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(body),
t.Run("Multiple indices", func(t *testing.T) {
bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: "test-index-a",
Client: client,
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)

// Default index
for i := 1; i <= 10; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
DocumentID: strconv.Itoa(i),
Body: strings.NewReader(tt.body),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}
}

// Index 1
for i := 1; i <= 10; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
Index: "test-index-b",
Body: strings.NewReader(body),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)

// Index 1
for i := 1; i <= 10; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
Index: "test-index-b",
Body: strings.NewReader(tt.body),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}
}

// Index 2
for i := 1; i <= 10; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
Index: "test-index-c",
Body: strings.NewReader(body),
})

// Index 2
for i := 1; i <= 10; i++ {
err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{
Action: "index",
Index: "test-index-c",
Body: strings.NewReader(tt.body),
})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}

if err := bi.Close(context.Background()); err != nil {
t.Errorf("Unexpected error: %s", err)
}
stats := bi.Stats()

expectedIndexed := 10 + 10 + 10
if stats.NumIndexed != uint64(expectedIndexed) {
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", expectedIndexed, stats.NumIndexed)
}

res, err := client.Indices.Exists([]string{"test-index-a", "test-index-b", "test-index-c"})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
}

if err := bi.Close(context.Background()); err != nil {
t.Errorf("Unexpected error: %s", err)
}
stats := bi.Stats()

expectedIndexed := 10 + 10 + 10
if stats.NumIndexed != uint64(expectedIndexed) {
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", expectedIndexed, stats.NumIndexed)
}

res, err := client.Indices.Exists([]string{"test-index-a", "test-index-b", "test-index-c"})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if res.StatusCode != 200 {
t.Errorf("Expected indices to exist, but got a [%s] response", res.Status())
}
if res.StatusCode != 200 {
t.Errorf("Expected indices to exist, but got a [%s] response", res.Status())
}
})
})
})
}
}
}