Skip to content

Commit

Permalink
Cloud API: Naming things, improved UX, more OO
Browse files Browse the repository at this point in the history
- Naming things: Use more appropriate names for variables.
- OO: Add `EnvironmentConfiguration`, to manage information about the
  toolkit environment.
- OO: Add `CloudJob` entity, to manage information about a cloud job,
  and to streamline passing of information.
- OO: Refactor low-level methods to `CloudCluster`.
- Capability to configure error handling based on the environment.
- Capability to conveniently acquire configuration settings from the
  environment, for obtaining the CrateDB Cluster identifier or name.
- Tests: Add `DockerSkippingContainer` and `PytestTestcontainerAdapter`.
  Both skip test execution when the Docker daemon is not running, or not
  available to the environment.
- Examples: Improve UX of `cloud_*.py` example programs.
  • Loading branch information
amotl committed Nov 15, 2023
1 parent f55f9ac commit 44e08bc
Show file tree
Hide file tree
Showing 27 changed files with 879 additions and 352 deletions.
4 changes: 4 additions & 0 deletions cratedb_toolkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@
__version__ = version(__appname__)
except PackageNotFoundError: # pragma: no cover
__version__ = "unknown"

from .api import ManagedCluster # noqa: F401
from .config import configure # noqa: F401
from .model import InputOutputResource, TableAddress # noqa: F401
178 changes: 129 additions & 49 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,163 @@
import dataclasses
import json
import logging
import sys
import typing as t
from abc import abstractmethod

from cratedb_toolkit.api.guide import GuidingTexts
from cratedb_toolkit.cluster.util import deploy_cluster, get_cluster_by_name, get_cluster_info
from cratedb_toolkit.config import CONFIG
from cratedb_toolkit.exception import CroudException, OperationFailed
from cratedb_toolkit.io.croud import CloudIo
from cratedb_toolkit.io.croud import CloudIo, CloudJob
from cratedb_toolkit.model import ClusterInformation, DatabaseAddress, InputOutputResource, TableAddress
from cratedb_toolkit.util.runtime import flexfun
from cratedb_toolkit.util.setting import RequiredMutuallyExclusiveSettingsGroup, Setting

logger = logging.getLogger(__name__)


class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, source: InputOutputResource, target: TableAddress):
raise NotImplementedError("Child class needs to implement this method")


@dataclasses.dataclass
class ManagedCluster(ClusterBase):
"""
Wrap a managed CrateDB database cluster on CrateDB Cloud.
"""

cloud_id: t.Optional[str] = None
name: t.Optional[str] = None
address: t.Optional[DatabaseAddress] = None
info: t.Optional[ClusterInformation] = None
exists: bool = False
settings_spec = RequiredMutuallyExclusiveSettingsGroup(
Setting(
name="--cluster-id",
envvar="CRATEDB_CLOUD_CLUSTER_ID",
help="The CrateDB Cloud cluster identifier, an UUID",
),
Setting(
name="--cluster-name",
envvar="CRATEDB_CLOUD_CLUSTER_NAME",
help="The CrateDB Cloud cluster name",
),
)

def __init__(
self,
id: str = None, # noqa: A002
name: str = None,
address: DatabaseAddress = None,
info: ClusterInformation = None,
):
self.id = id
self.name = name
self.address = address
self.info: ClusterInformation = info or ClusterInformation()
self.exists: bool = False
if self.id is None and self.name is None:
raise ValueError("Failed to address cluster: Either cluster identifier or name needs to be specified")

Check warning on line 58 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L52-L58

Added lines #L52 - L58 were not covered by tests

@classmethod
@flexfun(domain="settings")
def from_env(cls) -> "ManagedCluster":
"""
Obtain CrateDB Cloud cluster identifier or name from user environment.
The settings are mutually exclusive.
def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id} ({self.name})")
self.probe()
When the toolkit environment is configured with `settings_accept_cli`,
the settings can be specified that way:
--cluster-id=e1e38d92-a650-48f1-8a70-8133f2d5c400
--cluster-name=Hotzenplotz
When the toolkit environment is configured with `settings_accept_env`,
the settings can be specified that way:
export CRATEDB_CLOUD_CLUSTER_ID=e1e38d92-a650-48f1-8a70-8133f2d5c400
export CRATEDB_CLOUD_CLUSTER_NAME=Hotzenplotz
"""
if not CONFIG.settings_accept_cli or not CONFIG.settings_accept_env:

