Skip to content

Commit

Permalink
Rework pysdk connection pool (#2105)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Rework pysdk connection pool

### Type of change

- [x] Refactoring
- [x] Python SDK impacted, Need to update PyPI
  • Loading branch information
yuzhichang authored Oct 25, 2024
1 parent ac2ceb1 commit ef03d3e
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 56 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:

# https://github.com/actions/checkout/blob/v3/README.md
- name: Check out code
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
ssh-key: ${{ secrets.MY_DEPLOY_KEY }}

Expand Down Expand Up @@ -55,14 +55,14 @@ jobs:
run: |
if [[ $GITHUB_EVENT_NAME == 'schedule' ]]; then
# Determine if a given tag exists and matches a specific Git commit.
# actions/checkout@v3 fetch-tags doesn't work when triggered by schedule
# actions/checkout@v4 fetch-tags doesn't work when triggered by schedule
git fetch --tags
if [ "$(git rev-parse -q --verify "refs/tags/$RELEASE_TAG")" = "$GITHUB_SHA" ]; then
echo "mutalbe tag $RELEASE_TAG exists and matchs $GITHUB_SHA"
echo "mutable tag $RELEASE_TAG exists and matches $GITHUB_SHA"
else
git tag -f $RELEASE_TAG $GITHUB_SHA
git push -f origin $RELEASE_TAG:refs/tags/$RELEASE_TAG
echo "created/moved mutalbe tag $RELEASE_TAG to $GITHUB_SHA"
echo "created/moved mutable tag $RELEASE_TAG to $GITHUB_SHA"
fi
fi
Expand All @@ -83,7 +83,7 @@ jobs:

- name: Create or overwrite a release
# https://github.com/actions/upload-release-asset has been replaced by https://github.com/softprops/action-gh-release
uses: softprops/action-gh-release@v1
uses: softprops/action-gh-release@v2
with:
token: ${{ secrets.MY_GITHUB_TOKEN }} # Use the secret as an environment variable
prerelease: ${{ env.PRERELEASE }}
Expand All @@ -110,7 +110,7 @@ jobs:

# https://github.com/marketplace/actions/build-and-push-docker-images
- name: Build and push
uses: docker/build-push-action@v5
uses: docker/build-push-action@v6
with:
context: .
push: true
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
runs-on: [ "self-hosted", "debug" ]
steps:
# https://github.com/hmarr/debug-action
#- uses: hmarr/debug-action@v2
#- uses: hmarr/debug-action@v3

- name: Show PR labels
if: ${{ !cancelled() && !failure() }}
Expand Down Expand Up @@ -195,7 +195,7 @@ jobs:
runs-on: [ "self-hosted", "benchmark" ]
steps:
# https://github.com/hmarr/debug-action
#- uses: hmarr/debug-action@v2
#- uses: hmarr/debug-action@v3

- name: Show PR labels
if: ${{ !cancelled() && !failure() }}
Expand Down
48 changes: 18 additions & 30 deletions python/infinity_sdk/infinity/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,29 @@
from threading import Lock, Condition
from threading import Lock
import infinity
from infinity.common import NetworkAddress
import logging
from infinity.remote_thrift.infinity import RemoteThriftInfinityConnection


class ConnectionPool(object):


def __init__(self, uri = NetworkAddress("127.0.0.1", 23817), min_size=4, max_size=16, timeout=10.0):
assert (min_size <= max_size)
def __init__(self, uri = NetworkAddress("127.0.0.1", 23817), max_size=16):
self.uri_ = uri
self.min_size_ = min_size
self.max_size_ = max_size
self.curr_size_ = 0
self.timeout_ = timeout
self.free_pool_ = []
self.created_conns_ = []
self.cond_ = Condition()

while(self.curr_size_ < self.min_size_):
self.lock_ = Lock()
for i in range(max_size):
self._create_conn()


def _del__(self):
self.destroy()

def _create_conn(self):
infinity_coon = infinity.connect(self.uri_)
self.curr_size_ += 1
self.free_pool_.append(infinity_coon)
self.created_conns_.append(infinity_coon)


def get_conn(self) -> RemoteThriftInfinityConnection:
with self.cond_:
while True:
is_waken = self.cond_.wait_for(lambda: len(self.free_pool_) != 0 or self.curr_size_ < self.max_size_, self.timeout_)
if is_waken:
break
logging.warn("connection waiting...")
def get_conn(self):
with self.lock_:
if (len(self.free_pool_) == 0):
self._create_conn()
conn = self.free_pool_.pop()
Expand All @@ -45,17 +32,18 @@ def get_conn(self) -> RemoteThriftInfinityConnection:


def release_conn(self, conn):
with self.cond_:
if(not self.created_conns_.count(conn)):
raise Exception("the connection is unknown")
"""
Note: User is allowed to release a connection not created by ConnectionPool, or not releasing(due to exception or some other reasons) a connection created by ConnectionPool.
"""
with self.lock_:
if(self.free_pool_.count(conn)):
raise Exception("the connection has been released")
self.curr_size_ -= 1
self.free_pool_.append(conn)
self.cond_.notify_all()
if (len(self.free_pool_) < self.max_size_):
self.free_pool_.append(conn)
logging.debug("release_conn")


def destroy(self):
for conn in iter(self.created_conns_):
conn.disconnect()
for conn in iter(self.free_pool_):
conn.disconnect()
self.free_pool_.clear()
27 changes: 10 additions & 17 deletions python/test_pysdk/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import pytest
from common import common_values
import time
from infinity.connection_pool import ConnectionPool
from infinity.common import ConflictType

Expand Down Expand Up @@ -39,12 +38,14 @@ def teardown(self):

@pytest.mark.usefixtures("skip_if_local_infinity")
def test_connection_pool(self, suffix):
connection_pool = ConnectionPool(uri=self.uri, min_size=4, max_size=8)
assert len(connection_pool.free_pool_) == 4
connection_pool = ConnectionPool(uri=self.uri, max_size=8)
assert len(connection_pool.free_pool_) == 8

infinity_obj = connection_pool.get_conn()
_ = connection_pool.get_conn() # It's safe for user to not releasing (due to exception or some other reasons) a connection created by ConnectionPool
_ = connection_pool.get_conn() # It's safe for user to not releasing (due to exception or some other reasons) a connection created by ConnectionPool
assert infinity_obj
assert len(connection_pool.free_pool_) == 3
assert len(connection_pool.free_pool_) == 5

infinity_obj.drop_database("my_database"+suffix, conflict_type=ConflictType.Ignore)
db = infinity_obj.create_database("my_database"+suffix)
Expand All @@ -62,7 +63,11 @@ def test_connection_pool(self, suffix):
infinity_obj.drop_database("my_database"+suffix, conflict_type=ConflictType.Error)

connection_pool.release_conn(infinity_obj)
assert len(connection_pool.free_pool_) == 4
assert len(connection_pool.free_pool_) == 6

external_infinity_obj = infinity.connect(self.uri)
connection_pool.release_conn(external_infinity_obj)
assert len(connection_pool.free_pool_) == 7 # It's safe for user to release a connection not created by ConnectionPool

try:
connection_pool.release_conn(infinity_obj)
Expand All @@ -71,15 +76,3 @@ def test_connection_pool(self, suffix):
else:
assert "no exception when double release" == 0
connection_pool.destroy()

@pytest.mark.usefixtures("skip_if_local_infinity")
def test_time_out(self):
#test timeout is ok
connection_pool = ConnectionPool(uri=self.uri, min_size=4, max_size=8, timeout=5.0)
begin_time = time.time()
try:
while True:
connection_pool.get_conn()
except Exception as e:
print(e)
assert time.time() - begin_time < 10
14 changes: 13 additions & 1 deletion python/test_pysdk/test_knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from common import common_index
import infinity
import infinity_embedded
from infinity.remote_thrift.infinity import RemoteThriftInfinityConnection
import infinity.index as index
from infinity.errors import ErrorCode
from infinity.common import ConflictType, InfinityException, SparseVector
Expand Down Expand Up @@ -760,6 +759,19 @@ def test_with_fulltext_match_with_valid_columns(self, check_data, match_param_1,
.to_pl())
print(res_filter_2)
pl_assert_frame_equal(res_filter_1, res_filter_2)

# filter_fulltext = "num!=98 AND num != 12 AND filter_fulltext('body', 'harmful chemical')"
# filter_fulltext = """num!=98 AND num != 12 AND filter_fulltext('body', '(harmful OR chemical)')"""
# filter_fulltext = """num!=98 AND num != 12 AND filter_fulltext('body', '("harmful" OR "chemical")')"""
# filter_fulltext = """(num!=98 AND num != 12) AND filter_fulltext('body', '(("harmful" OR "chemical"))')"""
filter_fulltext = """(num!=98 AND num != 12) AND filter_fulltext('body^3,body,body^2', '(("harmful" OR "chemical"))')"""
_ = (table_obj
.output(["*"])
.match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", "ip", 1, {"filter": filter_fulltext})
.match_text(match_param_1, "black", 1, {"filter": "num!=98 AND num != 12"})
.fusion(method='rrf', topn=10)
.to_pl())

with pytest.raises(InfinityException) as e_info:
res_filter_3 = (table_obj
.output(["*"])
Expand Down

0 comments on commit ef03d3e

Please sign in to comment.