Skip to content

Commit

Permalink
InfluxDB: Add adapter for influxio
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Nov 12, 2023
1 parent 8b31240 commit a86c5ac
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[io,test,develop]
pip install --use-pep517 --prefer-binary --editable=.[io,influxdb,test,develop]
- name: Run linter and software tests
run: |
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

- Add SQL runner utility primitives to `io.sql` namespace
- Add `import_csv_pandas` and `import_csv_dask` utility primitives
- InfluxDB: Add adapter for `influxio`


## 2023/11/06 v0.0.2
Expand Down
40 changes: 40 additions & 0 deletions cratedb_toolkit/io/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

## Synopsis


## Cloud Import API

Using the [CrateDB Cloud] Import API, you can import files in CSV, JSON, and Parquet formats.

Define the cluster id of your CrateDB Cloud Cluster, and connection credentials.
```shell
export CRATEDB_CLOUD_CLUSTER_ID='e1e38d92-a650-48f1-8a70-8133f2d5c400'
Expand All @@ -19,3 +24,38 @@ Inquire data.
```shell
ctk shell --command="SELECT * FROM data_weather LIMIT 10;"
```


## InfluxDB

Using the adapter to [influxio], you can transfer data from InfluxDB to CrateDB.

Import two data points into InfluxDB.
```shell
export INFLUX_ORG=example
export INFLUX_TOKEN=token
export INFLUX_BUCKET_NAME=testdrive
export INFLUX_MEASUREMENT=demo
influx bucket create
influx write --precision=s "${INFLUX_MEASUREMENT},region=amazonas temperature=42.42,humidity=84.84 1556896326"
influx write --precision=s "${INFLUX_MEASUREMENT},region=amazonas temperature=45.89,humidity=77.23,windspeed=5.4 1556896327"
influx query "from(bucket:\"${INFLUX_BUCKET_NAME}\") |> range(start:-100y)"
```

Transfer data.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
crash --command "SELECT * FROM testdrive.demo;"
```

