Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

MongoDB: Various fixes and improvements #216

Merged
merged 12 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@

## Unreleased
- Processor: Updated Kinesis Lambda processor to understand AWS DMS
- MongoDB: Fix missing output on STDOUT for `migr8 export`
- MongoDB: Improve timestamp parsing by using `python-dateutil`
- MongoDB: Map `_id` input field to `id` column instead of dropping it
- MongoDB: Make user interface use stderr, so stdout is for data only
- MongoDB: Make `migr8 extract` write to stdout by default
- MongoDB: Make `migr8 translate` read from stdin by default
- MongoDB: Improve user interface messages
- MongoDB: Strip single leading underscore character from all top-level fields
- MongoDB: Map OID types to CrateDB TEXT columns
- MongoDB: Make `migr8 extract` and `migr8 export` accept the `--limit` option
- MongoDB: Fix indentation in prettified SQL output of `migr8 translate`
- MongoDB: Add capability to give type hints and add transformations

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
19 changes: 14 additions & 5 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import typing as t
from abc import abstractmethod
from pathlib import Path

from yarl import URL

Expand All @@ -18,7 +19,7 @@

class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path):
raise NotImplementedError("Child class needs to implement this method")


Expand All @@ -35,7 +36,9 @@ class ManagedCluster(ClusterBase):
def __post_init__(self):
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(self, resource: InputOutputResource, target: t.Optional[TableAddress] = None):
def load_table(
self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None
):
"""
Load data into a database table on CrateDB Cloud.

Expand Down Expand Up @@ -96,7 +99,7 @@ class StandaloneCluster(ClusterBase):
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress):
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None):
"""
Load data into a database table on a standalone CrateDB Server.

Expand All @@ -109,11 +112,11 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
"""
source_url = resource.url
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
source_url_obj = URL(source_url)
if asbool(source_url_obj.query.get("ssl")):
http_scheme = "https://"
source_url = source_url.replace("influxdb2://", http_scheme)
Expand All @@ -130,7 +133,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
if not mongodb_copy(
source_url,
target_url,
transformation=transformation,
limit=int(source_url_obj.query.get("limit", 0)),
progress=True,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
Expand Down
5 changes: 4 additions & 1 deletion cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from pathlib import Path

import click
from click_aliases import ClickAliasedGroup
Expand Down Expand Up @@ -35,6 +36,7 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.pass_context
def load_table(
ctx: click.Context,
Expand All @@ -46,6 +48,7 @@ def load_table(
table: str,
format_: str,
compression: str,
transformation: Path,
):
"""
Import data into CrateDB and CrateDB Cloud clusters.
Expand Down Expand Up @@ -82,4 +85,4 @@ def load_table(
cluster = StandaloneCluster(address=address)
else:
raise NotImplementedError("Unable to select backend")
return cluster.load_table(resource=resource, target=target)
return cluster.load_table(resource=resource, target=target, transformation=transformation)
18 changes: 15 additions & 3 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import logging
from pathlib import Path

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.core import export, extract, translate
Expand All @@ -10,7 +11,7 @@
logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, progress: bool = False):
def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
"""
Synopsis
--------
Expand All @@ -36,7 +37,12 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full"
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
scan="full",
transformation=transformation,
limit=limit,
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
Expand Down Expand Up @@ -64,7 +70,13 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
f"Transferring data from MongoDB to CrateDB: "
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection)
export_args = argparse.Namespace(
url=str(mongodb_uri),
database=mongodb_database,
collection=mongodb_collection,
transformation=transformation,
limit=limit,
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

Expand Down
48 changes: 36 additions & 12 deletions cratedb_toolkit/io/mongodb/cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import argparse
import json
import sys
import typing as t
from pathlib import Path

import rich
from rich.console import Console

from cratedb_toolkit import __version__
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.util.common import setup_logging

console = Console(stderr=True)
rich = console


def extract_parser(subargs):
Expand All @@ -19,7 +26,9 @@ def extract_parser(subargs):
choices=["full", "partial"],
help="Whether to fully scan the MongoDB collections or only partially.",
)
parser.add_argument("-o", "--out", default="mongodb_schema.json")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")
parser.add_argument("-o", "--out", required=False)


def translate_parser(subargs):
Expand All @@ -37,6 +46,8 @@ def export_parser(subargs):
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")


def get_args():
Expand All @@ -61,32 +72,45 @@ def extract_to_file(args):
"""

