Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full test suite on client/admin classes #267

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions astrapy/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@

DEFAULT_NEW_DATABASE_CLOUD_PROVIDER = "gcp"
DEFAULT_NEW_DATABASE_REGION = "us-east1"
WAITING_ON_DB_POLL_PERIOD_SECONDS = 2
DATABASE_POLL_NAMESPACE_SLEEP_TIME = 2
DATABASE_POLL_SLEEP_TIME = 15

STATUS_MAINTENANCE = "MAINTENANCE"
STATUS_ACTIVE = "ACTIVE"
Expand Down Expand Up @@ -461,6 +462,7 @@ def create_database(
"cloudProvider": cloud_provider,
"region": region,
"capacityUnits": 1,
"dbType": "vector",
}
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
Expand All @@ -474,7 +476,7 @@ def create_database(
if wait_until_active:
last_status_seen = STATUS_PENDING
while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}:
time.sleep(WAITING_ON_DB_POLL_PERIOD_SECONDS)
time.sleep(DATABASE_POLL_SLEEP_TIME)
last_status_seen = self.database_info(
id=new_database_id,
max_time_ms=timeout_manager.remaining_timeout_ms(),
Expand Down Expand Up @@ -543,7 +545,7 @@ def drop_database(
last_status_seen: Optional[str] = STATUS_TERMINATING
_db_name: Optional[str] = None
while last_status_seen == STATUS_TERMINATING:
time.sleep(WAITING_ON_DB_POLL_PERIOD_SECONDS)
time.sleep(DATABASE_POLL_SLEEP_TIME)
#
detected_databases = [
a_db_info
Expand Down Expand Up @@ -986,7 +988,7 @@ def create_namespace(
if wait_until_active:
last_status_seen = STATUS_MAINTENANCE
while last_status_seen == STATUS_MAINTENANCE:
time.sleep(WAITING_ON_DB_POLL_PERIOD_SECONDS)
time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME)
last_status_seen = self.info(
max_time_ms=timeout_manager.remaining_timeout_ms(),
).status
Expand Down Expand Up @@ -1054,7 +1056,7 @@ def drop_namespace(
if wait_until_active:
last_status_seen = STATUS_MAINTENANCE
while last_status_seen == STATUS_MAINTENANCE:
time.sleep(WAITING_ON_DB_POLL_PERIOD_SECONDS)
time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME)
last_status_seen = self.info(
max_time_ms=timeout_manager.remaining_timeout_ms(),
).status
Expand Down
290 changes: 290 additions & 0 deletions tests/idiomatic/integration/test_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Callable, List, Optional, Tuple

import pytest

import os
import time

from astrapy import DataAPIClient
from astrapy.admin import API_ENDPOINT_TEMPLATE_MAP


NAMESPACE_POLL_SLEEP_TIME = 2
NAMESPACE_TIMEOUT = 30
DATABASE_POLL_SLEEP_TIME = 10
DATABASE_TIMEOUT = 480


DO_IDIOMATIC_ADMIN_TESTS: bool
if "DO_IDIOMATIC_ADMIN_TESTS" in os.environ:
_do_idiomatic_admin_tests = os.environ["DO_IDIOMATIC_ADMIN_TESTS"]
if _do_idiomatic_admin_tests.strip():
DO_IDIOMATIC_ADMIN_TESTS = int(_do_idiomatic_admin_tests) != 0
else:
DO_IDIOMATIC_ADMIN_TESTS = False
else:
DO_IDIOMATIC_ADMIN_TESTS = False


def admin_test_envs_tokens() -> List[Any]:
"""
This actually returns a List of `_pytest.mark.structures.ParameterSet` instances,
each wrapping a Tuple[str, Optional[str]] = (env, token)
"""
envs_tokens: List[Any] = []
for env in ["prod", "dev"]:
varname = f"{env.upper()}_ADMIN_TEST_ASTRA_DB_APPLICATION_TOKEN"
markers = []
pair: Tuple[str, Optional[str]]
if varname in os.environ:
pair = (env, os.environ[varname])
else:
pair = (env, None)
markers.append(pytest.mark.skip(reason=f"{env} token not available"))
envs_tokens.append(pytest.param(pair, marks=markers))

return envs_tokens


def wait_until_true(
poll_interval: int, max_seconds: int, condition: Callable[..., bool]
) -> None:
ini_time = time.time()
while not condition():
if time.time() - ini_time > max_seconds:
raise ValueError("Timed out on condition.")
time.sleep(poll_interval)


@pytest.mark.skipif(not DO_IDIOMATIC_ADMIN_TESTS, reason="Admin tests are suppressed")
class TestAdmin:
@pytest.mark.parametrize("env_token", admin_test_envs_tokens())
@pytest.mark.describe("test of the full tour with AstraDBDatabaseAdmin")
def test_astra_db_database_admin(self, env_token: Tuple[str, str]) -> None:
"""
Test plan (it has to be a single giant test to use one DB throughout):
- create client -> get_admin
- create a db (wait)
- with the AstraDBDatabaseAdmin:
- info
- list namespaces, check
- create 2 namespaces (wait, nonwait)
- list namespaces, check
- get_database -> create_collection/list_collection_names
- get_async_database, check if == previous
- drop namespaces (wait, nonwait)
- list namespaces, check
- drop database (wait)
- check DB not existings
"""
env, token = env_token
db_name = f"test_database_{env}"
db_provider = os.environ[f"{env.upper()}_ADMIN_TEST_ASTRA_DB_PROVIDER"]
db_region = os.environ[f"{env.upper()}_ADMIN_TEST_ASTRA_DB_REGION"]

# create client, get admin
client: DataAPIClient
if env == "prod":
client = DataAPIClient(token)
else:
client = DataAPIClient(token, environment=env)
admin = client.get_admin()

# create a db (wait)
db_admin = admin.create_database(
name=db_name,
wait_until_active=True,
cloud_provider=db_provider,
region=db_region,
)

# info with the AstraDBDatabaseAdmin
created_db_id = db_admin.id
assert db_admin.info().id == created_db_id

# list nss
namespaces1 = set(db_admin.list_namespaces())
assert namespaces1 == {"default_keyspace"}

# create two namespaces
w_create_ns_response = db_admin.create_namespace(
"waited_ns",
wait_until_active=True,
)
assert w_create_ns_response == {"ok": 1}

nw_create_ns_response = db_admin.create_namespace(
"nonwaited_ns",
wait_until_active=False,
)
assert nw_create_ns_response == {"ok": 1}
wait_until_true(
poll_interval=NAMESPACE_POLL_SLEEP_TIME,
max_seconds=NAMESPACE_TIMEOUT,
condition=lambda: "nonwaited_ns" in db_admin.list_namespaces(),
)

namespaces3 = set(db_admin.list_namespaces())
assert namespaces3 - namespaces1 == {"waited_ns", "nonwaited_ns"}

# get db and use it
db = db_admin.get_database()
db.create_collection("canary_coll")
assert "canary_coll" in db.list_collection_names()

# check async db is the same
assert db_admin.get_async_database().to_sync() == db

# drop nss, wait, nonwait
w_drop_ns_response = db_admin.drop_namespace(
"waited_ns",
wait_until_active=True,
)
assert w_drop_ns_response == {"ok": 1}

nw_drop_ns_response = db_admin.drop_namespace(
"nonwaited_ns",
wait_until_active=False,
)
assert nw_drop_ns_response == {"ok": 1}
wait_until_true(
poll_interval=NAMESPACE_POLL_SLEEP_TIME,
max_seconds=NAMESPACE_TIMEOUT,
condition=lambda: "nonwaited_ns" not in db_admin.list_namespaces(),
)

# check nss after dropping two of them
namespaces1b = set(db_admin.list_namespaces())
assert namespaces1b == namespaces1

# drop db and check
db_drop_response = db_admin.drop()
assert db_drop_response == {"ok": 1}

db_ids = {db.id for db in admin.list_databases()}
assert created_db_id not in db_ids

@pytest.mark.parametrize("env_token", admin_test_envs_tokens())
@pytest.mark.describe("test of the full tour with AstraDBAdmin and client methods")
def test_astra_db_admin(self, env_token: Tuple[str, str]) -> None:
"""
Test plan (it has to be a single giant test to use the two DBs throughout):
- create client -> get_admin
- create two dbs (wait, nonwait)
- list the two database ids, check
- get check info on one such db through admin
- with the client:
4 get_dbs (a/sync, by id+region/api_endpoint), check if equal
and test their list_collections
- get_db_admin from the admin for one of the dbs
- create ns
- get_database -> list_collection_names
- get_async_database and check == with above
- drop dbs, (wait, nonwait)
"""
env, token = env_token
db_name_w = f"test_database_w_{env}"
db_name_nw = f"test_database_nw_{env}"
db_provider = os.environ[f"{env.upper()}_ADMIN_TEST_ASTRA_DB_PROVIDER"]
db_region = os.environ[f"{env.upper()}_ADMIN_TEST_ASTRA_DB_REGION"]

# create client and get admin
client: DataAPIClient
if env == "prod":
client = DataAPIClient(token)
else:
client = DataAPIClient(token, environment=env)
admin = client.get_admin()

# create the two dbs
db_admin_nw = admin.create_database(
name=db_name_nw,
wait_until_active=False,
cloud_provider=db_provider,
region=db_region,
)
created_db_id_nw = db_admin_nw.id
db_admin_w = admin.create_database(
name=db_name_w,
wait_until_active=True,
cloud_provider=db_provider,
region=db_region,
)
created_db_id_w = db_admin_w.id

def _waiter1() -> bool:
db_ids = {db.id for db in admin.list_databases()}
return created_db_id_nw in db_ids

wait_until_true(
poll_interval=DATABASE_POLL_SLEEP_TIME,
max_seconds=DATABASE_TIMEOUT,
condition=_waiter1,
)

# list, check ids
db_ids = {db.id for db in admin.list_databases()}
assert {created_db_id_nw, created_db_id_w} - db_ids == set()

# get info through admin
db_w_info = admin.database_info(created_db_id_w)
assert db_w_info.id == created_db_id_w

# get and compare dbs obtained by the client
synthetic_api_endpoint = API_ENDPOINT_TEMPLATE_MAP[env].format(
database_id=created_db_id_w,
region=db_region,
)
db_w_d = client.get_database(created_db_id_w)
db_w_r = client.get_database(created_db_id_w, region=db_region)
db_w_e = client.get_database_by_api_endpoint(synthetic_api_endpoint)
adb_w_d = client.get_async_database(created_db_id_w)
adb_w_r = client.get_async_database(created_db_id_w, region=db_region)
adb_w_e = client.get_async_database_by_api_endpoint(synthetic_api_endpoint)
assert isinstance(db_w_d.list_collection_names(), list)
assert db_w_r == db_w_d
assert db_w_e == db_w_d
assert adb_w_d.to_sync() == db_w_d
assert adb_w_r.to_sync() == db_w_d
assert adb_w_e.to_sync() == db_w_d

# get db admin from the admin and use it
db_w_admin = admin.get_database_admin(created_db_id_w)
db_w_admin.create_namespace("additional_namespace")
db_w_from_admin = db_w_admin.get_database()
assert isinstance(db_w_from_admin.list_collection_names(), list)
adb_w_from_admin = db_w_admin.get_async_database()
assert adb_w_from_admin.to_sync() == db_w_from_admin

# drop databases: the w one through the admin, the nw using its db-admin
# (this covers most cases if combined with the
# (w, using db-admin) of test_astra_db_database_admin)
db_nw_admin = admin.get_database_admin(created_db_id_nw)
drop_nw_response = db_nw_admin.drop(wait_until_active=False)
assert drop_nw_response == {"ok": 1}
drop_w_response = admin.drop_database(created_db_id_w)
assert drop_w_response == {"ok": 1}

def _waiter2() -> bool:
db_ids = {db.id for db in admin.list_databases()}
return created_db_id_nw not in db_ids

wait_until_true(
poll_interval=DATABASE_POLL_SLEEP_TIME,
max_seconds=DATABASE_TIMEOUT,
condition=_waiter2,
)
Loading