Check warning on line 79 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L79

Added line #L79 was not covered by tests
# scsc
raise ValueError("Unable to obtain cluster identifier or name")
try:
cluster_id, cluster_name = cls.settings_spec.obtain_settings()
return cls(id=cluster_id, name=cluster_name)
except ValueError as ex:
logger.error(f"Failed to address cluster: {ex}")
if CONFIG.settings_errors == "exit":
sys.exit(1)

Check warning on line 88 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L81-L88

Added lines #L81 - L88 were not covered by tests
else:
raise

Check warning on line 90 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L90

Added line #L90 was not covered by tests

def probe(self):
def stop(self) -> "ManagedCluster":
logger.warning("Stopping cluster not implemented yet")
return self

Check warning on line 94 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L93-L94

Added lines #L93 - L94 were not covered by tests

def delete(self) -> "ManagedCluster":
return self

Check warning on line 97 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L97

Added line #L97 was not covered by tests

def probe(self) -> "ManagedCluster":
"""
Probe a CrateDB Cloud cluster, API-wise.
"""
if not self.cloud_id and not self.name:
self.exists = False
raise ValueError("Either cluster identifier or name needs to be specified")
try:
if self.cloud_id:
self.info = get_cluster_info(cluster_id=self.cloud_id)
if self.id:
self.info = get_cluster_info(cluster_id=self.id)
self.name = self.info.cloud["name"]
else:
elif self.name:
self.info = get_cluster_by_name(self.name)
self.cloud_id = self.info.cloud["id"]
self.id = self.info.cloud["id"]

Check warning on line 109 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L103-L109

Added lines #L103 - L109 were not covered by tests
else:
self.exists = False
raise ValueError("Failed to address cluster: Either cluster identifier or name needs to be specified")
except CroudException as ex:
self.exists = False
if "Cluster not found" not in str(ex):
raise
if self.info:
if self.info.cloud:
self.exists = True
logger.info(f"Cluster information: name={self.info.cloud.get('name')}, url={self.info.cloud.get('url')}")
return self

Check warning on line 120 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L111-L120

Added lines #L111 - L120 were not covered by tests

def acquire(self):
@flexfun(domain="runtime")
def start(self) -> "ManagedCluster":
"""
Start a database cluster.
When cluster does not exist, acquire/deploy it.
"""
logger.info(f"Deploying/starting/resuming CrateDB Cloud Cluster: id={self.id}, name={self.name}")
self.acquire()
return self

Check warning on line 130 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L128-L130

Added lines #L128 - L130 were not covered by tests

def acquire(self) -> "ManagedCluster":
"""
Acquire a database cluster.
When cluster does not exist, deploy it.
This means going through the steps of deploy and/or start, as applicable.
- When cluster does not exist, create/deploy it.
- When a cluster exists, but is stopped/hibernated, start/resume it.
"""
self.probe()
if not self.exists:
logger.info(f"Cluster does not exist: {self.name}")
logger.info(f"Deploying cluster: {self.name}")
if self.deploy():
self.probe()
else:
logger.info(f"Cluster does not exist, deploying it: id={self.id}, name={self.name}")
self.deploy()
self.probe()
if not self.exists:

Check warning on line 145 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L140-L145

Added lines #L140 - L145 were not covered by tests
# TODO: Is it possible to gather and propagate more information why the deployment failed?
raise CroudException(f"Deployment of cluster failed: {self.name}")
return self

Check warning on line 148 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L147-L148

Added lines #L147 - L148 were not covered by tests

def deploy(self):
def deploy(self) -> "ManagedCluster":
"""
Run the cluster deployment procedure.
"""
try:
deploy_cluster(self.name)
except CroudException:
return False
return True

def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None):
# FIXME: Accept id or name.
if self.name is None:
raise ValueError("Need cluster name to deploy")
deploy_cluster(self.name)
return self

Check warning on line 158 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L155-L158

