diff --git a/docs/integrations/databases/CrateDB.mdx b/docs/integrations/databases/CrateDB.mdx
new file mode 100644
index 000000000000..6c08ac2ba03b
--- /dev/null
+++ b/docs/integrations/databases/CrateDB.mdx
@@ -0,0 +1,57 @@
+---
+title: "CrateDB"
+sidebarTitle: "CrateDB"
+---
+
+## Credentials
+
+Open the file named `io_config.yaml` at the root of your Mage project and enter the required CrateDB fields:
+
+```yaml
+version: 0.1.1
+default:
+  CRATEDB_CONNECT_TIMEOUT: 10
+  CRATEDB_DBNAME: crate
+  CRATEDB_SCHEMA: doc # Optional
+  CRATEDB_USER: username
+  CRATEDB_PASSWORD: password
+  CRATEDB_HOST: hostname
+  CRATEDB_PORT: 5432
+```
+
+## Dependencies
+
+The dependency libraries are not installed in the Docker image by default. You'll need to add them to the
+project's `requirements.txt` file manually and install them:
+
+```
+psycopg2==2.9.3
+psycopg2-binary==2.9.3
+sshtunnel==0.4.0
+```
+
+## Using Python block
+
+1. Create a new pipeline or open an existing pipeline.
+2. Add a data loader, transformer, or data exporter block (the code snippet
+   below is for a data loader).
+3. Select `Generic (no template)`.
+4. Enter this code snippet (note: change the `config_profile` from `default` if
+   you have a different profile):
+
+```python
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.cratedb import CrateDB
+from os import path
+from pandas import DataFrame
+
+if 'data_loader' not in globals():
+    from mage_ai.data_preparation.decorators import data_loader
+
+
+@data_loader
+def load_data_from_cratedb(**kwargs) -> DataFrame:
+    query = 'SELECT 1'
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with CrateDB.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        return loader.load(query)
+```
+
+5. Run the block.
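+
+To export a DataFrame, a data exporter block follows the same pattern. The sketch below mirrors
+the `data_exporters/cratedb.py` template shipped with this integration; `your_schema_name` and
+`your_table_name` are placeholders:
+
+```python
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.cratedb import CrateDB
+from os import path
+from pandas import DataFrame
+
+if 'data_exporter' not in globals():
+    from mage_ai.data_preparation.decorators import data_exporter
+
+
+@data_exporter
+def export_data_to_cratedb(df: DataFrame, **kwargs) -> None:
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+
+    with CrateDB.with_config(ConfigFileLoader(config_path, 'default')) as loader:
+        loader.export(
+            df,
+            'your_schema_name',
+            'your_table_name',
+            index=False,  # Do not write the DataFrame index as a column
+            if_exists='replace',  # Replace the table if it already exists
+        )
+```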
diff --git a/docs/mint.json b/docs/mint.json
index 69e28ae82d1c..fc5d7e0685ed 100644
--- a/docs/mint.json
+++ b/docs/mint.json
@@ -359,6 +359,7 @@
         "integrations/databases/BigQuery",
         "integrations/databases/ClickHouse",
         "integrations/databases/Chroma",
+        "integrations/databases/CrateDB",
         "integrations/databases/Druid",
         "integrations/databases/DuckDB",
         "integrations/databases/GoogleSheets",
diff --git a/mage_ai/data_preparation/templates/constants.py b/mage_ai/data_preparation/templates/constants.py
index d104d423580f..5b889441a76f 100644
--- a/mage_ai/data_preparation/templates/constants.py
+++ b/mage_ai/data_preparation/templates/constants.py
@@ -206,6 +206,13 @@
         name='Chroma',
         path='data_loaders/chroma.py',
     ),
+    dict(
+        block_type=BlockType.DATA_LOADER,
+        groups=[GROUP_DATABASES],
+        language=BlockLanguage.PYTHON,
+        name='CrateDB',
+        path='data_loaders/cratedb.py',
+    ),
     dict(
         block_type=BlockType.DATA_LOADER,
         groups=[GROUP_DATABASES],
@@ -594,6 +601,13 @@
         name='Chroma',
         path='data_exporters/chroma.py',
     ),
+    dict(
+        block_type=BlockType.DATA_EXPORTER,
+        groups=[GROUP_DATABASES],
+        language=BlockLanguage.PYTHON,
+        name='CrateDB',
+        path='data_exporters/cratedb.py',
+    ),
     dict(
         block_type=BlockType.DATA_EXPORTER,
         groups=[GROUP_DATABASES],
diff --git a/mage_ai/data_preparation/templates/data_exporters/cratedb.py b/mage_ai/data_preparation/templates/data_exporters/cratedb.py
new file mode 100644
index 000000000000..1878a2d8aef5
--- /dev/null
+++ b/mage_ai/data_preparation/templates/data_exporters/cratedb.py
@@ -0,0 +1,31 @@
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.cratedb import CrateDB
+from pandas import DataFrame
+from os import path
+
+if 'data_exporter' not in globals():
+    from mage_ai.data_preparation.decorators import data_exporter
+
+
+@data_exporter
+def export_data_to_cratedb(df: DataFrame, **kwargs) -> None:
+    """
+    Template for exporting data to a CrateDB database.
+    Specify your configuration settings in 'io_config.yaml'.
+
+    Docs: https://docs.mage.ai/design/data-loading
+    """
+    schema_name = 'your_schema_name'  # Specify the name of the schema to export data to
+    table_name = 'your_table_name'  # Specify the name of the table to export data to
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with CrateDB.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        loader.export(
+            df,
+            schema_name,
+            table_name,
+            index=False,  # Specifies whether to include index in exported table
+            if_exists='replace',  # Specify resolution policy if table name already exists
+        )
diff --git a/mage_ai/data_preparation/templates/data_loaders/cratedb.py b/mage_ai/data_preparation/templates/data_loaders/cratedb.py
new file mode 100644
index 000000000000..19c39fbbf13c
--- /dev/null
+++ b/mage_ai/data_preparation/templates/data_loaders/cratedb.py
@@ -0,0 +1,26 @@
+{% extends "data_loaders/default.jinja" %}
+{% block imports %}
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.cratedb import CrateDB
+from os import path
+{{ super() -}}
+{% endblock %}
+
+
+{% block content %}
+@data_loader
+def load_data_from_cratedb(*args, **kwargs):
+    """
+    Template for loading data from a CrateDB database.
+    Specify your configuration settings in 'io_config.yaml'.
+
+    Docs: https://docs.mage.ai/design/data-loading
+    """
+    query = 'your CrateDB query'  # Specify your SQL query here
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with CrateDB.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        return loader.load(query)
+{% endblock %}
diff --git a/mage_ai/data_preparation/templates/repo/io_config.yaml b/mage_ai/data_preparation/templates/repo/io_config.yaml
index 80b4d9cef70a..0cd27a9acdf3 100644
--- a/mage_ai/data_preparation/templates/repo/io_config.yaml
+++ b/mage_ai/data_preparation/templates/repo/io_config.yaml
@@ -26,6 +26,14 @@ default:
   CLICKHOUSE_PASSWORD: null
   CLICKHOUSE_PORT: 8123
   CLICKHOUSE_USERNAME: null
+  # CrateDB
+  CRATEDB_CONNECT_TIMEOUT: 10
+  CRATEDB_DBNAME: crate
+  CRATEDB_SCHEMA: doc # Optional
+  CRATEDB_USER: username
+  CRATEDB_PASSWORD: password
+  CRATEDB_HOST: hostname
+  CRATEDB_PORT: 5432
   # Druid
   DRUID_HOST: hostname
   DRUID_PASSWORD: password
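The template profile above covers only direct connections. The loader also reads SSH tunnel settings (the `CRATEDB_CONNECTION_METHOD` and `CRATEDB_SSH_*` keys registered in `mage_ai/io/config.py` below). A hypothetical flat profile using them might look like this; every value is a placeholder:

```yaml
ssh_tunnel_profile:
  CRATEDB_CONNECTION_METHOD: ssh_tunnel  # Defaults to 'direct'
  CRATEDB_DBNAME: crate
  CRATEDB_USER: username
  CRATEDB_PASSWORD: password
  CRATEDB_HOST: hostname
  CRATEDB_PORT: 5432
  CRATEDB_SSH_HOST: bastion_hostname
  CRATEDB_SSH_PORT: 22
  CRATEDB_SSH_USERNAME: ssh_username
  CRATEDB_SSH_PASSWORD: ssh_password  # Or CRATEDB_SSH_PKEY: /path/to/private_key
```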
diff --git a/mage_ai/io/base.py b/mage_ai/io/base.py
index 49156fbc068b..7ea8343fdc67 100644
--- a/mage_ai/io/base.py
+++ b/mage_ai/io/base.py
@@ -19,6 +19,7 @@ class DataSource(str, Enum):
     BIGQUERY = 'bigquery'
     CHROMA = 'chroma'
     CLICKHOUSE = 'clickhouse'
+    CRATEDB = 'cratedb'
     DRUID = 'druid'
     DUCKDB = 'duckdb'
     FILE = 'file'
diff --git a/mage_ai/io/config.py b/mage_ai/io/config.py
index 50b14fb22cb5..5bd56a76ff85 100644
--- a/mage_ai/io/config.py
+++ b/mage_ai/io/config.py
@@ -41,6 +41,20 @@ class ConfigKey(str, Enum):
     CLICKHOUSE_PORT = 'CLICKHOUSE_PORT'
     CLICKHOUSE_USERNAME = 'CLICKHOUSE_USERNAME'

+    CRATEDB_CONNECTION_METHOD = 'CRATEDB_CONNECTION_METHOD'
+    CRATEDB_CONNECT_TIMEOUT = 'CRATEDB_CONNECT_TIMEOUT'
+    CRATEDB_DBNAME = 'CRATEDB_DBNAME'
+    CRATEDB_HOST = 'CRATEDB_HOST'
+    CRATEDB_PASSWORD = 'CRATEDB_PASSWORD'
+    CRATEDB_PORT = 'CRATEDB_PORT'
+    CRATEDB_SCHEMA = 'CRATEDB_SCHEMA'
+    CRATEDB_SSH_HOST = 'CRATEDB_SSH_HOST'
+    CRATEDB_SSH_PASSWORD = 'CRATEDB_SSH_PASSWORD'
+    CRATEDB_SSH_PKEY = 'CRATEDB_SSH_PKEY'
+    CRATEDB_SSH_PORT = 'CRATEDB_SSH_PORT'
+    CRATEDB_SSH_USERNAME = 'CRATEDB_SSH_USERNAME'
+    CRATEDB_USER = 'CRATEDB_USER'
+
     DRUID_HOST = 'DRUID_HOST'
     DRUID_PASSWORD = 'DRUID_PASSWORD'
     DRUID_PATH = 'DRUID_PATH'
@@ -343,6 +357,7 @@ class VerboseConfigKey(str, Enum):
     BIGQUERY = 'BigQuery'
     CHROMA = 'Chroma'
     CLICKHOUSE = 'ClickHouse'
+    CRATEDB = 'CrateDB'
     DRUID = 'Druid'
     DUCKDB = 'Duck DB'
     PINOT = 'Pinot'
@@ -413,6 +428,12 @@ class ConfigFileLoader(BaseConfigLoader):
             VerboseConfigKey.CLICKHOUSE, 'port'),
         ConfigKey.CLICKHOUSE_USERNAME: (
             VerboseConfigKey.CLICKHOUSE, 'username'),
+        ConfigKey.CRATEDB_DBNAME: (VerboseConfigKey.CRATEDB, 'database'),
+        ConfigKey.CRATEDB_HOST: (VerboseConfigKey.CRATEDB, 'host'),
+        ConfigKey.CRATEDB_PASSWORD: (VerboseConfigKey.CRATEDB, 'password'),
+        ConfigKey.CRATEDB_PORT: (VerboseConfigKey.CRATEDB, 'port'),
+        ConfigKey.CRATEDB_SCHEMA: (VerboseConfigKey.CRATEDB, 'schema'),
+        ConfigKey.CRATEDB_USER: (VerboseConfigKey.CRATEDB, 'user'),
         ConfigKey.DRUID_HOST: (VerboseConfigKey.DRUID, 'host'),
         ConfigKey.DRUID_PASSWORD: (VerboseConfigKey.DRUID, 'password'),
         ConfigKey.DRUID_PATH: (VerboseConfigKey.DRUID, 'path'),
diff --git a/mage_ai/io/cratedb.py b/mage_ai/io/cratedb.py
new file mode 100644
index 000000000000..b3b914efedc5
--- /dev/null
+++ b/mage_ai/io/cratedb.py
@@ -0,0 +1,389 @@
+import traceback
+from typing import IO, List, Union
+
+import numpy as np
+import pandas as pd
+import simplejson
+from pandas import DataFrame, Series
+from psycopg2 import _psycopg, connect
+from sshtunnel import SSHTunnelForwarder
+
+from mage_ai.io.config import BaseConfigLoader, ConfigKey
+from mage_ai.io.constants import UNIQUE_CONFLICT_METHOD_UPDATE
+from mage_ai.io.export_utils import BadConversionError, PandasTypes
+from mage_ai.io.sql import BaseSQL
+from mage_ai.shared.parsers import encode_complex
+from mage_ai.shared.utils import is_port_in_use
+
+
+class CrateDB(BaseSQL):
+    """
+    Handles data transfer between a CrateDB database and the Mage app.
+    """
+    def __init__(
+        self,
+        dbname: str,
+        user: str,
+        password: str,
+        host: str,
+        port: Union[str, None] = None,
+        schema: str = None,
+        connection_method: str = 'direct',
+        ssh_host: Union[str, None] = None,
+        ssh_port: Union[str, None] = None,
+        ssh_username: Union[str, None] = None,
+        ssh_password: Union[str, None] = None,
+        ssh_pkey: Union[str, None] = None,
+        verbose=True,
+        connect_timeout: int = None,
+        **kwargs,
+    ) -> None:
+        """
+        Initializes the data loader.
+
+        Args:
+            dbname (str): The name of the database to connect to.
+            user (str): The user with which to connect to the database.
+            password (str): The login password for the user.
+            host (str): Host address for the database.
+            port (str): Port on which the database is running.
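+            schema (str): Schema to use by default, if any.
+            connection_method (str): Either 'direct' or 'ssh_tunnel'.
+            ssh_host (str): Host of the bastion used for the SSH tunnel.
+            ssh_port (str): Port of the bastion used for the SSH tunnel.
+            ssh_username (str): Username used to log in to the bastion.
+            ssh_password (str): Password used to log in to the bastion.
+            ssh_pkey (str): Path to the private key used to log in to the bastion.
+            connect_timeout (int): Connection timeout in seconds.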
+            **kwargs: Additional settings for creating SQLAlchemy engine and connection
+        """
+        self.ssh_tunnel = None
+        super().__init__(
+            verbose=verbose,
+            dbname=dbname,
+            user=user,
+            password=password,
+            host=host,
+            port=port,
+            schema=schema,
+            connection_method=connection_method,
+            ssh_host=ssh_host,
+            ssh_port=ssh_port,
+            ssh_username=ssh_username,
+            ssh_password=ssh_password,
+            ssh_pkey=ssh_pkey,
+            connect_timeout=connect_timeout,
+            **kwargs,
+        )
+
+    @classmethod
+    def with_config(cls, config: BaseConfigLoader) -> 'CrateDB':
+        return cls(
+            dbname=config[ConfigKey.CRATEDB_DBNAME],
+            user=config[ConfigKey.CRATEDB_USER],
+            password=config[ConfigKey.CRATEDB_PASSWORD],
+            host=config[ConfigKey.CRATEDB_HOST],
+            port=config[ConfigKey.CRATEDB_PORT],
+            schema=config[ConfigKey.CRATEDB_SCHEMA],
+            connection_method=config[ConfigKey.CRATEDB_CONNECTION_METHOD],
+            ssh_host=config[ConfigKey.CRATEDB_SSH_HOST],
+            ssh_port=config[ConfigKey.CRATEDB_SSH_PORT],
+            ssh_username=config[ConfigKey.CRATEDB_SSH_USERNAME],
+            ssh_password=config[ConfigKey.CRATEDB_SSH_PASSWORD],
+            ssh_pkey=config[ConfigKey.CRATEDB_SSH_PKEY],
+            connect_timeout=config[ConfigKey.CRATEDB_CONNECT_TIMEOUT],
+        )
+
+    def default_database(self) -> str:
+        return self.settings['dbname']
+
+    def default_schema(self) -> str:
+        return self.settings.get('schema')
+
+    def open(self) -> None:
+        with self.printer.print_msg('Opening connection to CrateDB database'):
+            database = self.settings['dbname']
+            host = self.settings['host']
+            password = self.settings['password']
+            port = self.settings['port']
+            user = self.settings['user']
+            if self.settings['connection_method'] == 'ssh_tunnel':
+                ssh_setting = dict(ssh_username=self.settings['ssh_username'])
+                if self.settings['ssh_pkey'] is not None:
+                    ssh_setting['ssh_pkey'] = self.settings['ssh_pkey']
+                else:
+                    ssh_setting['ssh_password'] = self.settings['ssh_password']
+
+                # Find an available local port
+                local_port = port
+                max_local_port = local_port + 100
+                while is_port_in_use(local_port):
+                    if local_port > max_local_port:
+                        raise Exception(
+                            'Unable to find an open port, please clear your running processes '
+                            'if possible.'
+                        )
+                    local_port += 1
+                self.ssh_tunnel = SSHTunnelForwarder(
+                    (self.settings['ssh_host'], self.settings['ssh_port']),
+                    remote_bind_address=(host, port),
+                    local_bind_address=('', local_port),
+                    **ssh_setting,
+                )
+                self.ssh_tunnel.start()
+                self.ssh_tunnel._check_is_started()
+
+                host = '127.0.0.1'
+                port = self.ssh_tunnel.local_bind_port
+
+            connect_opts = dict(
+                database=database,
+                host=host,
+                password=password,
+                port=port,
+                user=user,
+                keepalives=1,
+                keepalives_idle=300,
+            )
+
+            if self.settings.get('connect_timeout'):
+                connect_opts['connect_timeout'] = self.settings['connect_timeout']
+
+            try:
+                self._ctx = connect(**connect_opts)
+            except Exception:
+                if self.ssh_tunnel is not None:
+                    self.ssh_tunnel.stop()
+                    self.ssh_tunnel = None
+                traceback.print_exc()
+
+    def close(self) -> None:
+        """
+        Closes the underlying connection to the SQL data source if it is open;
+        otherwise does nothing.
+        """
+        if '_ctx' in self.__dict__:
+            self._ctx.close()
+            del self._ctx
+        if self.ssh_tunnel is not None:
+            self.ssh_tunnel.stop()
+            self.ssh_tunnel = None
+        if self.verbose and self.printer.exists_previous_message:
+            print('')
+
+    def build_create_schema_command(
+        self,
+        schema_name: str
+    ) -> str:
+        # CrateDB creates schemas implicitly, so no CREATE SCHEMA statement is needed
+        return 'SELECT 1;'
+
+    def table_exists(self, schema_name: str, table_name: str) -> bool:
+        with self.conn.cursor() as cur:
+            table_name = table_name.replace('"', '')
+            cur.execute(
+                f'SELECT * FROM pg_tables WHERE schemaname = \'{schema_name}\' AND '
+                f'tablename = \'{table_name}\''
+            )
+            return bool(cur.rowcount)
+
+    def get_type(self, column: Series, dtype: str) -> str:
+        if dtype in (
+            PandasTypes.MIXED,
+            PandasTypes.UNKNOWN_ARRAY,
+            PandasTypes.COMPLEX,
+        ):
+            series = column[column.notnull()]
+            values = series.values
+
+            column_type = None
+
+            if len(values) >= 1:
+                column_type = 'JSONB'
+
+                values_not_empty_list = [v for v in values if type(v) is not list or v]
+                if not values_not_empty_list:
+                    # All values are empty lists
+                    return column_type
+                value = values_not_empty_list[0]
+                if isinstance(value, list):
+                    if len(value) >= 1:
+                        item = value[0]
+                        if type(item) is dict:
+                            column_type = 'JSONB'
+                        else:
+                            item_series = pd.Series(data=item)
+                            item_dtype = item_series.dtype
+                            if PandasTypes.OBJECT != item_dtype:
+                                item_type = self.get_type(item_series, item_dtype)
+                                column_type = f'{item_type}[]'
+                            else:
+                                column_type = 'text[]'
+                    else:
+                        column_type = 'text[]'
+
+            if column_type:
+                return column_type
+
+            raise BadConversionError(
+                f'Cannot convert column \'{column.name}\' with data type \'{dtype}\' to '
+                'a CrateDB datatype.'
+            )
+        elif dtype in (PandasTypes.DATETIME, PandasTypes.DATETIME64):
+            try:
+                if column.dt.tz:
+                    return 'timestamptz'
+            except AttributeError:
+                pass
+            return 'timestamp'
+        elif dtype == PandasTypes.TIME:
+            try:
+                if column.dt.tz:
+                    return 'timetz'
+            except AttributeError:
+                pass
+            return 'time'
+        elif dtype == PandasTypes.DATE:
+            return 'date'
+        elif dtype == PandasTypes.STRING:
+            return 'text'
+        elif dtype == PandasTypes.CATEGORICAL:
+            return 'text'
+        elif dtype == PandasTypes.BYTES:
+            return 'bytea'
+        elif dtype in (PandasTypes.FLOATING, PandasTypes.DECIMAL, PandasTypes.MIXED_INTEGER_FLOAT):
+            return 'double precision'
+        elif dtype == PandasTypes.INTEGER or dtype == PandasTypes.INT64:
+            max_int, min_int = column.max(), column.min()
+            if np.int16(max_int) == max_int and np.int16(min_int) == min_int:
+                return 'smallint'
+            elif np.int32(max_int) == max_int and np.int32(min_int) == min_int:
+                return 'integer'
+            else:
+                return 'bigint'
+        elif dtype == PandasTypes.BOOLEAN:
+            return 'boolean'
+        elif dtype in (PandasTypes.TIMEDELTA, PandasTypes.TIMEDELTA64, PandasTypes.PERIOD):
+            return 'bigint'
+        elif dtype == PandasTypes.EMPTY:
+            return 'text'
+        elif PandasTypes.OBJECT == dtype:
+            return 'JSONB'
+        else:
+            print(f'Invalid datatype provided: {dtype}')
+
+        return 'text'
+ """ + if '_ctx' in self.__dict__: + self._ctx.close() + del self._ctx + if self.ssh_tunnel is not None: + self.ssh_tunnel.stop() + self.ssh_tunnel = None + if self.verbose and self.printer.exists_previous_message: + print('') + + def build_create_schema_command( + self, + schema_name: str + ) -> str: + # No schema creation needed + return "SELECT 1;" + + def table_exists(self, schema_name: str, table_name: str) -> bool: + with self.conn.cursor() as cur: + table_name = table_name.replace('"', '') + cur.execute( + f'SELECT * FROM pg_tables WHERE schemaname = \'{schema_name}\' AND ' + f'tablename = \'{table_name}\'' + ) + return bool(cur.rowcount) + + def get_type(self, column: Series, dtype: str) -> str: + if dtype in ( + PandasTypes.MIXED, + PandasTypes.UNKNOWN_ARRAY, + PandasTypes.COMPLEX, + ): + series = column[column.notnull()] + values = series.values + + column_type = None + + if len(values) >= 1: + column_type = 'JSONB' + + values_not_empty_list = [v for v in values if type(v) is not list or v] + if not values_not_empty_list: + # All values are empty list + return column_type + value = values_not_empty_list[0] + if isinstance(value, list): + if len(value) >= 1: + item = value[0] + if type(item) is dict: + column_type = 'JSONB' + else: + item_series = pd.Series(data=item) + item_dtype = item_series.dtype + if PandasTypes.OBJECT != item_dtype: + item_type = self.get_type(item_series, item_dtype) + column_type = f'{item_type}[]' + else: + column_type = 'text[]' + else: + column_type = 'text[]' + + if column_type: + return column_type + + raise BadConversionError( + f'Cannot convert column \'{column.name}\' with data type \'{dtype}\' to ' + 'a CrateDB datatype.' + ) + elif dtype in (PandasTypes.DATETIME, PandasTypes.DATETIME64): + try: + if column.dt.tz: + return 'timestamptz' + except AttributeError: + pass + return 'timestamp' + elif dtype == PandasTypes.TIME: + try: + if column.dt.tz: + return 'timetz' + except AttributeError: + pass + return 'time' + elif dtype == PandasTypes.DATE: + return 'date' + elif dtype == PandasTypes.STRING: + return 'text' + elif dtype == PandasTypes.CATEGORICAL: + return 'text' + elif dtype == PandasTypes.BYTES: + return 'bytea' + elif dtype in (PandasTypes.FLOATING, PandasTypes.DECIMAL, PandasTypes.MIXED_INTEGER_FLOAT): + return 'double precision' + elif dtype == PandasTypes.INTEGER or dtype == PandasTypes.INT64: + max_int, min_int = column.max(), column.min() + if np.int16(max_int) == max_int and np.int16(min_int) == min_int: + return 'smallint' + elif np.int32(max_int) == max_int and np.int32(min_int) == min_int: + return 'integer' + else: + return 'bigint' + elif dtype == PandasTypes.BOOLEAN: + return 'boolean' + elif dtype in (PandasTypes.TIMEDELTA, PandasTypes.TIMEDELTA64, PandasTypes.PERIOD): + return 'bigint' + elif dtype == PandasTypes.EMPTY: + return 'text' + elif PandasTypes.OBJECT == dtype: + return 'JSONB' + else: + print(f'Invalid datatype provided: {dtype}') + + return 'text' + + def upload_dataframe( + self, + cursor: _psycopg.cursor, + df: DataFrame, + db_dtypes: List[str], + dtypes: List[str], + full_table_name: str, + buffer: Union[IO, None] = None, + allow_reserved_words: bool = False, + unique_conflict_method: str = None, + unique_constraints: List[str] = None, + **kwargs, + ) -> None: + if unique_constraints and unique_conflict_method: + use_insert_command = True + else: + # Use COPY command + use_insert_command = False + + # force USE INSERT + use_insert_command = True + + def clean_array_value(val): + if val is None or type(val) is 
diff --git a/setup.py b/setup.py
index abe8f9345f6e..07d4024806cb 100644
--- a/setup.py
+++ b/setup.py
@@ -63,6 +63,11 @@ def readme():
         'clickhouse': [
             'clickhouse-connect~=0.6.23',
         ],
+        'cratedb': [
+            'psycopg2==2.9.3',
+            'psycopg2-binary==2.9.3',
+            'sshtunnel==0.4.0',
+        ],
         'dbt': [
             'dbt-bigquery==1.7.2',
             'dbt-clickhouse==1.7.1',
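Since `setup.py` now registers a `cratedb` extra, the same dependencies should also be installable through the package extra, presumably `pip install "mage-ai[cratedb]"` (the `mage-ai` distribution name is assumed here), as an alternative to pinning them in `requirements.txt`.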