From a52a54fef433844d642a7bba5a273cd35fc25afc Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 17:45:01 +0200 Subject: [PATCH 01/12] MongoDB: Fix missing output on STDOUT for `migr8 export` --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/cli.py | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3606a814..29220275 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Processor: Updated Kinesis Lambda processor to understand AWS DMS +- MongoDB: Fix missing output on STDOUT for `migr8 export` ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index a52d3949..bfc9a482 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -1,5 +1,6 @@ import argparse import json +import sys import rich @@ -78,11 +79,14 @@ def translate_from_file(args): def export_to_stdout(args): - export(args) + sys.stdout.buffer.write(export(args).read()) def main(): - rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n") + rich.print( + "\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n", + file=sys.stderr, + ) args = get_args() if args.command == "extract": extract_to_file(args) From 18f51466d27ced9663aff3261df649932a1e2eb5 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 17:47:41 +0200 Subject: [PATCH 02/12] MongoDB: Improve timestamp parsing by using `python-dateutil` --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/export.py | 17 ++--------------- pyproject.toml | 1 + 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 29220275..70c0ed45 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ ## Unreleased - Processor: Updated Kinesis Lambda processor to understand AWS DMS - MongoDB: Fix 
missing output on STDOUT for `migr8 export` +- MongoDB: Improve timestamp parsing by using `python-dateutil` ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index b25bb0e1..a9f108fc 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,32 +25,19 @@ """ import calendar -import re import sys import typing as t -from datetime import datetime, timedelta import bsonjs +import dateutil.parser as dateparser import orjson as json import pymongo.collection -_TZINFO_RE = re.compile(r"([+\-])?(\d\d):?(\d\d)") - def date_converter(value): if isinstance(value, int): return value - dt = datetime.strptime(value[:-5], "%Y-%m-%dT%H:%M:%S.%f") - iso_match = _TZINFO_RE.match(value[-5:]) - if iso_match: - sign, hours, minutes = iso_match.groups() - tzoffset = int(hours) * 3600 + int(minutes) * 60 - if sign == "-": - dt = dt + timedelta(seconds=tzoffset) - else: - dt = dt - timedelta(seconds=tzoffset) - else: - raise Exception("Can't parse datetime string {0}".format(value)) + dt = dateparser.parse(value) return calendar.timegm(dt.utctimetuple()) * 1000 diff --git a/pyproject.toml b/pyproject.toml index 84125976..081130e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,6 +93,7 @@ dependencies = [ 'importlib-metadata; python_version < "3.8"', 'importlib-resources; python_version < "3.9"', "polars<1.5", + "python-dateutil<3", "python-dotenv<2", "python-slugify<9", "pyyaml<7", From 330ef9e066a04c195ce686ce950d4f9af6f4d53e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 18:17:27 +0200 Subject: [PATCH 03/12] MongoDB: Converge `_id` input field to `id` column instead of dropping --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/export.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 70c0ed45..24ebaf74 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ 
-5,6 +5,7 @@ - Processor: Updated Kinesis Lambda processor to understand AWS DMS - MongoDB: Fix missing output on STDOUT for `migr8 export` - MongoDB: Improve timestamp parsing by using `python-dateutil` +- MongoDB: Converge `_id` input field to `id` column instead of dropping it ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index a9f108fc..9ea97f94 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -72,7 +72,10 @@ def extract_value(value, parent_type=None): def convert(d): newdict = {} - del d["_id"] + # TODO: More columns can start with underscore `_`. + if "_id" in d: + d["id"] = d["_id"] + del d["_id"] for k, v in d.items(): newdict[k] = extract_value(v) return newdict From a0b576924952c4225098805d2f95c6da268d0b3b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 19:37:36 +0200 Subject: [PATCH 04/12] MongoDB: Make user interface use stderr, so stdout is for data only --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/cli.py | 12 ++++++------ cratedb_toolkit/io/mongodb/core.py | 15 +++++++++++---- cratedb_toolkit/io/mongodb/extract.py | 5 +++++ 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 24ebaf74..21d7ecc7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ - MongoDB: Fix missing output on STDOUT for `migr8 export` - MongoDB: Improve timestamp parsing by using `python-dateutil` - MongoDB: Converge `_id` input field to `id` column instead of dropping it +- MongoDB: Make user interface use stderr, so stdout is for data only ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index bfc9a482..3d8fcebb 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -2,11 +2,14 @@ import json 
import sys -import rich +from rich.console import Console from cratedb_toolkit import __version__ from cratedb_toolkit.io.mongodb.core import export, extract, translate +console = Console(stderr=True) +rich = console + def extract_parser(subargs): parser = subargs.add_parser("extract", help="Extract a schema from a MongoDB database") @@ -62,7 +65,7 @@ def extract_to_file(args): """ schema = extract(args) - rich.print(f"\nWriting resulting schema to {args.out}...") + rich.print(f"\nWriting resulting schema to {args.out}") with open(args.out, "w") as out: json.dump(schema, out, indent=4) rich.print("[green bold]Done![/green bold]") @@ -83,10 +86,7 @@ def export_to_stdout(args): def main(): - rich.print( - "\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n", - file=sys.stderr, - ) + rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n") args = get_args() if args.command == "extract": extract_to_file(args) diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index d7ce8cd0..c3269951 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -1,12 +1,14 @@ import io import logging +import sys import typing as t import pymongo import pymongo.database -import rich from bson.raw_bson import RawBSONDocument +from rich.console import Console from rich.syntax import Syntax +from rich.table import Table from .export import collection_to_json from .extract import extract_schema_from_collection @@ -15,6 +17,9 @@ logger = logging.getLogger(__name__) +console = Console(stderr=True) +rich = console + def gather_collections(database) -> t.List[str]: """ @@ -23,7 +28,7 @@ def gather_collections(database) -> t.List[str]: collections = database.list_collection_names() - tbl = rich.table.Table(show_header=True, header_style="bold blue") + tbl = Table(show_header=True, header_style="bold blue") 
tbl.add_column("Id", width=3) tbl.add_column("Collection Name") tbl.add_column("Estimated Size") @@ -35,7 +40,8 @@ def gather_collections(database) -> t.List[str]: rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2')") - collections_to_ignore = parse_input_numbers(input("> ")) + sys.stderr.write("> ") + collections_to_ignore = parse_input_numbers(input()) filtered_collections = [] for i, c in enumerate(collections): if i not in collections_to_ignore: @@ -80,7 +86,8 @@ def extract(args) -> t.Dict[str, t.Any]: else: rich.print("\nDo a [red bold]full[/red bold] collection scan?") rich.print("A full scan will iterate over all documents in the collection, a partial only one document. (Y/n)") - full = input("> ").strip().lower() + sys.stderr.write("> ") + full = input().strip().lower() partial = full != "y" diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py index 71791a9d..b6cfd522 100644 --- a/cratedb_toolkit/io/mongodb/extract.py +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -69,6 +69,10 @@ import bson from pymongo.collection import Collection from rich import progress +from rich.console import Console + +console = Console(stderr=True) +rich = console progressbar = progress.Progress( progress.TextColumn("{task.description} ", justify="left"), @@ -76,6 +80,7 @@ "[progress.percentage]{task.percentage:>3.1f}% ({task.completed}/{task.total})", "•", progress.TimeRemainingColumn(), + console=console, ) From 834772afc0556f3751a85db601024f36d3ab6c8e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 19:47:19 +0200 Subject: [PATCH 05/12] MongoDB: Make `migr8 extract` write to stdout by default --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/cli.py | 17 ++++++++++++----- cratedb_toolkit/io/mongodb/core.py | 2 +- doc/io/mongodb/migr8.md | 2 +- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 21d7ecc7..eb44299e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 
@@ - MongoDB: Improve timestamp parsing by using `python-dateutil` - MongoDB: Converge `_id` input field to `id` column instead of dropping it - MongoDB: Make user interface use stderr, so stdout is for data only +- MongoDB: Make `migr8 extract` write to stdout by default ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index 3d8fcebb..4f6ed031 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -1,6 +1,7 @@ import argparse import json import sys +import typing as t from rich.console import Console @@ -23,7 +24,7 @@ def extract_parser(subargs): choices=["full", "partial"], help="Whether to fully scan the MongoDB collections or only partially.", ) - parser.add_argument("-o", "--out", default="mongodb_schema.json") + parser.add_argument("-o", "--out", required=False) def translate_parser(subargs): @@ -65,10 +66,16 @@ def extract_to_file(args): """ schema = extract(args) - rich.print(f"\nWriting resulting schema to {args.out}") - with open(args.out, "w") as out: - json.dump(schema, out, indent=4) - rich.print("[green bold]Done![/green bold]") + + out_label = args.out or "stdout" + rich.print(f"\nWriting resulting schema to {out_label}") + fp: t.TextIO + if args.out: + fp = open(args.out, "w") + else: + fp = sys.stdout + json.dump(schema, fp=fp, indent=4) + fp.flush() def translate_from_file(args): diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index c3269951..56c1ee2b 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -38,7 +38,7 @@ def gather_collections(database) -> t.List[str]: rich.print(tbl) - rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2')") + rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). 
Leave empty for using all collections.") sys.stderr.write("> ") collections_to_ignore = parse_input_numbers(input()) diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index 48ab352d..c55020e6 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -80,7 +80,7 @@ migr8 --help To extract a description of the schema of a collection, use the `extract` subcommand. For example: - migr8 extract --host localhost --port 27017 --database test_db + migr8 extract --host localhost --port 27017 --database test_db > mongodb_schema.json After connecting to the designated MongoDB server, it will look at the collections within that database, and will prompt you which From 5ae0980cf3c36254f38ffdc2a3b92138faefe4c9 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 21:28:46 +0200 Subject: [PATCH 06/12] MongoDB: Make `migr8 translate` read from stdin by default --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/cli.py | 11 +++++++---- doc/io/mongodb/migr8.md | 9 +++++++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index eb44299e..9ebba9ff 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,7 @@ - MongoDB: Converge `_id` input field to `id` column instead of dropping it - MongoDB: Make user interface use stderr, so stdout is for data only - MongoDB: Make `migr8 extract` write to stdout by default +- MongoDB: Make `migr8 translate` read from stdin by default ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index 4f6ed031..4fbb78a7 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -82,10 +82,13 @@ def translate_from_file(args): """ Read in a JSON file and extract the schema from it.
""" - - with open(args.infile) as f: - schema = json.load(f) - translate(schema) + fp: t.TextIO + if args.infile: + fp = open(args.infile, "r") + else: + fp = sys.stdin + schema = json.load(fp) + translate(schema) def export_to_stdout(args): diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index c55020e6..e6e8d242 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -80,7 +80,7 @@ migr8 --help To extract a description of the schema of a collection, use the `extract` subcommand. For example: - migr8 extract --host localhost --port 27017 --database test_db > mongodb_schema.json + migr8 extract --host localhost --port 27017 --database test_db --out mongodb_schema.json After connecting to the designated MongoDB server, it will look at the collections within that database, and will prompt you which @@ -174,7 +174,7 @@ mostly consistent data-types. Once a schema description has been extracted, it can be translated into a CrateDB schema definition using the `translate` subcommand: - migr8 translate -i mongodb_schema.json + migr8 translate --infile mongodb_schema.json This will attempt to translate the description into a best-fit CrateDB table definition. Where datatypes are ambiguous, it will *choose the @@ -193,6 +193,11 @@ CREATE TABLE IF NOT EXISTS "doc"."test" ( ); ``` +You can also connect both programs to each other, to execute both steps at once. +```shell +migr8 extract ... 
| migr8 translate +``` + ### MongoDB Collection Export From f96b1c6b0fa5b06e80d5161055d6ddf4396ea559 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 21:30:15 +0200 Subject: [PATCH 07/12] MongoDB: Improve user interface messages --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/cli.py | 7 +++++-- cratedb_toolkit/io/mongodb/core.py | 3 +-- cratedb_toolkit/io/mongodb/extract.py | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9ebba9ff..9293ace9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ - MongoDB: Make user interface use stderr, so stdout is for data only - MongoDB: Make `migr8 extract` write to stdout by default - MongoDB: Make `migr8 translate` read from stdin by default +- MongoDB: Improve user interface messages ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index 4fbb78a7..fd3ba011 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -68,7 +68,7 @@ def extract_to_file(args): schema = extract(args) out_label = args.out or "stdout" - rich.print(f"\nWriting resulting schema to {out_label}") + rich.print(f"Writing resulting schema to {out_label}") fp: t.TextIO if args.out: fp = open(args.out, "w") @@ -96,11 +96,14 @@ def export_to_stdout(args): def main(): - rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n") args = get_args() + headline_prefix = "[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter" if args.command == "extract": + rich.print(f"{headline_prefix} -> Schema Extractor") extract_to_file(args) elif args.command == "translate": + rich.print(f"{headline_prefix} -> Schema Translator") translate_from_file(args) elif args.command == "export": + rich.print(f"{headline_prefix} -> Data Exporter") export_to_stdout(args) diff --git 
a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index 56c1ee2b..1e7561b1 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -91,7 +91,7 @@ def extract(args) -> t.Dict[str, t.Any]: partial = full != "y" - rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan...") + rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan") schemas = {} for collection in filtered_collections: @@ -110,7 +110,6 @@ def translate(schemas, schemaname: str = None) -> t.Dict[str, str]: syntax = Syntax(query, "sql") rich.print(f"Collection [blue bold]'{collection}'[/blue bold]:") rich.print(syntax) - rich.print() return result diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py index b6cfd522..aa8a258c 100644 --- a/cratedb_toolkit/io/mongodb/extract.py +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -75,7 +75,7 @@ rich = console progressbar = progress.Progress( - progress.TextColumn("{task.description} ", justify="left"), + progress.TextColumn("Processing collection: {task.description} ", justify="left"), progress.BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.1f}% ({task.completed}/{task.total})", "•", From d306039a2f728a178b60577498b04f10e1836078 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 21:32:36 +0200 Subject: [PATCH 08/12] MongoDB: Strip single leading underscore character from top-level fields --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/export.py | 8 +- cratedb_toolkit/io/mongodb/translate.py | 13 +- cratedb_toolkit/io/mongodb/util.py | 16 +++ cratedb_toolkit/util/data_dict.py | 165 ++++++++++++++++++++++++ 5 files changed, 193 insertions(+), 10 deletions(-) create mode 100644 cratedb_toolkit/util/data_dict.py diff --git a/CHANGES.md b/CHANGES.md index 9293ace9..0080b662 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ - MongoDB: Make `migr8 extract` write to 
stdout by default - MongoDB: Make `migr8 translate` read from stdin by default - MongoDB: Improve user interface messages +- MongoDB: Strip single leading underscore character from all top-level fields ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 9ea97f94..23eb45d8 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -33,6 +33,8 @@ import orjson as json import pymongo.collection +from cratedb_toolkit.io.mongodb.util import sanitize_field_names + def date_converter(value): if isinstance(value, int): @@ -72,11 +74,7 @@ def extract_value(value, parent_type=None): def convert(d): newdict = {} - # TODO: More columns can start with underscore `_`. - if "_id" in d: - d["id"] = d["_id"] - del d["_id"] - for k, v in d.items(): + for k, v in sanitize_field_names(d).items(): newdict[k] = extract_value(v) return newdict diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py index 71235dea..4b5243ca 100644 --- a/cratedb_toolkit/io/mongodb/translate.py +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -33,6 +33,8 @@ from functools import reduce +from cratedb_toolkit.io.mongodb.util import sanitize_field_names + TYPES = { "DATETIME": "TIMESTAMP WITH TIME ZONE", "INT64": "INTEGER", @@ -95,7 +97,7 @@ def translate_array(schema): def determine_type(schema): """ - Determine the type of a specific field schema. + Determine the type of specific field schema. 
""" types = schema.get("types", {}) @@ -108,9 +110,9 @@ def determine_type(schema): sql_type = translate_array(types["ARRAY"]) if len(types) > 1: - return (sql_type, proportion_string(types)) - return (sql_type, None) - return ("UNKNOWN", None) + return sql_type, proportion_string(types) + return sql_type, None + return "UNKNOWN", None def proportion_string(types: dict) -> str: @@ -157,7 +159,8 @@ def translate(schemas, schemaname: str = None): for tablename in tables: collection = schemas[tablename] columns = [] - for fieldname, field in collection["document"].items(): + fields = sanitize_field_names(collection["document"]) + for fieldname, field in fields.items(): sql_type, comment = determine_type(field) if sql_type != "UNKNOWN": columns.append((COLUMN.format(column_name=fieldname, type=sql_type), comment)) diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 26076c1c..81e63a53 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,4 +1,7 @@ import re +import typing as t + +from cratedb_toolkit.util.data_dict import OrderedDictX def parse_input_numbers(s: str): @@ -21,3 +24,16 @@ def parse_input_numbers(s: str): except ValueError: pass return options + + +def sanitize_field_names(data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + """ + CrateDB does not accept leading underscores as top-level column names, like `_foo`. + + Utility function to rename all relevant column names, keeping their order intact. + """ + d = OrderedDictX(data) + for name in d.keys(): + if name.startswith("_") and name[1] != "_": + d.rename_key(name, name[1:]) + return d diff --git a/cratedb_toolkit/util/data_dict.py b/cratedb_toolkit/util/data_dict.py new file mode 100644 index 00000000..0459624a --- /dev/null +++ b/cratedb_toolkit/util/data_dict.py @@ -0,0 +1,165 @@ +""" +OrderedDictX by Zuzu Corneliu. 
+ +For the keeping of order case (the other one is trivial, remove old and add new +one): I was not satisfied with the ordered-dictionary needing reconstruction +(at least partially), obviously for efficiency reasons, so I've put together a +class (OrderedDictX) that extends OrderedDict and allows you to do key changes +efficiently, i.e. in O(1) complexity. The implementation can also be adjusted +for the now-ordered built-in dict class. + +It uses 2 extra dictionaries to remap the changed keys ("external" - i.e. as +they appear externally to the user) to the ones in the underlying OrderedDict +("internal") - the dictionaries will only hold keys that were changed so as +long as no key changing is done they will be empty. + +As expected, the splicing method is extremely slow (didn't expect it to be that +much slower either though) and uses a lot of memory, and the O(N) solution of +@Ashwini Chaudhary (bug-fixed though, del also needed) is also slower, 17X +times in this example. + +Of course, this solution being O(1), compared to the O(N) OrderedDictRaymond +the time difference becomes much more apparent as the dictionary size +increases, e.g. for 5 times more elements (100000), the O(N) is 100X slower. + +https://stackoverflow.com/questions/16475384/rename-a-dictionary-key/75115645#75115645 +""" + +from collections import OrderedDict + + +class OrderedDictX(OrderedDict): + def __init__(self, *args, **kwargs): + # Mappings from new->old (ext2int), old->new (int2ext). + # Only the keys that are changed (internal key doesn't match what the user sees) are contained. 
+ self._keys_ext2int = OrderedDict() + self._keys_int2ext = OrderedDict() + self.update(*args, **kwargs) + + def rename_key(self, k_old, k_new): + # Validate that the old key is part of the dict + if not self.__contains__(k_old): + raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_old} not existing in dict") + + # Return if no changing is actually to be done + if len(OrderedDict.fromkeys([k_old, k_new])) == 1: + return + + # Validate that the new key would not conflict with another one + if self.__contains__(k_new): + raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_new} already in dict") + + # Change the key using internal dicts mechanism + if k_old in self._keys_ext2int: + # Revert change temporarily + k_old_int = self._keys_ext2int[k_old] + del self._keys_ext2int[k_old] + k_old = k_old_int + # Check if new key matches the internal key + if len(OrderedDict.fromkeys([k_old, k_new])) == 1: + del self._keys_int2ext[k_old] + return + + # Finalize key change + self._keys_ext2int[k_new] = k_old + self._keys_int2ext[k_old] = k_new + + def __contains__(self, k) -> bool: + if k in self._keys_ext2int: + return True + if not super().__contains__(k): + return False + return k not in self._keys_int2ext + + def __getitem__(self, k): + if not self.__contains__(k): + # Intentionally raise KeyError in ext2int + return self._keys_ext2int[k] + return super().__getitem__(self._keys_ext2int.get(k, k)) + + def __setitem__(self, k, v): + if k in self._keys_ext2int: + return super().__setitem__(self._keys_ext2int[k], v) + # If the key exists in the internal state but was renamed to a k_ext, + # employ this trick: make it such that it appears as if k_ext has also been renamed to k + if k in self._keys_int2ext: + k_ext = self._keys_int2ext[k] + self._keys_ext2int[k] = k_ext + k = k_ext + return super().__setitem__(k, v) + + def __delitem__(self, k): + if not self.__contains__(k): + # Intentionally raise KeyError in ext2int + del self._keys_ext2int[k] + if k in 
self._keys_ext2int: + k_int = self._keys_ext2int[k] + del self._keys_ext2int[k] + del self._keys_int2ext[k_int] + k = k_int + return super().__delitem__(k) + + def __iter__(self): + yield from self.keys() + + def __reversed__(self): + for k in reversed(super().keys()): + yield self._keys_int2ext.get(k, k) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, dict): + return False + if len(self) != len(other): + return False + for (k, v), (k_other, v_other) in zip(self.items(), other.items()): + if k != k_other or v != v_other: + return False + return True + + def update(self, *args, **kwargs): + for k, v in OrderedDict(*args, **kwargs).items(): + self.__setitem__(k, v) + + def popitem(self, last=True) -> tuple: + if not last: + k = next(iter(self.keys())) + else: + k = next(iter(reversed(self.keys()))) + v = self.__getitem__(k) + self.__delitem__(k) + return k, v + + class OrderedDictXKeysView: + def __init__(self, odx: "OrderedDictX", orig_keys): + self._odx = odx + self._orig_keys = orig_keys + + def __iter__(self): + for k in self._orig_keys: + yield self._odx._keys_int2ext.get(k, k) + + def __reversed__(self): + for k in reversed(self._orig_keys): + yield self._odx._keys_int2ext.get(k, k) + + class OrderedDictXItemsView: + def __init__(self, odx: "OrderedDictX", orig_items): + self._odx = odx + self._orig_items = orig_items + + def __iter__(self): + for k, v in self._orig_items: + yield self._odx._keys_int2ext.get(k, k), v + + def __reversed__(self): + for k, v in reversed(self._orig_items): + yield self._odx._keys_int2ext.get(k, k), v + + def keys(self): + return self.OrderedDictXKeysView(self, super().keys()) + + def items(self): + return self.OrderedDictXItemsView(self, super().items()) + + def copy(self): + return OrderedDictX(self.items()) From e2a5f05e51649e1b46d58721fdf76dab4c8f6c56 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 21:33:40 +0200 Subject: [PATCH 09/12] MongoDB: Map OID types to CrateDB TEXT columns --- 
CHANGES.md | 1 + cratedb_toolkit/io/mongodb/translate.py | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 0080b662..318fa3bd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ - MongoDB: Make `migr8 translate` read from stdin by default - MongoDB: Improve user interface messages - MongoDB: Strip single leading underscore character from all top-level fields +- MongoDB: Map OID types to CrateDB TEXT columns ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py index 4b5243ca..2bf32364 100644 --- a/cratedb_toolkit/io/mongodb/translate.py +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -36,6 +36,7 @@ from cratedb_toolkit.io.mongodb.util import sanitize_field_names TYPES = { + "OID": "TEXT", "DATETIME": "TIMESTAMP WITH TIME ZONE", "INT64": "INTEGER", "STRING": "TEXT", From ce4e1f658f5d5fad6eaead6f03f6143512aff902 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Aug 2024 21:49:51 +0200 Subject: [PATCH 10/12] MongoDB: Make `migr8 extract` and `migr8 export` accept `--limit` option --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/api.py | 8 +++++--- cratedb_toolkit/io/mongodb/cli.py | 2 ++ cratedb_toolkit/io/mongodb/core.py | 4 ++-- cratedb_toolkit/io/mongodb/export.py | 21 ++++++++++++++------- cratedb_toolkit/io/mongodb/extract.py | 4 ++-- doc/io/mongodb/migr8.md | 7 ++++++- 7 files changed, 32 insertions(+), 15 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 318fa3bd..cfe61d6e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,7 @@ - MongoDB: Improve user interface messages - MongoDB: Strip single leading underscore character from all top-level fields - MongoDB: Map OID types to CrateDB TEXT columns +- MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git 
a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index a8f73bfb..cd22ad45 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -10,7 +10,7 @@ logger = logging.getLogger(__name__) -def mongodb_copy(source_url, target_url, progress: bool = False): +def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False): """ Synopsis -------- @@ -36,7 +36,7 @@ def mongodb_copy(source_url, target_url, progress: bool = False): # 1. Extract schema from MongoDB collection. logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}") extract_args = argparse.Namespace( - url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full" + url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit ) mongodb_schema = extract(extract_args) count = mongodb_schema[mongodb_collection]["count"] @@ -64,7 +64,9 @@ def mongodb_copy(source_url, target_url, progress: bool = False): f"Transferring data from MongoDB to CrateDB: " f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}" ) - export_args = argparse.Namespace(url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection) + export_args = argparse.Namespace( + url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit + ) buffer = export(export_args) cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index fd3ba011..563af816 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -24,6 +24,7 @@ def extract_parser(subargs): choices=["full", "partial"], help="Whether to fully scan the MongoDB collections or only partially.", ) + parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N 
documents") parser.add_argument("-o", "--out", required=False) @@ -42,6 +43,7 @@ def export_parser(subargs): parser.add_argument("--host", default="localhost", help="MongoDB host") parser.add_argument("--port", default=27017, help="MongoDB port") parser.add_argument("--database", required=True, help="MongoDB database") + parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") def get_args(): diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index 1e7561b1..fae1f8ad 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -95,7 +95,7 @@ def extract(args) -> t.Dict[str, t.Any]: schemas = {} for collection in filtered_collections: - schemas[collection] = extract_schema_from_collection(db[collection], partial) + schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit) return schemas @@ -121,6 +121,6 @@ def export(args) -> t.IO[bytes]: """ buffer = io.BytesIO() client, db = get_mongodb_client_database(args, document_class=RawBSONDocument) - collection_to_json(db[args.collection], file=buffer) + collection_to_json(db[args.collection], fp=buffer, limit=args.limit) buffer.seek(0) return buffer diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 23eb45d8..c4ca0257 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,7 +25,6 @@ """ import calendar -import sys import typing as t import bsonjs @@ -57,6 +56,12 @@ def timestamp_converter(value): def extract_value(value, parent_type=None): + """ + Decode MongoDB Extended JSON. 
+ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ if isinstance(value, dict): if len(value) == 1: for k, v in value.items(): @@ -73,13 +78,16 @@ def extract_value(value, parent_type=None): def convert(d): + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ newdict = {} for k, v in sanitize_field_names(d).items(): newdict[k] = extract_value(v) return newdict -def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.Any] = None): +def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0): """ Export a MongoDB collection's documents to standard JSON. The output is suitable to be consumed by the `cr8` program. @@ -88,11 +96,10 @@ def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.A a Pymongo collection object. file - a file-like object (stream); defaults to the current sys.stdout. + a file-like object (stream). """ - file = file or sys.stdout.buffer - for document in collection.find(): + for document in collection.find().limit(limit): bson_json = bsonjs.dumps(document.raw) json_object = json.loads(bson_json) - file.write(json.dumps(convert(json_object))) - file.write(b"\n") + fp.write(json.dumps(convert(json_object))) + fp.write(b"\n") diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py index aa8a258c..e11b99c8 100644 --- a/cratedb_toolkit/io/mongodb/extract.py +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -84,7 +84,7 @@ ) -def extract_schema_from_collection(collection: Collection, partial: bool) -> t.Dict[str, t.Any]: +def extract_schema_from_collection(collection: Collection, partial: bool, limit: int = 0) -> t.Dict[str, t.Any]: """ Extract a schema definition from a collection. 
@@ -100,7 +100,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool) -> t.D with progressbar: t = progressbar.add_task(collection.name, total=count) try: - for document in collection.find(): + for document in collection.find().limit(limit=limit): schema["count"] += 1 schema["document"] = extract_schema_from_document(document, schema["document"]) progressbar.update(t, advance=1) diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index e6e8d242..65b19e12 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -66,10 +66,15 @@ invoke `export` and `cr8` to actually transfer data. ## Usage for `migr8` -The program `migr8` offers three subcommands (`extract`, `translate`, `export`), +The program `migr8` offers three subcommands `extract`, `translate`, and `export`, to conclude data transfers from MongoDB to CrateDB. Please read this section carefully to learn how they can be used successfully. +If you intend to evaluate `migr8` on a small portion of your data in MongoDB, the +`--limit` command-line option for the `migr8 extract` and `migr8 export` +subcommands might be useful. Using `--limit 10000` is usually both good and fast +enough to assess whether the schema translation and data transfer work well. 
+ ```shell migr8 --version migr8 --help From c28b73b9bf922d6cf16257e5d10e0dfbdd36dba7 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 9 Aug 2024 23:00:13 +0200 Subject: [PATCH 11/12] MongoDB: Fix indentation in prettified SQL output of `migr8 translate` --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/translate.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index cfe61d6e..c2eea613 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,7 @@ - MongoDB: Strip single leading underscore character from all top-level fields - MongoDB: Map OID types to CrateDB TEXT columns - MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option +- MongoDB: Fix indentation in prettified SQL output of `migr8 translate` ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py index 2bf32364..650ef03e 100644 --- a/cratedb_toolkit/io/mongodb/translate.py +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -141,7 +141,7 @@ def indent_sql(query: str) -> str: if len(line) >= 1: if line[-1] == "(": indent += 4 - elif line[-1] == ")": + elif line[-1] == ")" or line[-2:] == "),": indent -= 4 return "\n".join(lines) From 4f4a5e4aa9c194e702f8f506c19a2f61427546ed Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 12 Aug 2024 04:37:12 +0200 Subject: [PATCH 12/12] MongoDB: Add capability to give type hints and add transformations --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 19 +++++++--- cratedb_toolkit/io/cli.py | 5 ++- cratedb_toolkit/io/mongodb/api.py | 16 ++++++-- cratedb_toolkit/io/mongodb/cli.py | 5 +++ cratedb_toolkit/io/mongodb/core.py | 16 +++++--- cratedb_toolkit/io/mongodb/export.py | 10 ++++- cratedb_toolkit/io/mongodb/transform.py | 49 +++++++++++++++++++++++++ doc/io/mongodb/loader.md | 10 +++++ doc/io/mongodb/migr8.md | 9 +++++ examples/zyp-transformation.yaml | 20 ++++++++++ 
pyproject.toml | 2 +- 12 files changed, 145 insertions(+), 17 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/transform.py create mode 100644 examples/zyp-transformation.yaml diff --git a/CHANGES.md b/CHANGES.md index c2eea613..a9589fa6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ - MongoDB: Map OID types to CrateDB TEXT columns - MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option - MongoDB: Fix indentation in prettified SQL output of `migr8 translate` +- MongoDB: Add capability to give type hints and add transformations ## 2024/07/25 v0.0.16 - `ctk load table`: Added support for MongoDB Change Streams diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 5ea2e5fe..93a355b3 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -4,6 +4,7 @@ import logging import typing as t from abc import abstractmethod +from pathlib import Path from yarl import URL @@ -18,7 +19,7 @@ class ClusterBase(abc.ABC): @abstractmethod - def load_table(self, resource: InputOutputResource, target: TableAddress): + def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path): raise NotImplementedError("Child class needs to implement this method") @@ -35,7 +36,9 @@ class ManagedCluster(ClusterBase): def __post_init__(self): logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}") - def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None): + def load_table( + self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None + ): """ Load data into a database table on CrateDB Cloud. 
@@ -96,7 +99,7 @@ class StandaloneCluster(ClusterBase): address: DatabaseAddress info: t.Optional[ClusterInformation] = None - def load_table(self, resource: InputOutputResource, target: TableAddress): + def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None): """ Load data into a database table on a standalone CrateDB Server. @@ -109,11 +112,11 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): """ source_url = resource.url target_url = self.address.dburi + source_url_obj = URL(source_url) if source_url.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy http_scheme = "http://" - source_url_obj = URL(source_url) if asbool(source_url_obj.query.get("ssl")): http_scheme = "https://" source_url = source_url.replace("influxdb2://", http_scheme) @@ -130,7 +133,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): else: from cratedb_toolkit.io.mongodb.api import mongodb_copy - if not mongodb_copy(source_url, target_url, progress=True): + if not mongodb_copy( + source_url, + target_url, + transformation=transformation, + limit=int(source_url_obj.query.get("limit", 0)), + progress=True, + ): msg = "Data loading failed" logger.error(msg) raise OperationFailed(msg) diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 6c866e58..6d188be0 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -1,4 +1,5 @@ import logging +from pathlib import Path import click from click_aliases import ClickAliasedGroup @@ -35,6 +36,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): @click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") @click.option("--format", "format_", type=str, required=False, help="File format of the import resource") @click.option("--compression", type=str, required=False, help="Compression format of the import resource") 
+@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file") @click.pass_context def load_table( ctx: click.Context, @@ -46,6 +48,7 @@ def load_table( table: str, format_: str, compression: str, + transformation: Path, ): """ Import data into CrateDB and CrateDB Cloud clusters. @@ -82,4 +85,4 @@ def load_table( cluster = StandaloneCluster(address=address) else: raise NotImplementedError("Unable to select backend") - return cluster.load_table(resource=resource, target=target) + return cluster.load_table(resource=resource, target=target, transformation=transformation) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index cd22ad45..c4b2d202 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,5 +1,6 @@ import argparse import logging +from pathlib import Path from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.core import export, extract, translate @@ -10,7 +11,7 @@ logger = logging.getLogger(__name__) -def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False): +def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): """ Synopsis -------- @@ -36,7 +37,12 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False) # 1. Extract schema from MongoDB collection. 
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}") extract_args = argparse.Namespace( - url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full", limit=limit + url=str(mongodb_uri), + database=mongodb_database, + collection=mongodb_collection, + scan="full", + transformation=transformation, + limit=limit, ) mongodb_schema = extract(extract_args) count = mongodb_schema[mongodb_collection]["count"] @@ -65,7 +71,11 @@ def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False) f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}" ) export_args = argparse.Namespace( - url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit + url=str(mongodb_uri), + database=mongodb_database, + collection=mongodb_collection, + transformation=transformation, + limit=limit, ) buffer = export(export_args) cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) diff --git a/cratedb_toolkit/io/mongodb/cli.py b/cratedb_toolkit/io/mongodb/cli.py index 563af816..48cf27c6 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -2,11 +2,13 @@ import json import sys import typing as t +from pathlib import Path from rich.console import Console from cratedb_toolkit import __version__ from cratedb_toolkit.io.mongodb.core import export, extract, translate +from cratedb_toolkit.util.common import setup_logging console = Console(stderr=True) rich = console @@ -25,6 +27,7 @@ def extract_parser(subargs): help="Whether to fully scan the MongoDB collections or only partially.", ) parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") + parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file") parser.add_argument("-o", "--out", required=False) @@ -44,6 +47,7 @@ def 
export_parser(subargs): parser.add_argument("--port", default=27017, help="MongoDB port") parser.add_argument("--database", required=True, help="MongoDB database") parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") + parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file") def get_args(): @@ -98,6 +102,7 @@ def export_to_stdout(args): def main(): + setup_logging() args = get_args() headline_prefix = "[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter" if args.command == "extract": diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index fae1f8ad..ca0e0772 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -2,6 +2,7 @@ import logging import sys import typing as t +from collections import OrderedDict import pymongo import pymongo.database @@ -12,6 +13,7 @@ from .export import collection_to_json from .extract import extract_schema_from_collection +from .transform import TransformationManager from .translate import translate as translate_schema from .util import parse_input_numbers @@ -38,7 +40,7 @@ def gather_collections(database) -> t.List[str]: rich.print(tbl) - rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). Leave empty for using all connections.") + rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). 
Leave empty to use all collections.") sys.stderr.write("> ") collections_to_ignore = parse_input_numbers(input()) @@ -93,9 +95,12 @@ def extract(args) -> t.Dict[str, t.Any]: rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan") - schemas = {} - for collection in filtered_collections: - schemas[collection] = extract_schema_from_collection(db[collection], partial, limit=args.limit) + tm = TransformationManager(path=args.transformation) + schemas = OrderedDict() + for collection_name in filtered_collections: + collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit) + tm.apply_type_overrides(db.name, collection_name, collection_schema) + schemas[collection_name] = collection_schema return schemas @@ -119,8 +124,9 @@ def export(args) -> t.IO[bytes]: TODO: Run on multiple collections, like `extract`. """ + tm = TransformationManager(path=args.transformation) buffer = io.BytesIO() client, db = get_mongodb_client_database(args, document_class=RawBSONDocument) - collection_to_json(db[args.collection], fp=buffer, limit=args.limit) + collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit) buffer.seek(0) return buffer diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index c4ca0257..87cafc15 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -32,6 +32,7 @@ import orjson as json import pymongo.collection +from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names @@ -87,7 +88,9 @@ def convert(d): return newdict -def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0): +def collection_to_json( + collection: pymongo.collection.Collection, fp: t.IO[t.Any], tm: TransformationManager = None, limit: int = 0 +): """ Export a MongoDB collection's documents to standard JSON. 
The output is suitable to be consumed by the `cr8` program. @@ -101,5 +104,8 @@ def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any for document in collection.find().limit(limit): bson_json = bsonjs.dumps(document.raw) json_object = json.loads(bson_json) - fp.write(json.dumps(convert(json_object))) + data = convert(json_object) + if tm: + data = tm.apply_transformations(collection.database.name, collection.name, data) + fp.write(json.dumps(data)) fp.write(b"\n") diff --git a/cratedb_toolkit/io/mongodb/transform.py b/cratedb_toolkit/io/mongodb/transform.py new file mode 100644 index 00000000..84047037 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/transform.py @@ -0,0 +1,49 @@ +import logging +import typing as t +from pathlib import Path + +from jsonpointer import JsonPointer +from zyp.model.collection import CollectionAddress, CollectionTransformation +from zyp.model.project import TransformationProject + +logger = logging.getLogger(__name__) + + +class TransformationManager: + def __init__(self, path: Path): + self.path = path + self.active = False + if not self.path: + return + if not self.path.exists(): + raise FileNotFoundError(f"File does not exist: {self.path}") + self.project = TransformationProject.from_yaml(self.path.read_text()) + logger.info("Transformation manager initialized. File: %s", self.path) + self.active = True + + def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]): + if not self.active: + return + address = CollectionAddress(database_name, collection_name) + try: + transformation: CollectionTransformation = self.project.get(address) + except KeyError: + return + logger.info(f"Applying type overrides for {database_name}/{collection_name}") + # TODO: Also support addressing nested elements. + # Hint: Implementation already exists on another machine, + # where it has not been added to the repository. Sigh. 
+ for rule in transformation.schema.rules: + pointer = JsonPointer(f"/document{rule.pointer}/types") + type_stats = pointer.resolve(collection_schema) + type_stats[rule.type] = 1e10 + + def apply_transformations(self, database_name: str, collection_name: str, data: t.Dict[str, t.Any]): + if not self.active: + return data + address = CollectionAddress(database_name, collection_name) + try: + transformation: CollectionTransformation = self.project.get(address) + except KeyError: + return data + return transformation.bucket.apply(data) diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index 369dd5a2..7242687a 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -49,6 +49,13 @@ ctk shell --command "SELECT * FROM testdrive.demo;" ctk show table "testdrive.demo" ``` +## Using Zyp transformations +You can use [Zyp transformations] to change the shape of the data while being +transferred. In order to add it to the pipeline, use the `--transformation` +command line option on the `migr8 extract` and `migr8 export` commands. + +You can find an example file at `examples/zyp-transformation.yaml`. + :::{todo} Use `mongoimport`. @@ -56,3 +63,6 @@ Use `mongoimport`. mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority' ``` ::: + + +[Zyp transformations]: https://commons-codec.readthedocs.io/zyp/index.html diff --git a/doc/io/mongodb/migr8.md b/doc/io/mongodb/migr8.md index 65b19e12..377820f6 100644 --- a/doc/io/mongodb/migr8.md +++ b/doc/io/mongodb/migr8.md @@ -222,4 +222,13 @@ Alternatively, use [cr8] to directly write the MongoDB collection into a CrateDB cr8 insert-json --hosts localhost:4200 --table test +### Using Zyp transformations +You can use [Zyp transformations] to change the shape of the data while being +transferred. In order to add it to the pipeline, use the `--transformation` +command line option on the `migr8 extract` and `migr8 export` commands. 
+ +You can find an example file at `examples/zyp-transformation.yaml`. + + [cr8]: https://github.com/mfussenegger/cr8 +[Zyp transformations]: https://commons-codec.readthedocs.io/zyp/index.html diff --git a/examples/zyp-transformation.yaml b/examples/zyp-transformation.yaml new file mode 100644 index 00000000..183caff0 --- /dev/null +++ b/examples/zyp-transformation.yaml @@ -0,0 +1,20 @@ +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive-db + name: foobar-collection + schema: + rules: + - pointer: /some_date + type: DATETIME + - pointer: /another_date + type: DATETIME + bucket: + values: + rules: + - pointer: /some_date + transformer: to_unixtime + - pointer: /another_date + transformer: to_unixtime diff --git a/pyproject.toml b/pyproject.toml index 081130e6..9c8ca002 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -154,7 +154,7 @@ kinesis = [ "lorrystream @ git+https://github.com/daq-tools/lorrystream.git@55cf456fdcd3", ] mongodb = [ - "commons-codec[mongodb]==0.0.3", + "commons-codec[mongodb,zyp]==0.0.4", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",