Skip to content

Abstract DatabaseAdmin, admin standard utility conversion/methods + tests thereof #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 237 additions & 3 deletions astrapy/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import re
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from dataclasses import dataclass

Expand Down Expand Up @@ -63,7 +64,11 @@ def __init__(self) -> None:
TEST = "test"


database_id_finder = re.compile(
database_id_matcher = re.compile(
"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)

api_endpoint_parser = re.compile(
"https://"
"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
"-"
Expand Down Expand Up @@ -116,7 +121,7 @@ def parse_api_endpoint(api_endpoint: str) -> Optional[ParsedAPIEndpoint]:
The parsed ParsedAPIEndpoint. If parsing fails, return None.
"""

match = database_id_finder.match(api_endpoint)
match = api_endpoint_parser.match(api_endpoint)
if match and match.groups():
d_id, d_re, d_en_x = match.groups()
return ParsedAPIEndpoint(
Expand Down Expand Up @@ -327,6 +332,103 @@ def __repr__(self) -> str:
env_desc = f', environment="{self.environment}"'
return f'{self.__class__.__name__}("{self.token[:12]}..."{env_desc})'

def __eq__(self, other: Any) -> bool:
if isinstance(other, AstraDBAdmin):
return all(
[
self.token == other.token,
self.environment == other.environment,
self.dev_ops_url == other.dev_ops_url,
self.dev_ops_url == other.dev_ops_url,
self._caller_name == other._caller_name,
self._caller_version == other._caller_version,
self._dev_ops_url == other._dev_ops_url,
self._dev_ops_api_version == other._dev_ops_api_version,
self._astra_db_ops == other._astra_db_ops,
]
)
else:
return False

def _copy(
self,
*,
token: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBAdmin:
return AstraDBAdmin(
token=token or self.token,
environment=environment or self.environment,
caller_name=caller_name or self._caller_name,
caller_version=caller_version or self._caller_version,
dev_ops_url=dev_ops_url or self._dev_ops_url,
dev_ops_api_version=dev_ops_api_version or self._dev_ops_api_version,
)

def with_options(
self,
*,
token: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> AstraDBAdmin:
"""
Create a clone of this AstraDBAdmin with some changed attributes.

Args:
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.

Returns:
a new AstraDBAdmin instance.

Example:
>>> another_astra_db_admin = my_astra_db_admin.with_options(
... caller_name="caller_identity",
... caller_version="1.2.0",
... )
"""

return self._copy(
token=token,
caller_name=caller_name,
caller_version=caller_version,
)

def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").

New objects spawned from this client afterwards will inherit the new settings.

Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.

Example:
>>> my_astra_db_admin.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""

self._caller_name = caller_name
self._caller_version = caller_version
self._astra_db_ops.set_caller(caller_name, caller_version)

@ops_recast_method_sync
def list_databases(
self,
Expand Down Expand Up @@ -717,7 +819,45 @@ def get_async_database(
).to_async()


class AstraDBDatabaseAdmin:
class DatabaseAdmin(ABC):
"""
An abstract class defining the interface for a database admin object.
This supports generic namespace crud, as well as spawning databases,
without committing to a specific database architecture (e.g. Astra DB).
"""

@abstractmethod
def list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]:
"""Get a list of namespaces for the database."""
...

@abstractmethod
def create_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]:
"""
Create a namespace in the database, returning {'ok': 1} if successful.
"""
...

@abstractmethod
def drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]:
"""
Drop (delete) a namespace from the database,
returning {'ok': 1} if successful.
"""
...

@abstractmethod
def get_database(self, *pargs: Any, **kwargs: Any) -> Database:
"""Get a Database object from this database admin."""
...

@abstractmethod
def get_async_database(self, *pargs: Any, **kwargs: Any) -> AsyncDatabase:
"""Get an AsyncDatabase object from this database admin."""
...


class AstraDBDatabaseAdmin(DatabaseAdmin):
"""
An "admin" object, able to perform administrative tasks at the namespaces level
(i.e. within a certani database), such as creating/listing/dropping namespaces.
Expand Down Expand Up @@ -790,6 +930,100 @@ def __repr__(self) -> str:
f'"{self.token[:12]}..."{env_desc})'
)

def __eq__(self, other: Any) -> bool:
if isinstance(other, AstraDBDatabaseAdmin):
return all(
[
self.id == other.id,
self.token == other.token,
self.environment == other.environment,
self._astra_db_admin == other._astra_db_admin,
]
)
else:
return False

def _copy(
self,
id: Optional[str] = None,
token: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
return AstraDBDatabaseAdmin(
id=id or self.id,
token=token or self.token,
environment=environment or self.environment,
caller_name=caller_name or self._astra_db_admin._caller_name,
caller_version=caller_version or self._astra_db_admin._caller_version,
dev_ops_url=dev_ops_url or self._astra_db_admin._dev_ops_url,
dev_ops_api_version=dev_ops_api_version
or self._astra_db_admin._dev_ops_api_version,
)

def with_options(
self,
*,
id: Optional[str] = None,
token: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
"""
Create a clone of this AstraDBDatabaseAdmin with some changed attributes.

Args:
id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.

Returns:
a new AstraDBDatabaseAdmin instance.

Example:
>>> admin_for_my_other_db = admin_for_my_db.with_options(
... id="abababab-0101-2323-4545-6789abcdef01",
... )
"""

return self._copy(
id=id,
token=token,
caller_name=caller_name,
caller_version=caller_version,
)

def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").

New objects spawned from this client afterwards will inherit the new settings.

Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.

Example:
>>> admin_for_my_db.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""

self._astra_db_admin.set_caller(caller_name, caller_version)

@staticmethod
def from_astra_db_admin(
id: str, *, astra_db_admin: AstraDBAdmin
Expand Down
98 changes: 98 additions & 0 deletions astrapy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

from __future__ import annotations

import re
from typing import Any, Dict, Optional, TYPE_CHECKING

from astrapy.admin import (
Environment,
api_endpoint_parser,
build_api_endpoint,
database_id_matcher,
fetch_raw_database_info_from_id_token,
parse_api_endpoint,
)
Expand Down Expand Up @@ -85,6 +88,101 @@ def __repr__(self) -> str:
env_desc = f', environment="{self.environment}"'
return f'{self.__class__.__name__}("{self.token[:12]}..."{env_desc})'

def __eq__(self, other: Any) -> bool:
if isinstance(other, DataAPIClient):
return all(
[
self.token == other.token,
self.environment == other.environment,
self._caller_name == other._caller_name,
self._caller_version == other._caller_version,
]
)
else:
return False

def __getitem__(self, database_id_or_api_endpoint: str) -> Database:
if re.match(database_id_matcher, database_id_or_api_endpoint):
return self.get_database(database_id_or_api_endpoint)
elif re.match(api_endpoint_parser, database_id_or_api_endpoint):
return self.get_database_by_api_endpoint(database_id_or_api_endpoint)
else:
raise ValueError(
"The provided input does not look like either a database ID "
f"or an API endpoint ('{database_id_or_api_endpoint}')."
)

def _copy(
self,
*,
token: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> DataAPIClient:
return DataAPIClient(
token=token or self.token,
environment=environment or self.environment,
caller_name=caller_name or self._caller_name,
caller_version=caller_version or self._caller_version,
)

def with_options(
self,
*,
token: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> DataAPIClient:
"""
Create a clone of this DataAPIClient with some changed attributes.

Args:
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.

Returns:
a new DataAPIClient instance.

Example:
>>> another_client = my_client.with_options(
... caller_name="caller_identity",
... caller_version="1.2.0",
... )
"""

return self._copy(
token=token,
caller_name=caller_name,
caller_version=caller_version,
)

def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the API calls will be performed (the "caller").

New objects spawned from this client afterwards will inherit the new settings.

Args:
caller_name: name of the application, or framework, on behalf of which
the API API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.

Example:
>>> my_client.set_caller(caller_name="the_caller", caller_version="0.1.0")
"""

self._caller_name = caller_name
self._caller_version = caller_version

def get_database(
self,
id: str,
Expand Down
Loading