Todo: More convenient table querying.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```


[CrateDB Cloud]: https://console.cratedb.cloud/
[influxio]: https://github.com/daq-tools/influxio
47 changes: 42 additions & 5 deletions cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,46 @@ def error(self):
@make_command(cli, name="table")
@click.argument("url")
@click.option(
"--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=True, help="CrateDB Cloud cluster identifier"
"--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier"
)
@click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)
@click.option("--table", type=str, required=False, help="Table name where to import the io")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.pass_context
def load_table(ctx: click.Context, url: str, cluster_id: str, table: str, format_: str):
def load_table(ctx: click.Context, url: str, cluster_id: str, cratedb_sqlalchemy_url: str, table: str, format_: str):
"""
Import data into CrateDB and CrateDB Cloud clusters.
"""

if not cluster_id and not cratedb_sqlalchemy_url:
raise KeyError(

Check warning on line 82 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L82

Added line #L82 was not covered by tests
"Either CrateDB Cloud Cluster identifier or CrateDB SQLAlchemy URL needs to be supplied. "
"Use --cluster-id / --cratedb-sqlalchemy-url CLI options "
"or CRATEDB_CLOUD_CLUSTER_ID / CRATEDB_SQLALCHEMY_URL environment variables."
)

if cluster_id:
load_table_cloud(cluster_id, url)

Check warning on line 89 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L89

Added line #L89 was not covered by tests
elif cratedb_sqlalchemy_url:
load_table_cratedb(cratedb_sqlalchemy_url, url)
else:
raise NotImplementedError("Importing resource not implemented yet")

Check warning on line 93 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L93

Added line #L93 was not covered by tests

# TODO: More inline documentation.
- https://console.cratedb.cloud

def load_table_cloud(cluster_id: str, resource_url: str):
"""
export CRATEDB_CLOUD_CLUSTER_ID=95998958-4d96-46eb-a77a-a894e7dde128
ctk load table https://github.com/crate/cratedb-datasets/raw/main/cloud-tutorials/data_weather.csv.gz
https://console.cratedb.cloud
"""
cluster_info = get_cluster_info(cluster_id=cluster_id)
cio = CloudIo(cluster_id=cluster_id)

try:
job_info, success = cio.load_resource(url=url)
job_info, success = cio.load_resource(url=resource_url)

Check warning on line 107 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L107

Added line #L107 was not covered by tests
print(json.dumps(job_info, indent=2), file=sys.stdout) # noqa: T201
# TODO: Explicitly report about `failed_records`, etc.
texts = GuidingTexts(
Expand All @@ -99,3 +121,18 @@ def load_table(ctx: click.Context, url: str, cluster_id: str, table: str, format
except CroudException:
logger.exception("Unknown error")
sys.exit(1)


def load_table_cratedb(sqlalchemy_url: str, resource_url: str):
"""
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
"""
if resource_url.startswith("influxdb"):
from influxio.core import copy

source_url = resource_url.replace("influxdb2", "http")
target_url = sqlalchemy_url
copy(source_url, target_url, progress=True)
else:
raise NotImplementedError("Importing resource not implemented yet")

Check warning on line 138 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L138

Added line #L138 was not covered by tests
4 changes: 3 additions & 1 deletion cratedb_toolkit/testing/testcontainers/cratedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from testcontainers.core.config import MAX_TRIES
from testcontainers.core.generic import DbContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs

from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer, asbool

Expand Down Expand Up @@ -97,7 +97,9 @@ def get_connection_url(self, host=None) -> str:
port=self.port_to_expose,
)

@wait_container_is_ready()
def _connect(self):
# TODO: Better use a network connectivity health check?
# In `testcontainers-java`, there is the `HttpWaitStrategy`.
# TODO: Provide a client instance.
wait_for_logs(self, predicate="o.e.n.Node.*started", timeout=MAX_TRIES)
80 changes: 80 additions & 0 deletions cratedb_toolkit/testing/testcontainers/influxdb2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from influxdb_client import InfluxDBClient
from testcontainers.core.config import MAX_TRIES
from testcontainers.core.generic import DbContainer
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs

from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer


class InfluxDB2Container(KeepaliveContainer, DbContainer):
"""
InfluxDB database container.
- https://en.wikipedia.org/wiki/Influxdb
Example:
The example spins up an InfluxDB2 database instance.
"""

ORGANIZATION = "example"
TOKEN = "token" # noqa: S105

# TODO: Dual-port use with 8083+8086.
def __init__(
self,
# TODO: Use `influxdb:latest` by default?
image: str = "influxdb:2.7",
port: int = 8086,
dialect: str = "influxdb2",
**kwargs,
) -> None:
super().__init__(image=image, **kwargs)

self._name = "testcontainers-influxdb" # -{os.getpid()}

self.port_to_expose = port
self.dialect = dialect

self.with_exposed_ports(self.port_to_expose, 8083)

self.debug = False

def _configure(self) -> None:
self.with_env("DOCKER_INFLUXDB_INIT_MODE", "setup")
self.with_env("DOCKER_INFLUXDB_INIT_USERNAME", "admin")
self.with_env("DOCKER_INFLUXDB_INIT_PASSWORD", "secret1234")
self.with_env("DOCKER_INFLUXDB_INIT_ORG", self.ORGANIZATION)
self.with_env("DOCKER_INFLUXDB_INIT_BUCKET", "default")
self.with_env("DOCKER_INFLUXDB_INIT_ADMIN_TOKEN", self.TOKEN)

def get_connection_url(self, host=None) -> str:
return super()._create_connection_url(
dialect="http",
username=self.ORGANIZATION,
password=self.TOKEN,
host=host,
port=self.port_to_expose,
)

@wait_container_is_ready()
def _connect(self) -> InfluxDBClient:
# TODO: Better use a network connectivity health check?
# In `testcontainers-java`, there is the `HttpWaitStrategy`.
wait_for_logs(self, predicate="Listening.*tcp-listener.*8086", timeout=MAX_TRIES)
return InfluxDBClient(url=self.get_connection_url(), org=self.ORGANIZATION, token=self.TOKEN, debug=self.debug)

def get_connection_client(self) -> InfluxDBClient:
return self._connect()
8 changes: 8 additions & 0 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ def table_exists(self, tablename_full: str) -> bool:
except Exception:
return False

def refresh_table(self, tablename_full: str):
"""
Run a `REFRESH TABLE ...` command.
"""
sql = f"REFRESH TABLE {tablename_full};" # noqa: S608
self.run_sql(sql=sql)
return True

def drop_repository(self, name: str):
"""
Drop snapshot repository.
Expand Down
4 changes: 4 additions & 0 deletions doc/backlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ ctk load blob /path/to/image.png
ctk load object /local/path/to/image.png /dbfs/assets
```

