Skip to content

Commit

Permalink
add api integration test
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 29, 2024
1 parent 7a3ca02 commit f8a65d4
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 64 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -688,20 +688,24 @@ jobs:
"db_collections": {
"foo": [
{
"name": "*"
"name": "*"
}
]
}
}
}'
- name: Run test
timeout-minutes: 15
timeout-minutes: 30
shell: bash
working-directory: tests
run: |
pip install -r requirements.txt --trusted-host https://test.pypi.org
pytest testcases/test_cdc_database.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500
pytest testcases/test_cdc_get.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500
pytest testcases/test_cdc_list.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500
pytest testcases/test_cdc_pause.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500
pytest testcases/test_cdc_resume.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500
- name: List CDC task
if: ${{ always() }}
Expand Down
18 changes: 7 additions & 11 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,6 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string,
}
if names, ok := e.collectionNames.data[uKey]; ok {
var duplicateCollections []string
containsAny := false
for _, name := range names {
d, c := util.GetCollectionNameFromFull(name)
if d == cdcreader.AllDatabase || c == cdcreader.AllCollection {
containsAny = true
}
}
for _, newCollectionName := range newCollectionNames {
if lo.Contains(names, newCollectionName) {
duplicateCollections = append(duplicateCollections, newCollectionName)
Expand All @@ -343,9 +336,12 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string,
if nd == cdcreader.AllDatabase && nc == cdcreader.AllCollection {
continue
}
if containsAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) {
duplicateCollections = append(duplicateCollections, newCollectionName)
continue
for _, name := range names {
match, containAny := matchCollectionName(name, newCollectionName)
if match && containAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) {
duplicateCollections = append(duplicateCollections, newCollectionName)
break
}
}
}
if len(duplicateCollections) > 0 {
Expand Down Expand Up @@ -886,7 +882,7 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err
}
if err != nil {
taskLog.Warn("fail to new the data handler", zap.Error(err))
return nil, servererror.NewClientError("fail to new the data handler, task_id: ")
return nil, servererror.NewClientError("fail to new the data handler, task_id: " + info.TaskID)
}
writerObj := cdcwriter.NewChannelWriter(dataHandler, config.WriterConfig{
MessageBufferSize: bufferSize,
Expand Down
19 changes: 19 additions & 0 deletions server/cdc_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,25 @@ func TestCheckDuplicateCollection(t *testing.T) {
assert.Len(t, excludeCollections, 0)
})

t.Run("collection duplicate test", func(t *testing.T) {
metaCDC := &MetaCDC{}
initMetaCDCMap(metaCDC)
excludeCollections, err := metaCDC.checkDuplicateCollection("foo", []string{
util.GetFullCollectionName("foo", "*"),
}, model.ExtraInfo{
EnableUserRole: true,
}, nil)
assert.NoError(t, err)
assert.Len(t, excludeCollections, 0)

_, err = metaCDC.checkDuplicateCollection("foo", []string{
util.GetFullCollectionName("default", "col1"),
}, model.ExtraInfo{
EnableUserRole: false,
}, nil)
assert.NoError(t, err)
})

t.Run("map collection name", func(t *testing.T) {
metaCDC := &MetaCDC{}
initMetaCDCMap(metaCDC)
Expand Down
11 changes: 6 additions & 5 deletions tests/api/milvus_cdc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import requests

DEFAULT_TOKEN = 'root:Milvus'

class MilvusCdcClient:

Expand All @@ -17,7 +18,7 @@ def create_task(self, request_data):
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
return response.json()['data'], True
else:
return response.text, False

Expand All @@ -28,8 +29,8 @@ def list_tasks(self):
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
if response.status_code == 200 and 'data' in response.json():
return response.json()['data'], True
else:
return response.text, False

Expand All @@ -43,8 +44,8 @@ def get_task(self, task_id):
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
if response.status_code == 200 and 'data' in response.json():
return response.json()['data']['task'], True
else:
return response.text, False

Expand Down
12 changes: 6 additions & 6 deletions tests/testcases/test_cdc_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from datetime import datetime
from utils.util_log import test_log as log
from api.milvus_cdc import MilvusCdcClient
from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN
from pymilvus import (
connections, list_collections,
Collection, Partition
Expand Down Expand Up @@ -69,7 +69,7 @@ def test_cdc_for_collections_create_after_cdc_task(self, upstream_host, upstream

# check collections in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
timeout = 120
t0 = time.time()
log.info(f"all collections in downstream {list_collections()}")
Expand Down Expand Up @@ -131,7 +131,7 @@ def test_cdc_for_partitions_create_after_cdc_task(self, upstream_host, upstream_
assert set(p_name_list).issubset(set(list_partitions(col)))
# check collections in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
log.info(f"all collections in downstream {list_collections()}")
t0 = time.time()
timeout = 60
Expand Down Expand Up @@ -203,7 +203,7 @@ def test_cdc_for_collection_insert_after_cdc_task(self, upstream_host, upstream_

# check entities in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
col = Collection(name=c_name)
col.create_index(field_name="float_vector",
index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}})
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_p
p_name = "p1"
checker = InsertEntitiesPartitionChecker(host=upstream_host, port=upstream_port, c_name=c_name, p_name=p_name)
checker.run()
time.sleep(60)
time.sleep(20)
checker.pause()
# check entities in upstream
count_by_query_upstream = checker.get_count_by_query(p_name=p_name)
Expand All @@ -273,7 +273,7 @@ def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_p

# check entities in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
t0 = time.time()
timeout = 60
while True and time.time() - t0 < timeout:
Expand Down
4 changes: 2 additions & 2 deletions tests/testcases/test_cdc_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_cdc_sync_default_database_request(self, upstream_host, upstream_port, d
"""
connections.connect(host=upstream_host, port=upstream_port)
col_list = []
for i in range(10):
for i in range(5):
time.sleep(0.1)
collection_name = prefix + "not_match_database_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
col_list.append(collection_name)
Expand Down Expand Up @@ -86,7 +86,7 @@ def test_cdc_sync_not_match_database_request(self, upstream_host, upstream_port,
db.create_database("hoo")
db.using_database(db_name="hoo")
col_list = []
for i in range(10):
for i in range(5):
time.sleep(0.1)
collection_name = prefix + "not_match_database_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
col_list.append(collection_name)
Expand Down
31 changes: 20 additions & 11 deletions tests/testcases/test_cdc_delete.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
from datetime import datetime
from utils.util_log import test_log as log
from api.milvus_cdc import MilvusCdcClient
from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN
from pymilvus import (
connections,
Collection
Expand All @@ -11,7 +11,7 @@
)
from base.client_base import TestBase

prefix = "cdc_create_task_"
prefix = "cdc_delete_task_"
client = MilvusCdcClient('http://localhost:8444')


Expand Down Expand Up @@ -51,7 +51,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do
connections.connect(host=upstream_host, port=upstream_port)
checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name)
checker.run()
time.sleep(60)
time.sleep(20)
# pause the insert task
log.info(f"start to pause the insert task")
checker.pause()
Expand All @@ -65,13 +65,13 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do
# check the collection in downstream
connections.disconnect("default")
log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
collection = Collection(name=collection_name)
collection.create_index(field_name="float_vector",
index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}})
collection.load()
# wait for the collection to be synced
timeout = 60
timeout = 20
t0 = time.time()
count_by_query_downstream = len(
collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
Expand All @@ -86,7 +86,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do
log.info(f"count_by_query_downstream: {count_by_query_downstream}")
assert count_by_query_upstream == count_by_query_downstream
# wait for the collection to be flushed
time.sleep(20)
time.sleep(10)
collection.flush()
num_entities_downstream = collection.num_entities
log.info(f"num_entities_downstream: {num_entities_downstream}")
Expand All @@ -113,7 +113,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do
connections.connect(host=upstream_host, port=upstream_port)
# insert entities into the collection
checker.resume()
time.sleep(60)
time.sleep(20)
checker.pause()
# check the collection in upstream
count_by_query_upstream_second = checker.get_count_by_query()
Expand All @@ -124,20 +124,29 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do
assert num_entities_upstream_second > num_entities_upstream

# connect to downstream
# connections.disconnect("default")
# log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
# connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
# collection = Collection(name=collection_name)

connections.disconnect("default")
log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
log.info("start to check the collection in downstream")
# check the collection in downstream has not been synced
timeout = 60
timeout = 10
t0 = time.time()
collection = Collection(name=collection_name)
count_by_query_downstream_second = len(
collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
log.info(f"start count_by_query_downstream_second: {count_by_query_downstream_second}")
while True and time.time() - t0 < timeout:
count_by_query_downstream_second = len(
collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
if count_by_query_downstream_second == count_by_query_upstream_second:
assert False
break
time.sleep(1)
log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}")
if time.time() - t0 > timeout:
log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}")
log.info(f"end count_by_query_downstream_second: {count_by_query_downstream_second}")
assert count_by_query_downstream_second == count_by_query_downstream
2 changes: 1 addition & 1 deletion tests/testcases/test_cdc_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_cdc_get_task(self, upstream_host, upstream_port, downstream_host, downs
col_list = []
task_id_list = []
for i in range(10):
time.sleep(0.1)
time.sleep(1)
collection_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
col_list.append(collection_name)
request_data = {
Expand Down
19 changes: 10 additions & 9 deletions tests/testcases/test_cdc_pause.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
from datetime import datetime
from utils.util_log import test_log as log
from api.milvus_cdc import MilvusCdcClient
from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN
from pymilvus import (
connections,
Collection
Expand Down Expand Up @@ -52,7 +52,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow
connections.connect(host=upstream_host, port=upstream_port)
checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name)
checker.run()
time.sleep(60)
time.sleep(20)
# pause the insert task
log.info(f"start to pause the insert task")
checker.pause()
Expand All @@ -66,7 +66,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow
# check the collection in downstream
connections.disconnect("default")
log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
collection = Collection(name=collection_name)
collection.create_index(field_name="float_vector",
index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}})
Expand All @@ -87,7 +87,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow
log.info(f"count_by_query_downstream: {count_by_query_downstream}")
assert count_by_query_upstream == count_by_query_downstream
# wait for the collection to be flushed
time.sleep(20)
time.sleep(10)
collection.flush()
num_entities_downstream = collection.num_entities
log.info(f"num_entities_downstream: {num_entities_downstream}")
Expand Down Expand Up @@ -116,7 +116,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow
connections.connect(host=upstream_host, port=upstream_port)
# insert entities into the collection
checker.resume()
time.sleep(60)
time.sleep(20)
checker.pause()
# check the collection in upstream
count_by_query_upstream_second = checker.get_count_by_query()
Expand All @@ -129,15 +129,16 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow
# connect to downstream
connections.disconnect("default")
log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
connections.connect(host=downstream_host, port=downstream_port)
connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN)
# check the collection in downstream has not been synced
timeout = 60
timeout = 30
collection = Collection(name=collection_name)
count_by_query_downstream_second = len(
collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually"))
t0 = time.time()
while True and time.time() - t0 < timeout:
count_by_query_downstream_second = len(
collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually"))
if count_by_query_downstream_second == count_by_query_upstream_second:
assert False
time.sleep(1)
Expand Down
Loading

0 comments on commit f8a65d4

Please sign in to comment.