From 907ec78daa03dfcc8914fe81d35f530708d799a7 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Thu, 18 Apr 2024 14:43:44 +0200 Subject: [PATCH 1/7] Add postgresql migration --- python/equality_py.py | 5 +++ python/migrate.py | 76 +++++++++++++++++++++++++++++++++++ python/requirements.txt | 1 + python/requirements_no_ml.txt | 1 + 4 files changed, 83 insertions(+) create mode 100644 python/equality_py.py diff --git a/python/equality_py.py b/python/equality_py.py new file mode 100644 index 000000000..6b35e03c1 --- /dev/null +++ b/python/equality_py.py @@ -0,0 +1,5 @@ +import mgp + +@mgp.function +def is_equal(ctx: mgp.FuncCtx, output: str, input: str): + return output == input diff --git a/python/migrate.py b/python/migrate.py index 295239817..6551c28e9 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -3,6 +3,7 @@ import mysql.connector as mysql_connector import oracledb import pyodbc +import psycopg2 import threading from typing import Any, Dict @@ -264,6 +265,81 @@ def cleanup_migrate_oracle_db(): mgp.add_batch_read_proc(oracle_db, init_migrate_oracle_db, cleanup_migrate_oracle_db) +# PostgreSQL dictionary to store connections and cursors by thread +postgres_dict = {} + +def init_migrate_postgresql( + table_or_sql: str, + config: mgp.Map, + config_path: str = "", + params: mgp.Nullable[mgp.Any] = None, +): + global postgres_dict + + if params: + _check_params_type(params, (list, tuple)) + else: + params = [] + + if len(config_path) > 0: + config = _combine_config(config=config, config_path=config_path) + + if _query_is_table(table_or_sql): + table_or_sql = f"SELECT * FROM {table_or_sql};" + + if threading.get_native_id not in postgres_dict: + postgres_dict[threading.get_native_id] = {} + + if Constants.CURSOR not in postgres_dict[threading.get_native_id]: + postgres_dict[threading.get_native_id][Constants.CURSOR] = None + + if postgres_dict[threading.get_native_id][Constants.CURSOR] is None: + connection = psycopg2.connect(**config) + cursor = 
connection.cursor() + cursor.execute(table_or_sql, params) + + postgres_dict[threading.get_native_id][Constants.CONNECTION] = connection + postgres_dict[threading.get_native_id][Constants.CURSOR] = cursor + postgres_dict[threading.get_native_id][Constants.COLUMN_NAMES] = [ + column.name for column in cursor.description + ] + +def postgresql( + table_or_sql: str, + config: mgp.Map, + config_path: str = "", + params: mgp.Nullable[mgp.Any] = None, +) -> mgp.Record(row=mgp.Map): + """ + With migrate.postgresql you can access PostgreSQL and execute queries. The result table is converted into a stream, + and returned rows can be used to create graph structures. Config must be at least an empty map. + If config_path is passed, every key-value pair from the JSON file will overwrite the value stored under the same key in config. + + :param table_or_sql: Table name or an SQL query + :param config: Connection configuration parameters (as in psycopg2.connect) + :param config_path: Path to the JSON file containing configuration parameters (as in psycopg2.connect) + :param params: Optionally, queries may be parameterized. 
In that case, `params` provides parameter values + :return: The result table as a stream of rows + """ + global postgres_dict + cursor = postgres_dict[threading.get_native_id][Constants.CURSOR] + column_names = postgres_dict[threading.get_native_id][Constants.COLUMN_NAMES] + + rows = cursor.fetchmany(Constants.BATCH_SIZE) + + return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows] + +def cleanup_migrate_postgresql(): + global postgres_dict + postgres_dict[threading.get_native_id][Constants.CURSOR] = None + postgres_dict[threading.get_native_id][Constants.CONNECTION].close() + postgres_dict[threading.get_native_id][Constants.CONNECTION].commit() + postgres_dict[threading.get_native_id][Constants.CONNECTION] = None + postgres_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None + +mgp.add_batch_read_proc(postgresql, init_migrate_postgresql, cleanup_migrate_postgresql) + + def _query_is_table(table_or_sql: str) -> bool: return len(table_or_sql.split()) == 1 diff --git a/python/requirements.txt b/python/requirements.txt index 45cc459da..993571afa 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -12,5 +12,6 @@ gqlalchemy==1.4.1 mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 +psycopg2==2.9.2 defusedxml==0.7.1 scipy==1.12.0 diff --git a/python/requirements_no_ml.txt b/python/requirements_no_ml.txt index 607ba4b2b..7b32f500e 100644 --- a/python/requirements_no_ml.txt +++ b/python/requirements_no_ml.txt @@ -10,5 +10,6 @@ gqlalchemy==1.4.1 mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 +psycopg2==2.9.2 defusedxml==0.7.1 scipy==1.12.0 From 3fb0b57941acb0afc9adcb85ce1216e0d25aa359 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Thu, 18 Apr 2024 14:46:43 +0200 Subject: [PATCH 2/7] Remove equality --- python/equality_py.py | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 python/equality_py.py diff --git a/python/equality_py.py b/python/equality_py.py deleted file mode 100644 index 
6b35e03c1..000000000 --- a/python/equality_py.py +++ /dev/null @@ -1,5 +0,0 @@ -import mgp - -@mgp.function -def is_equal(ctx: mgp.FuncCtx, output: str, input: str): - return output == input From ed1dfca93d3df1265e3853cc0f0b3be0e47b8db1 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Thu, 18 Apr 2024 15:05:23 +0200 Subject: [PATCH 3/7] Update requirements --- python/requirements.txt | 2 +- python/requirements_no_ml.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/requirements.txt b/python/requirements.txt index 993571afa..c64bae23c 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -12,6 +12,6 @@ gqlalchemy==1.4.1 mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 -psycopg2==2.9.2 +psycopg2-binary==2.9.9 defusedxml==0.7.1 scipy==1.12.0 diff --git a/python/requirements_no_ml.txt b/python/requirements_no_ml.txt index 7b32f500e..67a1686f6 100644 --- a/python/requirements_no_ml.txt +++ b/python/requirements_no_ml.txt @@ -10,6 +10,6 @@ gqlalchemy==1.4.1 mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 -psycopg2==2.9.2 +psycopg2-binary==2.9.9 defusedxml==0.7.1 scipy==1.12.0 From 19e71384d510698603e04c844860afcbed861ee5 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Mon, 22 Apr 2024 15:21:04 +0200 Subject: [PATCH 4/7] Format --- python/migrate.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/migrate.py b/python/migrate.py index 6551c28e9..f2884bc39 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -268,6 +268,7 @@ def cleanup_migrate_oracle_db(): # PostgreSQL dictionary to store connections and cursors by thread postgres_dict = {} + def init_migrate_postgresql( table_or_sql: str, config: mgp.Map, @@ -304,6 +305,7 @@ def init_migrate_postgresql( column.name for column in cursor.description ] + def postgresql( table_or_sql: str, config: mgp.Map, @@ -329,6 +331,7 @@ def postgresql( return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows] + def 
cleanup_migrate_postgresql(): global postgres_dict postgres_dict[threading.get_native_id][Constants.CURSOR] = None @@ -337,6 +340,7 @@ def cleanup_migrate_postgresql(): postgres_dict[threading.get_native_id][Constants.CONNECTION] = None postgres_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None + mgp.add_batch_read_proc(postgresql, init_migrate_postgresql, cleanup_migrate_postgresql) From a90265d2e3119b41231df9b8d3a723a7c884fd35 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Mon, 29 Apr 2024 11:44:52 +0200 Subject: [PATCH 5/7] Add support for Decimal values --- python/migrate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/migrate.py b/python/migrate.py index f2884bc39..a925acec2 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -1,3 +1,4 @@ +from decimal import Decimal import json import mgp import mysql.connector as mysql_connector @@ -364,9 +365,8 @@ def _combine_config(config: mgp.Map, config_path: str) -> Dict[str, Any]: config[key] = value return config - def _name_row_cells(row_cells, column_names) -> Dict[str, Any]: - return dict(map(lambda column, value: (column, value), column_names, row_cells)) + return {column: (value if not isinstance(value, Decimal) else float(value)) for column, value in zip(column_names, row_cells)} def _check_params_type(params: Any, types=(dict, list, tuple)) -> None: From 0228eb36bc8b98cd94ce6a2ec922ca57fb2152ab Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Tue, 28 May 2024 15:12:17 +0200 Subject: [PATCH 6/7] Flake --- python/migrate.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/migrate.py b/python/migrate.py index a925acec2..b225548fa 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -365,8 +365,12 @@ def _combine_config(config: mgp.Map, config_path: str) -> Dict[str, Any]: config[key] = value return config + def _name_row_cells(row_cells, column_names) -> Dict[str, Any]: - return {column: (value if not isinstance(value, Decimal) 
else float(value)) for column, value in zip(column_names, row_cells)} + return { + column: (value if not isinstance(value, Decimal) else float(value)) + for column, value in zip(column_names, row_cells) + } def _check_params_type(params: Any, types=(dict, list, tuple)) -> None: From 3b41cbdef4924628c5510934917491cb6434c200 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Wed, 29 May 2024 14:06:32 +0200 Subject: [PATCH 7/7] Add reordering of commit and close --- python/migrate.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/migrate.py b/python/migrate.py index b225548fa..81c218b0b 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -86,8 +86,8 @@ def mysql( def cleanup_migrate_mysql(): global mysql_dict mysql_dict[threading.get_native_id][Constants.CURSOR] = None - mysql_dict[threading.get_native_id][Constants.CONNECTION].close() mysql_dict[threading.get_native_id][Constants.CONNECTION].commit() + mysql_dict[threading.get_native_id][Constants.CONNECTION].close() mysql_dict[threading.get_native_id][Constants.CONNECTION] = None mysql_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None @@ -165,8 +165,8 @@ def sql_server( def cleanup_migrate_sql_server(): global sql_server_dict sql_server_dict[threading.get_native_id][Constants.CURSOR] = None - sql_server_dict[threading.get_native_id][Constants.CONNECTION].close() sql_server_dict[threading.get_native_id][Constants.CONNECTION].commit() + sql_server_dict[threading.get_native_id][Constants.CONNECTION].close() sql_server_dict[threading.get_native_id][Constants.CONNECTION] = None sql_server_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None @@ -257,8 +257,8 @@ def oracle_db( def cleanup_migrate_oracle_db(): global oracle_db_dict oracle_db_dict[threading.get_native_id][Constants.CURSOR] = None - oracle_db_dict[threading.get_native_id][Constants.CONNECTION].close() oracle_db_dict[threading.get_native_id][Constants.CONNECTION].commit() + 
oracle_db_dict[threading.get_native_id][Constants.CONNECTION].close() oracle_db_dict[threading.get_native_id][Constants.CONNECTION] = None oracle_db_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None @@ -336,8 +336,8 @@ def postgresql( def cleanup_migrate_postgresql(): global postgres_dict postgres_dict[threading.get_native_id][Constants.CURSOR] = None - postgres_dict[threading.get_native_id][Constants.CONNECTION].close() postgres_dict[threading.get_native_id][Constants.CONNECTION].commit() + postgres_dict[threading.get_native_id][Constants.CONNECTION].close() postgres_dict[threading.get_native_id][Constants.CONNECTION] = None postgres_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None