Added lines #L155 - L158 were not covered by tests

@flexfun(domain="runtime")
def load_table(self, source: InputOutputResource, target: t.Optional[TableAddress] = None) -> CloudJob:
"""
Load data into a database table on CrateDB Cloud.
Expand All @@ -93,33 +170,36 @@ def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddr
https://console.cratedb.cloud
"""

self.probe()

Check warning on line 173 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L173

Added line #L173 was not covered by tests
target = target or TableAddress()

# FIXME: Accept id or name.
if self.id is None:
raise ValueError("Need cluster identifier to load table")

Check warning on line 178 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L177-L178

Added lines #L177 - L178 were not covered by tests

try:
cio = CloudIo(cluster_id=self.cloud_id)
cio = CloudIo(cluster_id=self.id)

Check warning on line 181 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L181

Added line #L181 was not covered by tests
except CroudException as ex:
msg = f"Connecting to cluster resource failed: {self.cloud_id}. Reason: {ex}"
if "Resource not found" in str(ex):
logger.error(msg)
return None, False
msg = f"Connecting to cluster resource failed: {self.id}. Reason: {ex}"

Check warning on line 183 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L183

Added line #L183 was not covered by tests
logger.exception(msg)
raise OperationFailed(msg) from ex

try:
job_info, success = cio.load_resource(resource=resource, target=target)
logger.info("Job information:\n%s", json.dumps(job_info, indent=2))
cloud_job = cio.load_resource(resource=source, target=target)
logger.info("Job information:\n%s", json.dumps(cloud_job.info, indent=2))

Check warning on line 189 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L188-L189

Added lines #L188 - L189 were not covered by tests
# TODO: Explicitly report about `failed_records`, etc.
texts = GuidingTexts(
admin_url=self.info.cloud["url"],
table_name=job_info["destination"]["table"],
table_name=cloud_job.info["destination"]["table"],
)
if success:
if cloud_job.success:

Check warning on line 195 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L195

Added line #L195 was not covered by tests
logger.info("Data loading was successful: %s", texts.success())
return job_info, success
return cloud_job

Check warning on line 197 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L197

Added line #L197 was not covered by tests
else:
# TODO: Add "reason" to exception message.
logger.error(f"Data loading failed: {texts.error()}")
raise OperationFailed("Data loading failed")
message = f"Data loading failed: {cloud_job.message}"
logger.error(f"{message}{texts.error()}")
raise OperationFailed(message)

Check warning on line 202 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L200-L202

Added lines #L200 - L202 were not covered by tests

# When exiting so, it is expected that error logging has taken place appropriately.
except CroudException as ex:
Expand All @@ -137,7 +217,7 @@ class StandaloneCluster(ClusterBase):
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, source: InputOutputResource, target: TableAddress):
"""
Load data into a database table on a standalone CrateDB Server.
Expand All @@ -148,7 +228,7 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = resource.url
source_url = source.url
target_url = self.address.dburi
if source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy
Expand Down
93 changes: 91 additions & 2 deletions cratedb_toolkit/cluster/croud.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
import typing as t
from pathlib import Path

from cratedb_toolkit.model import InputOutputResource, TableAddress
from cratedb_toolkit.util.croud import CroudCall, CroudWrapper


class CloudManager:
"""
A wrapper around the CrateDB Cloud cluster API through the `croud` package.
A wrapper around the CrateDB Cloud API through the `croud` package, providing common methods.
"""

def get_cluster_list(self):
Expand Down Expand Up @@ -71,8 +74,12 @@ def deploy_cluster(self, name: str, project_id: str):
croud clusters get e1e38d92-a650-48f1-8a70-8133f2d5c400 --format=json
""" # noqa: E501
# TODO: Use specific subscription, or, if only one exists, use it.
# Alternatively, acquire value from user environment.
# TODO: `--product-name=crfree` is not always the right choice. ;]
# TODO: Auto-generate cluster name when not given.
# TODO: How to select CrateDB nightly, like `--version=nightly`?
# TODO: Let the user provide the credentials.
# TODO: Add more parameters, like `--org-id`, `--channel`, `--unit`, and more.
# TODO: What about `--sudo`?
from croud.__main__ import command_tree
Expand Down Expand Up @@ -110,7 +117,7 @@ def deploy_cluster(self, name: str, project_id: str):

