From ef03d3e2487b41d57132766e466254bb8a1a21ea Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Fri, 25 Oct 2024 21:23:17 +0800 Subject: [PATCH] Rework pysdk connection pool (#2105) ### What problem does this PR solve? Rework pysdk connection pool ### Type of change - [x] Refactoring - [x] Python SDK impacted, Need to update PyPI --- .github/workflows/release.yml | 12 ++--- .github/workflows/tests.yml | 4 +- .../infinity_sdk/infinity/connection_pool.py | 48 +++++++------------ python/test_pysdk/test_connection_pool.py | 27 ++++------- python/test_pysdk/test_knn.py | 14 +++++- 5 files changed, 49 insertions(+), 56 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d59bfb64fc..197419cbd0 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,7 +23,7 @@ jobs: # https://github.com/actions/checkout/blob/v3/README.md - name: Check out code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: ssh-key: ${{ secrets.MY_DEPLOY_KEY }} @@ -55,14 +55,14 @@ jobs: run: | if [[ $GITHUB_EVENT_NAME == 'schedule' ]]; then # Determine if a given tag exists and matches a specific Git commit. 
- # actions/checkout@v3 fetch-tags doesn't work when triggered by schedule + # actions/checkout@v4 fetch-tags doesn't work when triggered by schedule git fetch --tags if [ "$(git rev-parse -q --verify "refs/tags/$RELEASE_TAG")" = "$GITHUB_SHA" ]; then - echo "mutalbe tag $RELEASE_TAG exists and matchs $GITHUB_SHA" + echo "mutable tag $RELEASE_TAG exists and matches $GITHUB_SHA" else git tag -f $RELEASE_TAG $GITHUB_SHA git push -f origin $RELEASE_TAG:refs/tags/$RELEASE_TAG - echo "created/moved mutalbe tag $RELEASE_TAG to $GITHUB_SHA" + echo "created/moved mutable tag $RELEASE_TAG to $GITHUB_SHA" fi fi @@ -83,7 +83,7 @@ jobs: - name: Create or overwrite a release # https://github.com/actions/upload-release-asset has been replaced by https://github.com/softprops/action-gh-release - uses: softprops/action-gh-release@v1 + uses: softprops/action-gh-release@v2 with: token: ${{ secrets.MY_GITHUB_TOKEN }} # Use the secret as an environment variable prerelease: ${{ env.PRERELEASE }} @@ -110,7 +110,7 @@ jobs: # https://github.com/marketplace/actions/build-and-push-docker-images - name: Build and push - uses: docker/build-push-action@v5 + uses: docker/build-push-action@v6 with: context: . 
push: true diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0defeaea85..91aa56b7ee 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -28,7 +28,7 @@ jobs: runs-on: [ "self-hosted", "debug" ] steps: # https://github.com/hmarr/debug-action - #- uses: hmarr/debug-action@v2 + #- uses: hmarr/debug-action@v3 - name: Show PR labels if: ${{ !cancelled() && !failure() }} @@ -195,7 +195,7 @@ jobs: runs-on: [ "self-hosted", "benchmark" ] steps: # https://github.com/hmarr/debug-action - #- uses: hmarr/debug-action@v2 + #- uses: hmarr/debug-action@v3 - name: Show PR labels if: ${{ !cancelled() && !failure() }} diff --git a/python/infinity_sdk/infinity/connection_pool.py b/python/infinity_sdk/infinity/connection_pool.py index 7f12a51ede..6f4893e07e 100644 --- a/python/infinity_sdk/infinity/connection_pool.py +++ b/python/infinity_sdk/infinity/connection_pool.py @@ -1,42 +1,29 @@ -from threading import Lock, Condition +from threading import Lock import infinity from infinity.common import NetworkAddress import logging -from infinity.remote_thrift.infinity import RemoteThriftInfinityConnection class ConnectionPool(object): - - - def __init__(self, uri = NetworkAddress("127.0.0.1", 23817), min_size=4, max_size=16, timeout=10.0): - assert (min_size <= max_size) + def __init__(self, uri = NetworkAddress("127.0.0.1", 23817), max_size=16): self.uri_ = uri - self.min_size_ = min_size self.max_size_ = max_size - self.curr_size_ = 0 - self.timeout_ = timeout self.free_pool_ = [] - self.created_conns_ = [] - self.cond_ = Condition() - - while(self.curr_size_ < self.min_size_): + self.lock_ = Lock() + for i in range(max_size): self._create_conn() + def _del__(self): + self.destroy() + def _create_conn(self): infinity_coon = infinity.connect(self.uri_) - self.curr_size_ += 1 self.free_pool_.append(infinity_coon) - self.created_conns_.append(infinity_coon) - def get_conn(self) -> RemoteThriftInfinityConnection: - with self.cond_: - while 
True: - is_waken = self.cond_.wait_for(lambda: len(self.free_pool_) != 0 or self.curr_size_ < self.max_size_, self.timeout_) - if is_waken: - break - logging.warn("connection waiting...") + def get_conn(self): + with self.lock_: if (len(self.free_pool_) == 0): self._create_conn() conn = self.free_pool_.pop() @@ -45,17 +32,18 @@ def get_conn(self) -> RemoteThriftInfinityConnection: def release_conn(self, conn): - with self.cond_: - if(not self.created_conns_.count(conn)): - raise Exception("the connection is unknown") + """ + Note: The user is allowed to release a connection not created by ConnectionPool, or to not release (due to an exception or some other reason) a connection created by ConnectionPool. + """ + with self.lock_: if(self.free_pool_.count(conn)): raise Exception("the connection has been released") - self.curr_size_ -= 1 - self.free_pool_.append(conn) - self.cond_.notify_all() + if (len(self.free_pool_) < self.max_size_): + self.free_pool_.append(conn) logging.debug("release_conn") def destroy(self): - for conn in iter(self.created_conns_): - conn.disconnect() \ No newline at end of file + for conn in iter(self.free_pool_): + conn.disconnect() + self.free_pool_.clear() \ No newline at end of file diff --git a/python/test_pysdk/test_connection_pool.py b/python/test_pysdk/test_connection_pool.py index 07582a09bb..9f464cb67a 100644 --- a/python/test_pysdk/test_connection_pool.py +++ b/python/test_pysdk/test_connection_pool.py @@ -2,7 +2,6 @@ import os import pytest from common import common_values -import time from infinity.connection_pool import ConnectionPool from infinity.common import ConflictType @@ -39,12 +38,14 @@ def teardown(self): @pytest.mark.usefixtures("skip_if_local_infinity") def test_connection_pool(self, suffix): - connection_pool = ConnectionPool(uri=self.uri, min_size=4, max_size=8) - assert len(connection_pool.free_pool_) == 4 + connection_pool = ConnectionPool(uri=self.uri, max_size=8) + assert len(connection_pool.free_pool_) == 8 
infinity_obj = connection_pool.get_conn() + _ = connection_pool.get_conn() # It's safe for the user to not release (due to an exception or some other reason) a connection created by ConnectionPool + _ = connection_pool.get_conn() # It's safe for the user to not release (due to an exception or some other reason) a connection created by ConnectionPool assert infinity_obj - assert len(connection_pool.free_pool_) == 3 + assert len(connection_pool.free_pool_) == 5 infinity_obj.drop_database("my_database"+suffix, conflict_type=ConflictType.Ignore) db = infinity_obj.create_database("my_database"+suffix) @@ -62,7 +63,11 @@ def test_connection_pool(self, suffix): infinity_obj.drop_database("my_database"+suffix, conflict_type=ConflictType.Error) connection_pool.release_conn(infinity_obj) - assert len(connection_pool.free_pool_) == 4 + assert len(connection_pool.free_pool_) == 6 + + external_infinity_obj = infinity.connect(self.uri) + connection_pool.release_conn(external_infinity_obj) + assert len(connection_pool.free_pool_) == 7 # It's safe for the user to release a connection not created by ConnectionPool try: connection_pool.release_conn(infinity_obj) @@ -71,15 +76,3 @@ def test_connection_pool(self, suffix): else: assert "no exception when double release" == 0 connection_pool.destroy() - - @pytest.mark.usefixtures("skip_if_local_infinity") - def test_time_out(self): - #test timeout is ok - connection_pool = ConnectionPool(uri=self.uri, min_size=4, max_size=8, timeout=5.0) - begin_time = time.time() - try: - while True: - connection_pool.get_conn() - except Exception as e: - print(e) - assert time.time() - begin_time < 10 \ No newline at end of file diff --git a/python/test_pysdk/test_knn.py b/python/test_pysdk/test_knn.py index b15ae72a48..095f2ad0a6 100644 --- a/python/test_pysdk/test_knn.py +++ b/python/test_pysdk/test_knn.py @@ -6,7 +6,6 @@ from common import common_index import infinity import infinity_embedded -from infinity.remote_thrift.infinity import 
RemoteThriftInfinityConnection import infinity.index as index from infinity.errors import ErrorCode from infinity.common import ConflictType, InfinityException, SparseVector @@ -760,6 +759,19 @@ def test_with_fulltext_match_with_valid_columns(self, check_data, match_param_1, .to_pl()) print(res_filter_2) pl_assert_frame_equal(res_filter_1, res_filter_2) + + # filter_fulltext = "num!=98 AND num != 12 AND filter_fulltext('body', 'harmful chemical')" + # filter_fulltext = """num!=98 AND num != 12 AND filter_fulltext('body', '(harmful OR chemical)')""" + # filter_fulltext = """num!=98 AND num != 12 AND filter_fulltext('body', '("harmful" OR "chemical")')""" + # filter_fulltext = """(num!=98 AND num != 12) AND filter_fulltext('body', '(("harmful" OR "chemical"))')""" + filter_fulltext = """(num!=98 AND num != 12) AND filter_fulltext('body^3,body,body^2', '(("harmful" OR "chemical"))')""" + _ = (table_obj + .output(["*"]) + .match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", "ip", 1, {"filter": filter_fulltext}) + .match_text(match_param_1, "black", 1, {"filter": "num!=98 AND num != 12"}) + .fusion(method='rrf', topn=10) + .to_pl()) + with pytest.raises(InfinityException) as e_info: res_filter_3 = (table_obj .output(["*"])