diff --git a/.github/helm/values/values-rollout-agent.yaml b/.github/helm/values/values-rollout-agent.yaml index 3f229f5740..a782d710a7 100644 --- a/.github/helm/values/values-rollout-agent.yaml +++ b/.github/helm/values/values-rollout-agent.yaml @@ -120,6 +120,7 @@ discoverer: memory: 50Mi manager: index: + enabled: true replicas: 1 resources: requests: @@ -130,7 +131,7 @@ manager: auto_index_check_duration: 30s auto_index_length: 1000 corrector: - enabled: true + enabled: false # suspend because you do not want corrector to start automatically in CI # instead run it manually suspend: true diff --git a/Makefile b/Makefile index a364ce707a..b3fad08986 100644 --- a/Makefile +++ b/Makefile @@ -358,6 +358,7 @@ E2E_PORTFORWARD_ENABLED ?= true E2E_REMOVE_COUNT ?= 3 E2E_SEARCH_BY_ID_COUNT ?= 100 E2E_SEARCH_COUNT ?= 1000 +E2E_SEARCH_CONCURRENCY ?= 10 E2E_TARGET_NAME ?= vald-lb-gateway E2E_TARGET_NAMESPACE ?= default E2E_TARGET_POD_NAME ?= $(eval E2E_TARGET_POD_NAME := $(shell kubectl get pods --selector=app=$(E2E_TARGET_NAME) -n $(E2E_TARGET_NAMESPACE) | tail -1 | cut -f1 -d " "))$(E2E_TARGET_POD_NAME) diff --git a/Makefile.d/functions.mk b/Makefile.d/functions.mk index dc440a66cf..23d01df455 100644 --- a/Makefile.d/functions.mk +++ b/Makefile.d/functions.mk @@ -150,6 +150,7 @@ define run-e2e-crud-test -correction-insert-num=$(E2E_INSERT_COUNT) \ -insert-num=$(E2E_INSERT_COUNT) \ -search-num=$(E2E_SEARCH_COUNT) \ + -search-conn=$(E2E_SEARCH_CONCURRENCY) \ -search-by-id-num=$(E2E_SEARCH_BY_ID_COUNT) \ -get-object-num=$(E2E_GET_OBJECT_COUNT) \ -update-num=$(E2E_UPDATE_COUNT) \ diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index 40ea17be78..fb7f307c8b 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -83,9 +83,9 @@ func init() { flag.IntVar(&insertNum, "insert-num", 10000, "number of id-vector pairs used for insert") flag.IntVar(&correctionInsertNum, "correction-insert-num", 10000, "number of id-vector pairs used for insert") - flag.IntVar(&searchNum, "search-num", 1000, "number of id-vector pairs used for search") + flag.IntVar(&searchNum, "search-num", 10000, "number of id-vector pairs used for search") flag.IntVar(&searchByIDNum, "search-by-id-num", 3, "number of id-vector pairs used for search-by-id") - flag.IntVar(&searchConcurrency, "search-conn", 10, "number of search concurrency") + flag.IntVar(&searchConcurrency, "search-conn", 100, "number of search concurrency") flag.IntVar(&getObjectNum, "get-object-num", 100, "number of id-vector pairs used for get-object") flag.IntVar(&updateNum, "update-num", 10000, "number of id-vector pairs used for update") flag.IntVar(&upsertNum, "upsert-num", 10000, "number of id-vector pairs used for upsert") @@ -1019,7 +1019,7 @@ func TestE2EAgentRolloutRestart(t *testing.T) { t.Fatalf("an error occurred: %s", err) } - err = op.Upsert(t, ctx, operation.Dataset{ + _ = op.Upsert(t, ctx, operation.Dataset{ Train: ds.Train[insertFrom : insertFrom+insertNum], }) if err != nil { @@ -1037,12 +1037,12 @@ func TestE2EAgentRolloutRestart(t *testing.T) { wg := sync.WaitGroup{} mu := sync.Mutex{} + done := make(chan struct{}) var serr error + wg.Add(1) - done := make(chan struct{}) go func() { defer wg.Done() - for { select { case <-done: @@ -1108,16 +1108,16 @@ func TestE2EAgentRolloutRestart(t *testing.T) { } // Remove all vector data after the current - 1 hour. - // err = op.Flush(t, ctx) err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + close(done) wg.Wait() if serr != nil { t.Fatalf("an error occurred: %s", serr) } - if err != nil { - t.Fatalf("an error occurred: %s", err) - } } // TestE2EHighConcurrencyMultiSearch tests that high concurrency search requests succeed. @@ -1140,14 +1140,15 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) { t.Fatalf("an error occurred: %s", err) } - // _ = op.Upsert(t, ctx, operation.Dataset{ - // Train: ds.Train[insertFrom : insertFrom+insertNum], - // }) - // if err != nil { - // t.Fatalf("an error occurred: %s", err) - // } - // - // sleep(t, waitAfterInsertDuration) + _ = op.Upsert(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + searchFunc := func(ctx context.Context) error { return op.MultiSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], @@ -1187,20 +1188,17 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) { mu.Lock() serr = errors.Join(serr, egerr) mu.Unlock() - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) } } }() - // close(done) - wg.Wait() - // close(done) // Wait for StatefulSet to be ready - // t.Log("rollout restart agent and waiting for agent pods ready...") - // err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String()) - // if err != nil { - // t.Fatalf("an error occurred: %s", err) - // } + t.Log("rollout restart agent and waiting for agent pods ready...") + err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } cnt, err := op.IndexInfo(t, ctx) if err != nil { @@ -1210,17 +1208,17 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) { t.Fatalf("an error occurred: count = %d, err = %s", cnt.Stored, err) } - // err = op.Exists(t, ctx, "0") - // if err != nil { - // t.Fatalf("an error occurred: %s", err) - // } + err = op.Exists(t, ctx, "0") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } - // err = op.GetObject(t, ctx, operation.Dataset{ - // Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum], - // }) - // if err != nil { - // t.Fatalf("an error occurred: %s", err) - // } + err = op.GetObject(t, ctx, operation.Dataset{ + Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } err = op.Remove(t, ctx, operation.Dataset{ Train: ds.Train[removeFrom : removeFrom+removeNum], @@ -1230,14 +1228,14 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) { } // Remove all vector data after the current - 1 hour. - // err = op.Flush(t, ctx) err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano()) - // close(done) - // wg.Wait() - if serr != nil { - t.Fatalf("an error occurred: %s", serr) - } if err != nil { t.Fatalf("an error occurred: %s", err) } + + close(done) + wg.Wait() + if serr != nil { + t.Fatalf("an error occurred: %s", serr.Error()) + } } diff --git a/tests/e2e/operation/multi.go b/tests/e2e/operation/multi.go index 1a8fc8ddca..7b655eef64 100644 --- a/tests/e2e/operation/multi.go +++ b/tests/e2e/operation/multi.go @@ -19,6 +19,7 @@ import ( "context" "strconv" "testing" + "time" "github.com/vdaas/vald/apis/grpc/v1/payload" ) @@ -29,10 +30,12 @@ func (c *client) MultiSearch(t *testing.T, ctx context.Context, ds Dataset) erro return err } + to := time.Second * 3 cfg := &payload.Search_Config{ Num: 10, Radius: -1.0, Epsilon: 0.1, + Timeout: to.Nanoseconds(), } reqs := make([]*payload.Search_Request, 0, len(ds.Test)) diff --git a/tests/e2e/operation/stream.go b/tests/e2e/operation/stream.go index 0d1d950715..08bd3fb505 100644 --- a/tests/e2e/operation/stream.go +++ b/tests/e2e/operation/stream.go @@ -68,6 +68,7 @@ func ParseAndLogError(t *testing.T, err error) error { } func (c *client) Search(t *testing.T, ctx context.Context, ds Dataset) error { + to := time.Second * 3 return c.SearchWithParameters( t, ctx, @@ -75,7 +76,7 @@ func (c *client) Search(t *testing.T, ctx context.Context, ds Dataset) error { 100, -1.0, 0.1, - 3000000000, + to.Nanoseconds(), DefaultStatusValidator, ParseAndLogError, )