diff --git a/CHANGELOG.md b/CHANGELOG.md index bd082d175..fe4700bd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Adds `Err()` function to Response for detailed errors ([#246](https://github.com/opensearch-project/opensearch-go/pull/246)) - Adds Point In Time API ([#253](https://github.com/opensearch-project/opensearch-go/pull/253)) - Adds InfoResp type ([#253](https://github.com/opensearch-project/opensearch-go/pull/253)) +- Adds testcases to check upsert functionality ([#207](https://github.com/opensearch-project/opensearch-go/issues/207)) ### Changed diff --git a/opensearchutil/bulk_indexer_integration_test.go b/opensearchutil/bulk_indexer_integration_test.go index 3d8d00fa2..a07677af4 100644 --- a/opensearchutil/bulk_indexer_integration_test.go +++ b/opensearchutil/bulk_indexer_integration_test.go @@ -24,6 +24,7 @@ // specific language governing permissions and limitations // under the License. +//go:build integration // +build integration package opensearchutil_test @@ -34,7 +35,6 @@ import ( "os" "strconv" "strings" - "sync/atomic" "testing" "time" @@ -44,158 +44,250 @@ import ( ) func TestBulkIndexerIntegration(t *testing.T) { - body := `{"body":"Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."}` + testRecordCount := uint64(10000) testCases := []struct { name string - CompressRequestBodyEnabled bool + compressRequestBodyEnabled bool + tests []struct { + name string + action string + body string + numItems uint64 + numIndexed uint64 + numCreated uint64 + numUpdated uint64 + numFailed uint64 + } }{ { - name: "Without body compression", - CompressRequestBodyEnabled: false, + name: "With body compression", + compressRequestBodyEnabled: true, + tests: []struct { + name string + action string + body string + numItems uint64 + numIndexed uint64 + numCreated uint64 + numUpdated uint64 + numFailed uint64 + }{ + { + name: "Index", + action: "index", + body: `{"title":"bar"}`, + numItems: testRecordCount, + numIndexed: testRecordCount, + numCreated: 0, + numUpdated: 0, + numFailed: 0, + }, + { + name: "Upsert", + action: "update", + body: `{"doc":{"title":"qwe"}, "doc_as_upsert": true}`, + numItems: testRecordCount, + numIndexed: 0, + numCreated: 0, + numUpdated: testRecordCount, + numFailed: 0, + }, + { + name: "Create", + action: "create", + body: `{"title":"bar"}`, + numItems: testRecordCount, + numIndexed: 0, + numCreated: 0, + numUpdated: 0, + numFailed: testRecordCount, + }, + }, }, { - name: "With body compression", - CompressRequestBodyEnabled: true, + name: "Without body compression", + compressRequestBodyEnabled: false, + tests: []struct { + name string + action string + body string + numItems uint64 + numIndexed uint64 + numCreated uint64 + numUpdated uint64 + numFailed uint64 + }{ + { + name: "Index", + action: "index", + body: `{"title":"bar"}`, + numItems: testRecordCount, + numIndexed: testRecordCount, + numCreated: 0, + numUpdated: 0, + numFailed: 0, + }, + { + name: "Upsert", + action: "update", + body: `{"doc":{"title":"qwe"}, "doc_as_upsert": true}`, + numItems: testRecordCount, + numIndexed: 0, + numCreated: 0, + numUpdated: testRecordCount, + numFailed: 0, + }, + { + name: "Create", + action: "create", + body: `{"title":"bar"}`, + numItems: testRecordCount, + numIndexed: 0, + numCreated: 0, + numUpdated: 0, + numFailed: testRecordCount, + }, + }, }, } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - t.Run("Default", func(t *testing.T) { - var countSuccessful uint64 - indexName := "test-bulk-integration" - - client, _ := opensearch.NewClient(opensearch.Config{ - CompressRequestBody: tt.CompressRequestBodyEnabled, - Logger: &opensearchtransport.ColorLogger{Output: os.Stdout}, - }) + for _, c := range testCases { + indexName := "test-bulk-integration" - client.Indices.Delete([]string{indexName}, client.Indices.Delete.WithIgnoreUnavailable(true)) - client.Indices.Create( - indexName, - client.Indices.Create.WithBody(strings.NewReader(`{"settings": {"number_of_shards": 1, "number_of_replicas": 0, "refresh_interval":"5s"}}`)), - client.Indices.Create.WithWaitForActiveShards("1")) + client, _ := opensearch.NewClient(opensearch.Config{ + CompressRequestBody: c.compressRequestBodyEnabled, + Logger: &opensearchtransport.ColorLogger{Output: os.Stdout}, + }) - bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ - Index: indexName, - Client: client, - // FlushBytes: 3e+6, - }) + client.Indices.Delete([]string{indexName}, client.Indices.Delete.WithIgnoreUnavailable(true)) + client.Indices.Create( + indexName, + client.Indices.Create.WithBody(strings.NewReader(`{"settings": {"number_of_shards": 1, "number_of_replicas": 0, "refresh_interval":"5s"}}`)), + client.Indices.Create.WithWaitForActiveShards("1")) - numItems := 100000 - start := time.Now().UTC() - - for i := 1; i <= numItems; i++ { - err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ - Action: "index", - DocumentID: strconv.Itoa(i), - Body: strings.NewReader(body), - OnSuccess: func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) { - atomic.AddUint64(&countSuccessful, 1) - }, + for _, tt := range c.tests { + t.Run(tt.name, func(t *testing.T) { + t.Run(c.name, func(t *testing.T) { + bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ + Index: indexName, + Client: client, + ErrorTrace: true, + Human: true, + Pretty: true, + // FlushBytes: 3e+6, }) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - } - if err := bi.Close(context.Background()); err != nil { - t.Errorf("Unexpected error: %s", err) - } + start := time.Now().UTC() - stats := bi.Stats() + for i := 1; i <= int(tt.numItems); i++ { + err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ + Index: indexName, + Action: tt.action, + DocumentID: strconv.Itoa(i), + Body: strings.NewReader(tt.body), + }) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + } - if stats.NumAdded != uint64(numItems) { - t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded) - } + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } - if stats.NumIndexed != uint64(numItems) { - t.Errorf("Unexpected NumIndexed: want=%d, got=%d", numItems, stats.NumIndexed) - } + stats := bi.Stats() - if stats.NumFailed != 0 { - t.Errorf("Unexpected NumFailed: want=0, got=%d", stats.NumFailed) - } + if stats.NumAdded != tt.numItems { + t.Errorf("Unexpected NumAdded: want=%d, got=%d", tt.numItems, stats.NumAdded) + } - if countSuccessful != uint64(numItems) { - t.Errorf("Unexpected countSuccessful: want=%d, got=%d", numItems, countSuccessful) - } + if stats.NumIndexed != tt.numIndexed { + t.Errorf("Unexpected NumIndexed: want=%d, got=%d", tt.numItems, stats.NumIndexed) + } - fmt.Printf(" Added %d documents to indexer. Succeeded: %d. Failed: %d. Requests: %d. Duration: %s (%.0f docs/sec)\n", - stats.NumAdded, - stats.NumFlushed, - stats.NumFailed, - stats.NumRequests, - time.Since(start).Truncate(time.Millisecond), - 1000.0/float64(time.Since(start)/time.Millisecond)*float64(stats.NumFlushed)) - }) + if stats.NumUpdated != tt.numUpdated { + t.Errorf("Unexpected NumUpdated: want=%d, got=%d", tt.numUpdated, stats.NumUpdated) + } - t.Run("Multiple indices", func(t *testing.T) { - client, _ := opensearch.NewClient(opensearch.Config{ - CompressRequestBody: tt.CompressRequestBodyEnabled, - Logger: &opensearchtransport.ColorLogger{Output: os.Stdout}, - }) + if stats.NumCreated != tt.numCreated { + t.Errorf("Unexpected NumCreated: want=%d, got=%d", tt.numCreated, stats.NumCreated) + } - bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ - Index: "test-index-a", - Client: client, + if stats.NumFailed != tt.numFailed { + t.Errorf("Unexpected NumFailed: want=0, got=%d", stats.NumFailed) + } + + fmt.Printf(" Added %d documents to indexer. Succeeded: %d. Failed: %d. Requests: %d. Duration: %s (%.0f docs/sec)\n", + stats.NumAdded, + stats.NumFlushed, + stats.NumFailed, + stats.NumRequests, + time.Since(start).Truncate(time.Millisecond), + 1000.0/float64(time.Since(start)/time.Millisecond)*float64(stats.NumFlushed)) }) - // Default index - for i := 1; i <= 10; i++ { - err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ - Action: "index", - DocumentID: strconv.Itoa(i), - Body: strings.NewReader(body), + t.Run("Multiple indices", func(t *testing.T) { + bi, _ := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ + Index: "test-index-a", + Client: client, }) - if err != nil { - t.Fatalf("Unexpected error: %s", err) + + // Default index + for i := 1; i <= 10; i++ { + err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ + Action: "index", + DocumentID: strconv.Itoa(i), + Body: strings.NewReader(tt.body), + }) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } } - } - - // Index 1 - for i := 1; i <= 10; i++ { - err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ - Action: "index", - Index: "test-index-b", - Body: strings.NewReader(body), - }) - if err != nil { - t.Fatalf("Unexpected error: %s", err) + + // Index 1 + for i := 1; i <= 10; i++ { + err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ + Action: "index", + Index: "test-index-b", + Body: strings.NewReader(tt.body), + }) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } } - } - - // Index 2 - for i := 1; i <= 10; i++ { - err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ - Action: "index", - Index: "test-index-c", - Body: strings.NewReader(body), - }) + + // Index 2 + for i := 1; i <= 10; i++ { + err := bi.Add(context.Background(), opensearchutil.BulkIndexerItem{ + Action: "index", + Index: "test-index-c", + Body: strings.NewReader(tt.body), + }) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + } + + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } + stats := bi.Stats() + + expectedIndexed := 10 + 10 + 10 + if stats.NumIndexed != uint64(expectedIndexed) { + t.Errorf("Unexpected NumIndexed: want=%d, got=%d", expectedIndexed, stats.NumIndexed) + } + + res, err := client.Indices.Exists([]string{"test-index-a", "test-index-b", "test-index-c"}) if err != nil { t.Fatalf("Unexpected error: %s", err) } - } - - if err := bi.Close(context.Background()); err != nil { - t.Errorf("Unexpected error: %s", err) - } - stats := bi.Stats() - - expectedIndexed := 10 + 10 + 10 - if stats.NumIndexed != uint64(expectedIndexed) { - t.Errorf("Unexpected NumIndexed: want=%d, got=%d", expectedIndexed, stats.NumIndexed) - } - - res, err := client.Indices.Exists([]string{"test-index-a", "test-index-b", "test-index-c"}) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if res.StatusCode != 200 { - t.Errorf("Expected indices to exist, but got a [%s] response", res.Status()) - } + if res.StatusCode != 200 { + t.Errorf("Expected indices to exist, but got a [%s] response", res.Status()) + } + }) }) - }) + } } }