schema = extract(args)
rich.print(f"\nWriting resulting schema to {args.out}...")
with open(args.out, "w") as out:
json.dump(schema, out, indent=4)
rich.print("[green bold]Done![/green bold]")

out_label = args.out or "stdout"
rich.print(f"Writing resulting schema to {out_label}")
fp: t.TextIO
if args.out:
fp = open(args.out, "w")
else:
fp = sys.stdout
json.dump(schema, fp=fp, indent=4)
fp.flush()


def translate_from_file(args):
"""
Read in a JSON file and extract the schema from it.
"""

with open(args.infile) as f:
schema = json.load(f)
translate(schema)
fp: t.TextIO
if args.infile:
fp = open(args.infile, "r")
else:
fp = sys.stdin
schema = json.load(fp)
translate(schema)


def export_to_stdout(args):
export(args)
sys.stdout.buffer.write(export(args).read())


def main():
rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n")
setup_logging()
args = get_args()
headline_prefix = "[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter"
if args.command == "extract":
rich.print(f"{headline_prefix} -> Schema Extractor")
extract_to_file(args)
elif args.command == "translate":
rich.print(f"{headline_prefix} -> Schema Translator")
translate_from_file(args)
elif args.command == "export":
rich.print(f"{headline_prefix} -> Data Exporter")
export_to_stdout(args)
34 changes: 23 additions & 11 deletions cratedb_toolkit/io/mongodb/core.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import io
import logging
import sys
import typing as t
from collections import OrderedDict

import pymongo
import pymongo.database
import rich
from bson.raw_bson import RawBSONDocument
from rich.console import Console
from rich.syntax import Syntax
from rich.table import Table

from .export import collection_to_json
from .extract import extract_schema_from_collection
from .transform import TransformationManager
from .translate import translate as translate_schema
from .util import parse_input_numbers

logger = logging.getLogger(__name__)

console = Console(stderr=True)
rich = console


def gather_collections(database) -> t.List[str]:
"""
Expand All @@ -23,7 +30,7 @@ def gather_collections(database) -> t.List[str]:

collections = database.list_collection_names()

tbl = rich.table.Table(show_header=True, header_style="bold blue")
tbl = Table(show_header=True, header_style="bold blue")
tbl.add_column("Id", width=3)
tbl.add_column("Collection Name")
tbl.add_column("Estimated Size")
Expand All @@ -33,9 +40,10 @@ def gather_collections(database) -> t.List[str]:

rich.print(tbl)

rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2')")
rich.print("\nCollections to exclude: (eg: '0 1 2', '0, 1, 2', '0-2'). Leave empty to use all collections.")

collections_to_ignore = parse_input_numbers(input("> "))
sys.stderr.write("> ")
collections_to_ignore = parse_input_numbers(input())
filtered_collections = []
for i, c in enumerate(collections):
if i not in collections_to_ignore:
Expand Down Expand Up @@ -80,15 +88,19 @@ def extract(args) -> t.Dict[str, t.Any]:
else:
rich.print("\nDo a [red bold]full[/red bold] collection scan?")
rich.print("A full scan will iterate over all documents in the collection, a partial only one document. (Y/n)")
full = input("> ").strip().lower()
sys.stderr.write("> ")
full = input().strip().lower()

partial = full != "y"

rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan...")
rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan")

schemas = {}
for collection in filtered_collections:
schemas[collection] = extract_schema_from_collection(db[collection], partial)
tm = TransformationManager(path=args.transformation)
schemas = OrderedDict()
for collection_name in filtered_collections:
collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit)
tm.apply_type_overrides(db.name, collection_name, collection_schema)
schemas[collection_name] = collection_schema
return schemas


Expand All @@ -103,7 +115,6 @@ def translate(schemas, schemaname: str = None) -> t.Dict[str, str]:
syntax = Syntax(query, "sql")
rich.print(f"Collection [blue bold]'{collection}'[/blue bold]:")
rich.print(syntax)
rich.print()
return result


Expand All @@ -113,8 +124,9 @@ def export(args) -> t.IO[bytes]:

TODO: Run on multiple collections, like `extract`.
"""
tm = TransformationManager(path=args.transformation)
buffer = io.BytesIO()
client, db = get_mongodb_client_database(args, document_class=RawBSONDocument)
collection_to_json(db[args.collection], file=buffer)
collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit)
buffer.seek(0)
return buffer
Loading
Loading