Skip to content

Commit

Permalink
:rerycle: Add Multi test
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <kyukawa315@gmail.com>
  • Loading branch information
vankichi committed Jan 28, 2025
1 parent 497df3c commit 807732e
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 44 deletions.
3 changes: 2 additions & 1 deletion .github/helm/values/values-rollout-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ discoverer:
memory: 50Mi
manager:
index:
enabled: true
replicas: 1
resources:
requests:
Expand All @@ -130,7 +131,7 @@ manager:
auto_index_check_duration: 30s
auto_index_length: 1000
corrector:
enabled: true
enabled: false
# suspend because you do not want corrector to start automatically in CI
# instead run it manually
suspend: true
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ E2E_PORTFORWARD_ENABLED ?= true
E2E_REMOVE_COUNT ?= 3
E2E_SEARCH_BY_ID_COUNT ?= 100
E2E_SEARCH_COUNT ?= 1000
E2E_SEARCH_CONCURRENCY ?= 10
E2E_TARGET_NAME ?= vald-lb-gateway
E2E_TARGET_NAMESPACE ?= default
E2E_TARGET_POD_NAME ?= $(eval E2E_TARGET_POD_NAME := $(shell kubectl get pods --selector=app=$(E2E_TARGET_NAME) -n $(E2E_TARGET_NAMESPACE) | tail -1 | cut -f1 -d " "))$(E2E_TARGET_POD_NAME)
Expand Down
1 change: 1 addition & 0 deletions Makefile.d/functions.mk
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ define run-e2e-crud-test
-correction-insert-num=$(E2E_INSERT_COUNT) \
-insert-num=$(E2E_INSERT_COUNT) \
-search-num=$(E2E_SEARCH_COUNT) \
-search-conn=$(E2E_SEARCH_CONCURRENCY) \
-search-by-id-num=$(E2E_SEARCH_BY_ID_COUNT) \
-get-object-num=$(E2E_GET_OBJECT_COUNT) \
-update-num=$(E2E_UPDATE_COUNT) \
Expand Down
82 changes: 40 additions & 42 deletions tests/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ func init() {

flag.IntVar(&insertNum, "insert-num", 10000, "number of id-vector pairs used for insert")
flag.IntVar(&correctionInsertNum, "correction-insert-num", 10000, "number of id-vector pairs used for insert")
flag.IntVar(&searchNum, "search-num", 1000, "number of id-vector pairs used for search")
flag.IntVar(&searchNum, "search-num", 10000, "number of id-vector pairs used for search")
flag.IntVar(&searchByIDNum, "search-by-id-num", 3, "number of id-vector pairs used for search-by-id")
flag.IntVar(&searchConcurrency, "search-conn", 10, "number of search concurrency")
flag.IntVar(&searchConcurrency, "search-conn", 100, "number of search concurrency")
flag.IntVar(&getObjectNum, "get-object-num", 100, "number of id-vector pairs used for get-object")
flag.IntVar(&updateNum, "update-num", 10000, "number of id-vector pairs used for update")
flag.IntVar(&upsertNum, "upsert-num", 10000, "number of id-vector pairs used for upsert")
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func TestE2EAgentRolloutRestart(t *testing.T) {
t.Fatalf("an error occurred: %s", err)
}

err = op.Upsert(t, ctx, operation.Dataset{
_ = op.Upsert(t, ctx, operation.Dataset{
Train: ds.Train[insertFrom : insertFrom+insertNum],
})
if err != nil {
Expand All @@ -1037,12 +1037,12 @@ func TestE2EAgentRolloutRestart(t *testing.T) {

wg := sync.WaitGroup{}
mu := sync.Mutex{}
done := make(chan struct{})
var serr error

wg.Add(1)
done := make(chan struct{})
go func() {
defer wg.Done()

for {
select {
case <-done:
Expand Down Expand Up @@ -1108,16 +1108,16 @@ func TestE2EAgentRolloutRestart(t *testing.T) {
}

// Remove all vector data after the current - 1 hour.
// err = op.Flush(t, ctx)
err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano())
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

close(done)
wg.Wait()
if serr != nil {
t.Fatalf("an error occurred: %s", serr)
}
if err != nil {
t.Fatalf("an error occurred: %s", err)
}
}

// TestE2EHighConcurrencyMultiSearch tests that high concurrency search requests succeed.
Expand All @@ -1140,14 +1140,15 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) {
t.Fatalf("an error occurred: %s", err)
}

// _ = op.Upsert(t, ctx, operation.Dataset{
// Train: ds.Train[insertFrom : insertFrom+insertNum],
// })
// if err != nil {
// t.Fatalf("an error occurred: %s", err)
// }
//
// sleep(t, waitAfterInsertDuration)
_ = op.Upsert(t, ctx, operation.Dataset{
Train: ds.Train[insertFrom : insertFrom+insertNum],
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

sleep(t, waitAfterInsertDuration)

searchFunc := func(ctx context.Context) error {
return op.MultiSearch(t, ctx, operation.Dataset{
Test: ds.Test[searchFrom : searchFrom+searchNum],
Expand Down Expand Up @@ -1187,20 +1188,17 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) {
mu.Lock()
serr = errors.Join(serr, egerr)
mu.Unlock()
time.Sleep(10 * time.Second)
time.Sleep(5 * time.Second)
}
}
}()
// close(done)
wg.Wait()
// close(done)

// Wait for StatefulSet to be ready
// t.Log("rollout restart agent and waiting for agent pods ready...")
// err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String())
// if err != nil {
// t.Fatalf("an error occurred: %s", err)
// }
t.Log("rollout restart agent and waiting for agent pods ready...")
err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String())
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

cnt, err := op.IndexInfo(t, ctx)
if err != nil {
Expand All @@ -1210,17 +1208,17 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) {
t.Fatalf("an error occurred: count = %d, err = %s", cnt.Stored, err)
}

// err = op.Exists(t, ctx, "0")
// if err != nil {
// t.Fatalf("an error occurred: %s", err)
// }
err = op.Exists(t, ctx, "0")
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

// err = op.GetObject(t, ctx, operation.Dataset{
// Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum],
// })
// if err != nil {
// t.Fatalf("an error occurred: %s", err)
// }
err = op.GetObject(t, ctx, operation.Dataset{
Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum],
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

err = op.Remove(t, ctx, operation.Dataset{
Train: ds.Train[removeFrom : removeFrom+removeNum],
Expand All @@ -1230,14 +1228,14 @@ func TestE2EHighConcurrencyMultiSearch(t *testing.T) {
}

// Remove all vector data after the current - 1 hour.
// err = op.Flush(t, ctx)
err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano())
// close(done)
// wg.Wait()
if serr != nil {
t.Fatalf("an error occurred: %s", serr)
}
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

close(done)
wg.Wait()
if serr != nil {
t.Fatalf("an error occurred: %s", serr.Error())
}
}
3 changes: 3 additions & 0 deletions tests/e2e/operation/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"strconv"
"testing"
"time"

"github.com/vdaas/vald/apis/grpc/v1/payload"
)
Expand All @@ -29,10 +30,12 @@ func (c *client) MultiSearch(t *testing.T, ctx context.Context, ds Dataset) erro
return err
}

to := time.Second * 3
cfg := &payload.Search_Config{
Num: 10,
Radius: -1.0,
Epsilon: 0.1,
Timeout: to.Nanoseconds(),
}

reqs := make([]*payload.Search_Request, 0, len(ds.Test))
Expand Down
3 changes: 2 additions & 1 deletion tests/e2e/operation/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ func ParseAndLogError(t *testing.T, err error) error {
}

func (c *client) Search(t *testing.T, ctx context.Context, ds Dataset) error {
to := time.Second * 3
return c.SearchWithParameters(
t,
ctx,
ds,
100,
-1.0,
0.1,
3000000000,
to.Nanoseconds(),
DefaultStatusValidator,
ParseAndLogError,
)
Expand Down

0 comments on commit 807732e

Please sign in to comment.