diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9213f00b..e854f4f0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -688,7 +688,7 @@ jobs: "db_collections": { "foo": [ { - "name": "*" + "name": "*" } ] } @@ -696,12 +696,16 @@ jobs: }' - name: Run test - timeout-minutes: 15 + timeout-minutes: 30 shell: bash working-directory: tests run: | pip install -r requirements.txt --trusted-host https://test.pypi.org pytest testcases/test_cdc_database.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_get.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_list.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_pause.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 + pytest testcases/test_cdc_resume.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500 - name: List CDC task if: ${{ always() }} diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 596ba9d9..8f312bb0 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -327,13 +327,6 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, } if names, ok := e.collectionNames.data[uKey]; ok { var duplicateCollections []string - containsAny := false - for _, name := range names { - d, c := util.GetCollectionNameFromFull(name) - if d == cdcreader.AllDatabase || c == cdcreader.AllCollection { - containsAny = true - } - } for _, newCollectionName := range newCollectionNames { if lo.Contains(names, newCollectionName) { duplicateCollections = append(duplicateCollections, newCollectionName) @@ -343,9 +336,12 @@ func (e *MetaCDC) checkDuplicateCollection(uKey string, if nd == cdcreader.AllDatabase && nc == cdcreader.AllCollection { continue } - if containsAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) { - duplicateCollections = append(duplicateCollections, newCollectionName) - continue + for _, name := range names { + match, containAny := matchCollectionName(name, newCollectionName) + if match && containAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) { + duplicateCollections = append(duplicateCollections, newCollectionName) + break + } } } if len(duplicateCollections) > 0 { @@ -886,7 +882,7 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err } if err != nil { taskLog.Warn("fail to new the data handler", zap.Error(err)) - return nil, servererror.NewClientError("fail to new the data handler, task_id: ") + return nil, servererror.NewClientError("fail to new the data handler, task_id: " + info.TaskID) } writerObj := cdcwriter.NewChannelWriter(dataHandler, config.WriterConfig{ MessageBufferSize: bufferSize, diff --git a/server/cdc_impl_test.go b/server/cdc_impl_test.go index 6f08722e..3aa931e2 100644 --- a/server/cdc_impl_test.go +++ b/server/cdc_impl_test.go @@ -1591,6 +1591,25 @@ func TestCheckDuplicateCollection(t *testing.T) { assert.Len(t, excludeCollections, 0) }) + t.Run("collection duplicate test", func(t *testing.T) { + metaCDC := &MetaCDC{} + initMetaCDCMap(metaCDC) + excludeCollections, err := metaCDC.checkDuplicateCollection("foo", []string{ + util.GetFullCollectionName("foo", "*"), + }, model.ExtraInfo{ + EnableUserRole: true, + }, nil) + assert.NoError(t, err) + assert.Len(t, excludeCollections, 0) + + _, err = metaCDC.checkDuplicateCollection("foo", []string{ + util.GetFullCollectionName("default", "col1"), + }, model.ExtraInfo{ + EnableUserRole: false, + }, nil) + assert.NoError(t, err) + }) + t.Run("map collection name", func(t *testing.T) { metaCDC := &MetaCDC{} initMetaCDCMap(metaCDC) diff --git a/tests/api/milvus_cdc.py b/tests/api/milvus_cdc.py index dbcabbcc..66815b23 100644 --- a/tests/api/milvus_cdc.py +++ b/tests/api/milvus_cdc.py @@ -1,6 +1,7 @@ import json import requests +DEFAULT_TOKEN = 'root:Milvus' class MilvusCdcClient: @@ -17,7 +18,7 @@ def create_task(self, request_data): payload = json.dumps(body) response = requests.post(url, headers=self.headers, data=payload) if response.status_code == 200: - return response.json(), True + return response.json()['data'], True else: return response.text, False @@ -28,8 +29,8 @@ def list_tasks(self): } payload = json.dumps(body) response = requests.post(url, headers=self.headers, data=payload) - if response.status_code == 200: - return response.json(), True + if response.status_code == 200 and 'data' in response.json(): + return response.json()['data'], True else: return response.text, False @@ -43,8 +44,8 @@ def get_task(self, task_id): } payload = json.dumps(body) response = requests.post(url, headers=self.headers, data=payload) - if response.status_code == 200: - return response.json(), True + if response.status_code == 200 and 'data' in response.json(): + return response.json()['data']['task'], True else: return response.text, False diff --git a/tests/testcases/test_cdc_create.py b/tests/testcases/test_cdc_create.py index ff90b2e6..8010fcad 100644 --- a/tests/testcases/test_cdc_create.py +++ b/tests/testcases/test_cdc_create.py @@ -2,7 +2,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, list_collections, Collection, Partition @@ -69,7 +69,7 @@ def test_cdc_for_collections_create_after_cdc_task(self, upstream_host, upstream # check collections in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) timeout = 120 t0 = time.time() log.info(f"all collections in downstream {list_collections()}") @@ -131,7 +131,7 @@ def test_cdc_for_partitions_create_after_cdc_task(self, upstream_host, upstream_ assert set(p_name_list).issubset(set(list_partitions(col))) # check collections in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) log.info(f"all collections in downstream {list_collections()}") t0 = time.time() timeout = 60 @@ -203,7 +203,7 @@ def test_cdc_for_collection_insert_after_cdc_task(self, upstream_host, upstream_ # check entities in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) col = Collection(name=c_name) col.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) @@ -263,7 +263,7 @@ def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_p p_name = "p1" checker = InsertEntitiesPartitionChecker(host=upstream_host, port=upstream_port, c_name=c_name, p_name=p_name) checker.run() - time.sleep(60) + time.sleep(20) checker.pause() # check entities in upstream count_by_query_upstream = checker.get_count_by_query(p_name=p_name) @@ -273,7 +273,7 @@ def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_p # check entities in downstream connections.disconnect("default") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) t0 = time.time() timeout = 60 while True and time.time() - t0 < timeout: diff --git a/tests/testcases/test_cdc_database.py b/tests/testcases/test_cdc_database.py index d38c492d..e6d198f9 100644 --- a/tests/testcases/test_cdc_database.py +++ b/tests/testcases/test_cdc_database.py @@ -38,7 +38,7 @@ def test_cdc_sync_default_database_request(self, upstream_host, upstream_port, d """ connections.connect(host=upstream_host, port=upstream_port) col_list = [] - for i in range(10): + for i in range(5): time.sleep(0.1) collection_name = prefix + "not_match_database_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') col_list.append(collection_name) @@ -86,7 +86,7 @@ def test_cdc_sync_not_match_database_request(self, upstream_host, upstream_port, db.create_database("hoo") db.using_database(db_name="hoo") col_list = [] - for i in range(10): + for i in range(5): time.sleep(0.1) collection_name = prefix + "not_match_database_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') col_list.append(collection_name) diff --git a/tests/testcases/test_cdc_delete.py b/tests/testcases/test_cdc_delete.py index b3887437..9786913d 100644 --- a/tests/testcases/test_cdc_delete.py +++ b/tests/testcases/test_cdc_delete.py @@ -1,7 +1,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, Collection @@ -11,7 +11,7 @@ ) from base.client_base import TestBase -prefix = "cdc_create_task_" +prefix = "cdc_delete_task_" client = MilvusCdcClient('http://localhost:8444') @@ -51,7 +51,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do connections.connect(host=upstream_host, port=upstream_port) checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name) checker.run() - time.sleep(60) + time.sleep(20) # pause the insert task log.info(f"start to pause the insert task") checker.pause() @@ -65,13 +65,13 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do # check the collection in downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) collection = Collection(name=collection_name) collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) collection.load() # wait for the collection to be synced - timeout = 60 + timeout = 20 t0 = time.time() count_by_query_downstream = len( collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) @@ -86,7 +86,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do log.info(f"count_by_query_downstream: {count_by_query_downstream}") assert count_by_query_upstream == count_by_query_downstream # wait for the collection to be flushed - time.sleep(20) + time.sleep(10) collection.flush() num_entities_downstream = collection.num_entities log.info(f"num_entities_downstream: {num_entities_downstream}") @@ -113,7 +113,7 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do connections.connect(host=upstream_host, port=upstream_port) # insert entities into the collection checker.resume() - time.sleep(60) + time.sleep(20) checker.pause() # check the collection in upstream count_by_query_upstream_second = checker.get_count_by_query() @@ -124,20 +124,29 @@ def test_cdc_delete_task(self, upstream_host, upstream_port, downstream_host, do assert num_entities_upstream_second > num_entities_upstream # connect to downstream + # connections.disconnect("default") + # log.info(f"start to connect to downstream {downstream_host} {downstream_port}") + # connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) + # collection = Collection(name=collection_name) + connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) + log.info("start to check the collection in downstream") # check the collection in downstream has not been synced - timeout = 60 + timeout = 10 t0 = time.time() + collection = Collection(name=collection_name) count_by_query_downstream_second = len( collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + log.info(f"start count_by_query_downstream_second: {count_by_query_downstream_second}") while True and time.time() - t0 < timeout: count_by_query_downstream_second = len( collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) if count_by_query_downstream_second == count_by_query_upstream_second: - assert False + break time.sleep(1) + log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}") if time.time() - t0 > timeout: - log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}") + log.info(f"end count_by_query_downstream_second: {count_by_query_downstream_second}") assert count_by_query_downstream_second == count_by_query_downstream diff --git a/tests/testcases/test_cdc_get.py b/tests/testcases/test_cdc_get.py index c5550872..c8a866cb 100644 --- a/tests/testcases/test_cdc_get.py +++ b/tests/testcases/test_cdc_get.py @@ -24,7 +24,7 @@ def test_cdc_get_task(self, upstream_host, upstream_port, downstream_host, downs col_list = [] task_id_list = [] for i in range(10): - time.sleep(0.1) + time.sleep(1) collection_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') col_list.append(collection_name) request_data = { diff --git a/tests/testcases/test_cdc_pause.py b/tests/testcases/test_cdc_pause.py index 625c847f..d637e411 100644 --- a/tests/testcases/test_cdc_pause.py +++ b/tests/testcases/test_cdc_pause.py @@ -1,7 +1,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, Collection @@ -52,7 +52,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow connections.connect(host=upstream_host, port=upstream_port) checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name) checker.run() - time.sleep(60) + time.sleep(20) # pause the insert task log.info(f"start to pause the insert task") checker.pause() @@ -66,7 +66,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow # check the collection in downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) collection = Collection(name=collection_name) collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) @@ -87,7 +87,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow log.info(f"count_by_query_downstream: {count_by_query_downstream}") assert count_by_query_upstream == count_by_query_downstream # wait for the collection to be flushed - time.sleep(20) + time.sleep(10) collection.flush() num_entities_downstream = collection.num_entities log.info(f"num_entities_downstream: {num_entities_downstream}") @@ -116,7 +116,7 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow connections.connect(host=upstream_host, port=upstream_port) # insert entities into the collection checker.resume() - time.sleep(60) + time.sleep(20) checker.pause() # check the collection in upstream count_by_query_upstream_second = checker.get_count_by_query() @@ -129,15 +129,16 @@ def test_cdc_pause_task(self, upstream_host, upstream_port, downstream_host, dow # connect to downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) # check the collection in downstream has not been synced - timeout = 60 + timeout = 30 + collection = Collection(name=collection_name) count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) t0 = time.time() while True and time.time() - t0 < timeout: count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) if count_by_query_downstream_second == count_by_query_upstream_second: assert False time.sleep(1) diff --git a/tests/testcases/test_cdc_resume.py b/tests/testcases/test_cdc_resume.py index 9b03e255..0d575efb 100644 --- a/tests/testcases/test_cdc_resume.py +++ b/tests/testcases/test_cdc_resume.py @@ -1,7 +1,7 @@ import time from datetime import datetime from utils.util_log import test_log as log -from api.milvus_cdc import MilvusCdcClient +from api.milvus_cdc import MilvusCdcClient, DEFAULT_TOKEN from pymilvus import ( connections, Collection @@ -19,12 +19,14 @@ class TestCdcResume(TestBase): """ Test Milvus CDC delete """ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, downstream_port): - """ - target: test cdc delete task - method: create task, delete task - expected: create successfully, delete successfully - """ - collection_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + collection_name1 = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + collection_name2 = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + task_id1 = self.test_create_cdc_task(upstream_host, upstream_port, downstream_host, downstream_port, collection_name1) + task_id2 = self.test_create_cdc_task(upstream_host, upstream_port, downstream_host, downstream_port, collection_name2) + self.test_cdc_resume_task_with_collection_name(upstream_host, upstream_port, downstream_host, downstream_port, collection_name1, task_id1) + self.test_cdc_resume_task_with_collection_name(upstream_host, upstream_port, downstream_host, downstream_port, collection_name2, task_id2) + + def test_create_cdc_task(self, upstream_host, upstream_port, downstream_host, downstream_port, collection_name): # create cdc task request_data = { "milvus_connect_param": { @@ -47,15 +49,23 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do assert result log.info(f"create task response: {rsp}") task_id = rsp['task_id'] + return task_id + + def test_cdc_resume_task_with_collection_name(self, upstream_host, upstream_port, downstream_host, downstream_port, collection_name, task_id): + """ + target: test cdc delete task + method: create task, delete task + expected: create successfully, delete successfully + """ # create collection and insert entities into it in upstream connections.connect(host=upstream_host, port=upstream_port) checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name) checker.run() - time.sleep(60) + time.sleep(20) # pause the insert task - log.info(f"start to pause the insert task") + log.info("start to pause the insert task") checker.pause() - log.info(f"pause the insert task successfully") + log.info("pause the insert task successfully") # check the collection in upstream num_entities_upstream = checker.get_num_entities() log.info(f"num_entities_upstream: {num_entities_upstream}") @@ -65,7 +75,7 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do # check the collection in downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) collection = Collection(name=collection_name) collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}) @@ -114,7 +124,7 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do connections.connect(host=upstream_host, port=upstream_port) # insert entities into the collection checker.resume() - time.sleep(60) + time.sleep(20) checker.pause() # check the collection in upstream count_by_query_upstream_second = checker.get_count_by_query() @@ -127,15 +137,16 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do # connect to downstream connections.disconnect("default") log.info(f"start to connect to downstream {downstream_host} {downstream_port}") - connections.connect(host=downstream_host, port=downstream_port) + connections.connect(host=downstream_host, port=downstream_port, token=DEFAULT_TOKEN) # check the collection in downstream has not been synced timeout = 60 + collection = Collection(name=collection_name) count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) t0 = time.time() while True and time.time() - t0 < timeout: count_by_query_downstream_second = len( - collection.query(expr=checker.query_expr, output_fields=checker.output_fields)) + collection.query(expr=checker.query_expr, output_fields=checker.output_fields, consistency_level="Eventually")) if count_by_query_downstream_second == count_by_query_upstream_second: assert False time.sleep(1) @@ -162,6 +173,6 @@ def test_cdc_resume_task(self, upstream_host, upstream_port, downstream_host, do time.sleep(1) if time.time() - t0 > timeout: log.info(f"count_by_query_downstream_second: {count_by_query_downstream_second}") - raise Exception(f"Timeout waiting for collection {collection_name} to be synced") + raise Exception(f"Timeout waiting for collection {collection_name} to be synced") log.info(f"after resume cdc task, count_by_query_downstream_second: {count_by_query_downstream_second}") - assert count_by_query_downstream_second == count_by_query_upstream_second + assert count_by_query_downstream_second == count_by_query_upstream_second