diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e4e4a8fb..4e64c6f9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -65,7 +65,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[io,test,develop] + pip install --use-pep517 --prefer-binary --editable=.[io,influxdb,test,develop] - name: Run linter and software tests run: | diff --git a/CHANGES.md b/CHANGES.md index 1b32a40e..dc589e8d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ - Add SQL runner utility primitives to `io.sql` namespace - Add `import_csv_pandas` and `import_csv_dask` utility primitives +- InfluxDB: Add adapter for `influxio` ## 2023/11/06 v0.0.2 diff --git a/cratedb_toolkit/io/README.md b/cratedb_toolkit/io/README.md index 501de4c3..c30ca277 100644 --- a/cratedb_toolkit/io/README.md +++ b/cratedb_toolkit/io/README.md @@ -3,6 +3,11 @@ ## Synopsis + +## Cloud Import API + +Using the [CrateDB Cloud] Import API, you can import files in CSV, JSON, and Parquet formats. + Define the cluster id of your CrateDB Cloud Cluster, and connection credentials. ```shell export CRATEDB_CLOUD_CLUSTER_ID='e1e38d92-a650-48f1-8a70-8133f2d5c400' @@ -19,3 +24,38 @@ Inquire data. ```shell ctk shell --command="SELECT * FROM data_weather LIMIT 10;" ``` + + +## InfluxDB + +Using the adapter to [influxio], you can transfer data from InfluxDB to CrateDB. + +Import two data points into InfluxDB. +```shell +export INFLUX_ORG=example +export INFLUX_TOKEN=token +export INFLUX_BUCKET_NAME=testdrive +export INFLUX_MEASUREMENT=demo +influx bucket create +influx write --precision=s "${INFLUX_MEASUREMENT},region=amazonas temperature=42.42,humidity=84.84 1556896326" +influx write --precision=s "${INFLUX_MEASUREMENT},region=amazonas temperature=45.89,humidity=77.23,windspeed=5.4 1556896327" +influx query "from(bucket:\"${INFLUX_BUCKET_NAME}\") |> range(start:-100y)" +``` + +Transfer data. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk load table influxdb2://example:token@localhost:8086/testdrive/demo +crash --command "SELECT * FROM testdrive.demo;" +``` + +Todo: More convenient table querying. +```shell +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk shell --command "SELECT * FROM testdrive.demo;" +ctk show table "testdrive.demo" +``` + + +[CrateDB Cloud]: https://console.cratedb.cloud/ +[influxio]: https://github.com/daq-tools/influxio diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 59224e88..cd71420a 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -65,24 +65,46 @@ def error(self): @make_command(cli, name="table") @click.argument("url") @click.option( - "--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=True, help="CrateDB Cloud cluster identifier" + "--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier" +) +@click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" ) @click.option("--table", type=str, required=False, help="Table name where to import the io") @click.option("--format", "format_", type=str, required=False, help="File format of the import resource") @click.pass_context -def load_table(ctx: click.Context, url: str, cluster_id: str, table: str, format_: str): +def load_table(ctx: click.Context, url: str, cluster_id: str, cratedb_sqlalchemy_url: str, table: str, format_: str): """ Import data into CrateDB and CrateDB Cloud clusters. + """ + + if not cluster_id and not cratedb_sqlalchemy_url: + raise KeyError( + "Either CrateDB Cloud Cluster identifier or CrateDB SQLAlchemy URL needs to be supplied. " + "Use --cluster-id / --cratedb-sqlalchemy-url CLI options " + "or CRATEDB_CLOUD_CLUSTER_ID / CRATEDB_SQLALCHEMY_URL environment variables." + ) + + if cluster_id: + load_table_cloud(cluster_id, url) + elif cratedb_sqlalchemy_url: + load_table_cratedb(cratedb_sqlalchemy_url, url) + else: + raise NotImplementedError("Importing resource not implemented yet") - # TODO: More inline documentation. - - https://console.cratedb.cloud + +def load_table_cloud(cluster_id: str, resource_url: str): """ + export CRATEDB_CLOUD_CLUSTER_ID=95998958-4d96-46eb-a77a-a894e7dde128 + ctk load table https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_weather.csv.gz + https://console.cratedb.cloud + """ cluster_info = get_cluster_info(cluster_id=cluster_id) cio = CloudIo(cluster_id=cluster_id) try: - job_info, success = cio.load_resource(url=url) + job_info, success = cio.load_resource(url=resource_url) print(json.dumps(job_info, indent=2), file=sys.stdout) # noqa: T201 # TODO: Explicitly report about `failed_records`, etc. texts = GuidingTexts( @@ -99,3 +121,18 @@ def load_table(ctx: click.Context, url: str, cluster_id: str, table: str, format except CroudException: logger.exception("Unknown error") sys.exit(1) + + +def load_table_cratedb(sqlalchemy_url: str, resource_url: str): + """ + export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo + ctk load table influxdb2://example:token@localhost:8086/testdrive/demo + """ + if resource_url.startswith("influxdb"): + from influxio.core import copy + + source_url = resource_url.replace("influxdb2", "http") + target_url = sqlalchemy_url + copy(source_url, target_url, progress=True) + else: + raise NotImplementedError("Importing resource not implemented yet") diff --git a/cratedb_toolkit/testing/testcontainers/cratedb.py b/cratedb_toolkit/testing/testcontainers/cratedb.py index 88f90a63..d78cdbb6 100644 --- a/cratedb_toolkit/testing/testcontainers/cratedb.py +++ b/cratedb_toolkit/testing/testcontainers/cratedb.py @@ -16,7 +16,7 @@ from testcontainers.core.config import MAX_TRIES from testcontainers.core.generic import DbContainer -from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer, asbool @@ -97,7 +97,9 @@ def get_connection_url(self, host=None) -> str: port=self.port_to_expose, ) + @wait_container_is_ready() def _connect(self): # TODO: Better use a network connectivity health check? # In `testcontainers-java`, there is the `HttpWaitStrategy`. + # TODO: Provide a client instance. wait_for_logs(self, predicate="o.e.n.Node.*started", timeout=MAX_TRIES) diff --git a/cratedb_toolkit/testing/testcontainers/influxdb2.py b/cratedb_toolkit/testing/testcontainers/influxdb2.py new file mode 100644 index 00000000..1f240a80 --- /dev/null +++ b/cratedb_toolkit/testing/testcontainers/influxdb2.py @@ -0,0 +1,80 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from influxdb_client import InfluxDBClient +from testcontainers.core.config import MAX_TRIES +from testcontainers.core.generic import DbContainer +from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs + +from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer + + +class InfluxDB2Container(KeepaliveContainer, DbContainer): + """ + InfluxDB database container. + + - https://en.wikipedia.org/wiki/Influxdb + + Example: + + The example spins up an InfluxDB2 database instance. + """ + + ORGANIZATION = "example" + TOKEN = "token" # noqa: S105 + + # TODO: Dual-port use with 8083+8086. + def __init__( + self, + # TODO: Use `influxdb:latest` by default? + image: str = "influxdb:2.7", + port: int = 8086, + dialect: str = "influxdb2", + **kwargs, + ) -> None: + super().__init__(image=image, **kwargs) + + self._name = "testcontainers-influxdb" # -{os.getpid()} + + self.port_to_expose = port + self.dialect = dialect + + self.with_exposed_ports(self.port_to_expose, 8083) + + self.debug = False + + def _configure(self) -> None: + self.with_env("DOCKER_INFLUXDB_INIT_MODE", "setup") + self.with_env("DOCKER_INFLUXDB_INIT_USERNAME", "admin") + self.with_env("DOCKER_INFLUXDB_INIT_PASSWORD", "secret1234") + self.with_env("DOCKER_INFLUXDB_INIT_ORG", self.ORGANIZATION) + self.with_env("DOCKER_INFLUXDB_INIT_BUCKET", "default") + self.with_env("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN", self.TOKEN) + + def get_connection_url(self, host=None) -> str: + return super()._create_connection_url( + dialect="http", + username=self.ORGANIZATION, + password=self.TOKEN, + host=host, + port=self.port_to_expose, + ) + + @wait_container_is_ready() + def _connect(self) -> InfluxDBClient: + # TODO: Better use a network connectivity health check? + # In `testcontainers-java`, there is the `HttpWaitStrategy`. + wait_for_logs(self, predicate="Listening.*tcp-listener.*8086", timeout=MAX_TRIES) + return InfluxDBClient(url=self.get_connection_url(), org=self.ORGANIZATION, token=self.TOKEN, debug=self.debug) + + def get_connection_client(self) -> InfluxDBClient: + return self._connect() diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index e5d435d4..643c33fc 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -92,6 +92,14 @@ def table_exists(self, tablename_full: str) -> bool: except Exception: return False + def refresh_table(self, tablename_full: str): + """ + Run a `REFRESH TABLE ...` command. + """ + sql = f"REFRESH TABLE {tablename_full};" # noqa: S608 + self.run_sql(sql=sql) + return True + def drop_repository(self, name: str): """ Drop snapshot repository. diff --git a/doc/backlog.md b/doc/backlog.md index c846369e..32fcbefc 100644 --- a/doc/backlog.md +++ b/doc/backlog.md @@ -16,6 +16,10 @@ ctk load blob /path/to/image.png ctk load object /local/path/to/image.png /dbfs/assets ``` +Catch recursion errors: +``` +CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ +``` $ croud clusters import-jobs list --cluster-id=$CRATEDB_CLOUD_CLUSTER_ID --format=table Traceback (most recent call last): diff --git a/pyproject.toml b/pyproject.toml index 9ba32893..641a2c1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,9 @@ develop = [ "ruff==0.1.3", "validate-pyproject<0.16", ] +influxdb = [ + "influxio==0.1.1", +] io = [ "dask<=2023.10.1,>=2020", "pandas<3,>=1", diff --git a/tests/io/influxdb/__init__.py b/tests/io/influxdb/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/influxdb/conftest.py b/tests/io/influxdb/conftest.py new file mode 100644 index 00000000..01c2bb0b --- /dev/null +++ b/tests/io/influxdb/conftest.py @@ -0,0 +1,66 @@ +import logging + +import pytest +from influxdb_client import InfluxDBClient + +from cratedb_toolkit.testing.testcontainers.influxdb2 import InfluxDB2Container + +logger = logging.getLogger(__name__) + + +# Define buckets to be deleted before running each test case. +RESET_BUCKETS = [ + "testdrive", +] + + +class InfluxDB2Fixture: + """ + A little helper wrapping Testcontainer's `InfluxDB2Container`. + """ + + def __init__(self): + self.container = None + self.client: InfluxDBClient = None + self.setup() + + def setup(self): + # TODO: Make image name configurable. + self.container = InfluxDB2Container() + self.container.start() + self.client = self.container.get_connection_client() + + def finalize(self): + self.container.stop() + + def reset(self): + """ + Delete all buckets used for testing. + """ + for bucket_name in RESET_BUCKETS: + bucket = self.client.buckets_api().find_bucket_by_name(bucket_name) + if bucket is not None: + self.client.buckets_api().delete_bucket(bucket) + + def get_connection_url(self, *args, **kwargs): + return self.container.get_connection_url(*args, **kwargs) + + +@pytest.fixture(scope="session") +def influxdb_service(): + """ + Provide an InfluxDB service instance to the test suite. + """ + db = InfluxDB2Fixture() + db.reset() + yield db + db.finalize() + + +@pytest.fixture(scope="function") +def influxdb(influxdb_service): + """ + Provide a fresh canvas to each test case invocation, by resetting database content. + """ + influxdb_service.reset() + yield influxdb_service diff --git a/tests/io/influxdb/test_cli.py b/tests/io/influxdb/test_cli.py new file mode 100644 index 00000000..2acd3ae5 --- /dev/null +++ b/tests/io/influxdb/test_cli.py @@ -0,0 +1,35 @@ +from click.testing import CliRunner +from influxio.model import InfluxDbAdapter +from influxio.testdata import DataFrameFactory + +from cratedb_toolkit.cli import cli + + +def test_influxdb2_load_table(caplog, cratedb, influxdb): + """ + CLI test: Invoke `ctk load table`. + """ + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + influxdb_url = f"{influxdb.get_connection_url()}/testdrive/demo" + + # Populate source database with a few records worth of data. + adapter = InfluxDbAdapter.from_url(influxdb_url) + adapter.ensure_bucket() + dff = DataFrameFactory(rows=42) + df = dff.make("dateindex") + adapter.write_df(df) + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + influxdb_url = influxdb_url.replace("http://", "influxdb2://") + result = runner.invoke( + cli, + args=f"load table {influxdb_url}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify data in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 42