Catch recursion errors:
```
CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/
```

$ croud clusters import-jobs list --cluster-id=$CRATEDB_CLOUD_CLUSTER_ID --format=table
Traceback (most recent call last):
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ develop = [
"ruff==0.1.3",
"validate-pyproject<0.16",
]
influxdb = [
"influxio==0.1.1",
]
io = [
"dask<=2023.10.1,>=2020",
"pandas<3,>=1",
Expand Down
Empty file added tests/io/influxdb/__init__.py
Empty file.
66 changes: 66 additions & 0 deletions tests/io/influxdb/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging

import pytest
from influxdb_client import InfluxDBClient

from cratedb_toolkit.testing.testcontainers.influxdb2 import InfluxDB2Container

logger = logging.getLogger(__name__)


# Define buckets to be deleted before running each test case.
RESET_BUCKETS = [
"testdrive",
]


class InfluxDB2Fixture:
"""
A little helper wrapping Testcontainer's `InfluxDB2Container`.
"""

def __init__(self):
self.container = None
self.client: InfluxDBClient = None
self.setup()

def setup(self):
# TODO: Make image name configurable.
self.container = InfluxDB2Container()
self.container.start()
self.client = self.container.get_connection_client()

def finalize(self):
self.container.stop()

def reset(self):
"""
Delete all buckets used for testing.
"""
for bucket_name in RESET_BUCKETS:
bucket = self.client.buckets_api().find_bucket_by_name(bucket_name)
if bucket is not None:
self.client.buckets_api().delete_bucket(bucket)

def get_connection_url(self, *args, **kwargs):
return self.container.get_connection_url(*args, **kwargs)


@pytest.fixture(scope="session")
def influxdb_service():
"""
Provide an InfluxDB service instance to the test suite.
"""
db = InfluxDB2Fixture()
db.reset()
yield db
db.finalize()


@pytest.fixture(scope="function")
def influxdb(influxdb_service):
"""
Provide a fresh canvas to each test case invocation, by resetting database content.
"""
influxdb_service.reset()
yield influxdb_service
35 changes: 35 additions & 0 deletions tests/io/influxdb/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from click.testing import CliRunner
from influxio.model import InfluxDbAdapter
from influxio.testdata import DataFrameFactory

from cratedb_toolkit.cli import cli


def test_influxdb2_load_table(caplog, cratedb, influxdb):
"""
CLI test: Invoke `ctk load table`.
"""
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
influxdb_url = f"{influxdb.get_connection_url()}/testdrive/demo"

# Populate source database with a few records worth of data.
adapter = InfluxDbAdapter.from_url(influxdb_url)
adapter.ensure_bucket()
dff = DataFrameFactory(rows=42)
df = dff.make("dateindex")
adapter.write_df(df)

# Run transfer command.
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url})
influxdb_url = influxdb_url.replace("http://", "influxdb2://")
result = runner.invoke(
cli,
args=f"load table {influxdb_url}",
catch_exceptions=False,
)
assert result.exit_code == 0

# Verify data in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 42

0 comments on commit a86c5ac

Please sign in to comment.