class CloudCluster:
"""
A wrapper around the CrateDB Cloud cluster API through the `croud` package.
A wrapper around the CrateDB Cloud API through the `croud` package, providing methods specific to a cluster.
"""

def __init__(self, cluster_id: str):
Expand Down Expand Up @@ -142,3 +149,85 @@ def get_info(self):

wr = CroudWrapper(call=call)
return wr.invoke()

def list_jobs(self):
from croud.clusters.commands import import_jobs_list
from croud.parser import Argument

Check warning on line 155 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L154-L155

Added lines #L154 - L155 were not covered by tests

call = CroudCall(

Check warning on line 157 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L157

Added line #L157 was not covered by tests
fun=import_jobs_list,
specs=[Argument("--cluster-id", type=str, required=True, help="The cluster the import jobs belong to.")],
arguments=[
f"--cluster-id={self.cluster_id}",
],
)

wr = CroudWrapper(call=call)
return wr.invoke()

Check warning on line 166 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L165-L166

Added lines #L165 - L166 were not covered by tests

def create_import_job(self, resource: InputOutputResource, target: TableAddress) -> t.Dict[str, t.Any]:
from croud.__main__ import import_job_create_common_args
from croud.clusters.commands import import_jobs_create_from_file, import_jobs_create_from_url
from croud.parser import Argument

Check warning on line 171 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L169-L171

Added lines #L169 - L171 were not covered by tests

specs: t.List[Argument] = import_job_create_common_args

Check warning on line 173 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L173

Added line #L173 was not covered by tests

url_argument = Argument("--url", type=str, required=True, help="The URL the import file will be read from.")

Check warning on line 175 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L175

Added line #L175 was not covered by tests

file_id_argument = Argument(

Check warning on line 177 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L177

Added line #L177 was not covered by tests
"--file-id",
type=str,
required=False,
help="The file ID that will be used for the "
"import. If not specified then --file-path"
" must be specified. "
"Please refer to `croud organizations "
"files` for more info.",
)
file_path_argument = Argument(

Check warning on line 187 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L187

Added line #L187 was not covered by tests
"--file-path",
type=str,
required=False,
help="The file in your local filesystem that "
"will be used. If not specified then "
"--file-id must be specified. "
"Please note the file will become visible "
"under `croud organizations files list`.",
)

# Compute command-line arguments for invoking `croud`.
# FIXME: This call is redundant.
path = Path(resource.url)

Check warning on line 200 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L200

Added line #L200 was not covered by tests

# TODO: Sanitize table name. Which characters are allowed?
if path.exists():
specs.append(file_path_argument)
specs.append(file_id_argument)
arguments = [

Check warning on line 206 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L203-L206

Added lines #L203 - L206 were not covered by tests
f"--cluster-id={self.cluster_id}",
f"--file-path={resource.url}",
f"--table={target.table}",
f"--file-format={resource.format}",
]
fun = import_jobs_create_from_file

Check warning on line 212 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L212

Added line #L212 was not covered by tests
else:
specs.append(url_argument)
arguments = [

Check warning on line 215 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L214-L215

Added lines #L214 - L215 were not covered by tests
f"--cluster-id={self.cluster_id}",
f"--url={resource.url}",
f"--table={target.table}",
f"--file-format={resource.format}",
]
fun = import_jobs_create_from_url

Check warning on line 221 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L221

Added line #L221 was not covered by tests

if resource.compression is not None:
arguments += [f"--compression={resource.compression}"]

Check warning on line 224 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L223-L224

Added lines #L223 - L224 were not covered by tests

call = CroudCall(

Check warning on line 226 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L226

Added line #L226 was not covered by tests
fun=fun,
specs=specs,
arguments=arguments,
)

wr = CroudWrapper(call=call)
return wr.invoke()

Check warning on line 233 in cratedb_toolkit/cluster/croud.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/croud.py#L232-L233

Added lines #L232 - L233 were not covered by tests
Loading

0 comments on commit 44e08bc

Please sign in to comment.