Skip to content

Commit 57351f3

Browse files
committed
Cluster API: Add dataset loader
1 parent 991c1ca commit 57351f3

File tree

3 files changed

+52
-13
lines changed

3 files changed

+52
-13
lines changed

cratedb_toolkit/cluster/core.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from cratedb_toolkit.cluster.guide import DataImportGuide
1616
from cratedb_toolkit.cluster.model import ClientBundle, ClusterBase, ClusterInformation
1717
from cratedb_toolkit.config import CONFIG
18+
from cratedb_toolkit.datasets import load_dataset
1819
from cratedb_toolkit.exception import (
1920
CroudException,
2021
DatabaseAddressMissingError,
@@ -338,6 +339,14 @@ def stop(self) -> "ManagedCluster":
338339
self.probe()
339340
return self
340341

342+
@flexfun(domain="runtime")
343+
def load_dataset(self, name: str, table: str = None):
344+
if table is None:
345+
table = name.replace("/", ".")
346+
with jwt_token_patch(self.info.jwt.token):
347+
ds = load_dataset(name)
348+
return ds.dbtable(dburi=self.address.dburi, table=table).load()
349+
341350
@flexfun(domain="runtime")
342351
def load_table(
343352
self,
@@ -619,6 +628,10 @@ def from_options(cls, options: ClusterAddressOptions) -> t.Union[ManagedCluster,
619628
"""
620629
return DatabaseCluster.create(**options.asdict())
621630

631+
@classmethod
632+
def from_ctx(cls, ctx: "click.Context") -> t.Union[ManagedCluster, StandaloneCluster]:
633+
return DatabaseCluster.from_options(ctx.meta["address"])
634+
622635
@classmethod
623636
def create(
624637
cls, cluster_id: str = None, cluster_name: str = None, cluster_url: str = None

cratedb_toolkit/datasets/model.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class DatasetToDatabaseTableAdapter:
7070
def __post_init__(self):
7171
self.init_sql = None
7272
self.db = DatabaseAdapter(dburi=self.dburi)
73+
self.table = self.db.quote_relation_name(self.table)
7374

7475
def create(
7576
self,
@@ -109,12 +110,14 @@ def load(
109110
has_data = cardinality > 0
110111

111112
if if_exists == "noop" and has_data:
112-
return
113+
return self
113114

114115
if self.init_sql is None:
115116
raise ValueError("SQL for loading data is missing")
116117
self.run_sql(self.init_sql)
117118

119+
return self
120+
118121
def run_sql(self, sql: str):
119122
for statement in sqlparse.parse(sql):
120123
self.db.run_sql(str(statement))

cratedb_toolkit/io/cli.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,18 @@
33
from pathlib import Path
44

55
import click
6-
from click_aliases import ClickAliasedGroup
76

87
from cratedb_toolkit.cluster.core import DatabaseCluster
98
from cratedb_toolkit.model import InputOutputResource, TableAddress
109
from cratedb_toolkit.option import option_cluster_id, option_cluster_name, option_cluster_url
11-
from cratedb_toolkit.util.cli import boot_click, make_command
10+
from cratedb_toolkit.util.app import make_cli
11+
from cratedb_toolkit.util.cli import make_command
1212

1313
logger = logging.getLogger(__name__)
1414

1515

16-
@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
17-
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
18-
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
19-
@click.version_option()
20-
@click.pass_context
21-
def cli(ctx: click.Context, verbose: bool, debug: bool):
22-
"""
23-
Load data into CrateDB.
24-
"""
25-
return boot_click(ctx, verbose, debug)
16+
cli = make_cli()
17+
cli.help = "Load data into CrateDB."
2618

2719

2820
@make_command(cli, name="table")
@@ -67,3 +59,34 @@ def load_table(
6759
cluster_url=cluster_url,
6860
)
6961
cluster.load_table(source=source, target=target, transformation=transformation)
62+
63+
logger.info(f"Importing table succeeded. source={source}, target={target}")
64+
65+
66+
@make_command(cli, name="dataset")
67+
@click.argument("name", envvar="DATASET_NAME", type=str)
68+
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
69+
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
70+
@click.pass_context
71+
def load_dataset(
72+
ctx: click.Context,
73+
name: str,
74+
schema: str,
75+
table: str,
76+
):
77+
"""
78+
Import named dataset into CrateDB and CrateDB Cloud clusters.
79+
"""
80+
81+
# Adjust/convert target table parameter.
82+
effective_table = None
83+
if table is not None:
84+
table_address = TableAddress(schema=schema, table=table)
85+
effective_table = table_address.fullname
86+
87+
# Dispatch "load dataset" operation.
88+
cluster = DatabaseCluster.from_ctx(ctx)
89+
ds = cluster.load_dataset(name=name, table=effective_table)
90+
91+
logger.info(f"Importing dataset succeeded. Name: {name}, Table: {ds.table}")
92+
logger.info(f"Peek SQL: SELECT * FROM {ds.table} LIMIT 42;")

0 commit comments

Comments
 (0)