Skip to content

Commit

Permalink
Merge pull request #319 from teksi/ensureTriggerAlwaysEnabled
Browse files Browse the repository at this point in the history
Ensure that triggers stay active in case of error
  • Loading branch information
ponceta authored Jul 18, 2024
2 parents 3609bf2 + 8553da4 commit 25054e6
Show file tree
Hide file tree
Showing 16 changed files with 616 additions and 516 deletions.
38 changes: 32 additions & 6 deletions datamodel/app/symbology_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,6 @@ $BODY$
-----------------------------------------------------------------------

CREATE OR REPLACE FUNCTION tww_sys.disable_symbology_triggers() RETURNS VOID AS $$
DECLARE
tbl text;
trig text;
BEGIN
ALTER TABLE tww_od.reach_point DISABLE TRIGGER on_reach_point_update;
ALTER TABLE tww_od.reach DISABLE TRIGGER on_reach_2_change;
Expand All @@ -776,9 +773,6 @@ $$ LANGUAGE plpgsql SECURITY DEFINER;
-----------------------------------------------------------------------

CREATE OR REPLACE FUNCTION tww_sys.enable_symbology_triggers() RETURNS VOID AS $$
DECLARE
tbl text;
trig text;
BEGIN
ALTER TABLE tww_od.reach_point ENABLE TRIGGER on_reach_point_update;
ALTER TABLE tww_od.reach ENABLE TRIGGER on_reach_2_change;
Expand All @@ -795,6 +789,38 @@ BEGIN
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-----------------------------------------------------------------------
-- Check if Symbology Triggers are enabled
-----------------------------------------------------------------------

CREATE OR REPLACE FUNCTION tww_sys.check_symbology_triggers_enabled() RETURNS BOOL AS $$
DECLARE _disabled_count numeric;
BEGIN

SELECT count(*) into _disabled_count FROM pg_trigger WHERE (
tgname='on_reach_point_update' or
tgname='on_reach_2_change' or
tgname='on_reach_1_delete' or
tgname='on_wastewater_structure_update' or
tgname='ws_label_update_by_wastewater_networkelement' or
tgname='on_structure_part_change' or
tgname='on_cover_change' or
tgname='on_wasterwaternode_change' or
tgname='ws_symbology_update_by_reach' or
tgname='ws_symbology_update_by_channel' or
tgname='ws_symbology_update_by_reach_point' or
tgname='calculate_reach_length'
) AND tgenabled = 'D';

IF _disabled_count=0 THEN
return true;
ELSE
return false;
END IF;

END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- only update -> insert and delete are handled by reach trigger
CREATE TRIGGER on_reach_point_update
AFTER UPDATE
Expand Down
7 changes: 0 additions & 7 deletions plugin/teksi_wastewater/interlis/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@

BASE = os.path.dirname(__file__)

PGSERVICE = None # overriden by PG* settings below
PGHOST = os.getenv("PGHOST", None)
PGPORT = os.getenv("PGPORT", None)
PGDATABASE = os.getenv("PGDATABASE", None)
PGUSER = os.getenv("PGUSER", None)
PGPASS = os.getenv("PGPASS", None)

ILIVALIDATOR = os.path.join(BASE, "bin", "ilivalidator-1.11.9", "ilivalidator-1.11.9.jar")

TWW_DEFAULT_PGSERVICE = "pg_tww"
Expand Down
247 changes: 101 additions & 146 deletions plugin/teksi_wastewater/interlis/interlis_importer_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@
import os
import tempfile

try:
import psycopg

PSYCOPG_VERSION = 3
DEFAULTS_CONN_ARG = {"autocommit": True}
except ImportError:
import psycopg2 as psycopg

PSYCOPG_VERSION = 2
DEFAULTS_CONN_ARG = {}

from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QApplication

from ..utils.database_utils import DatabaseUtils
from . import config
from .gui.interlis_import_selection_dialog import InterlisImportSelectionDialog
from .interlis_model_mapping.interlis_exporter_to_intermediate_schema import (
Expand All @@ -33,13 +23,7 @@
from .interlis_model_mapping.model_tww import ModelTwwSys, ModelTwwVl
from .interlis_model_mapping.model_tww_od import ModelTwwOd
from .utils.ili2db import InterlisTools
from .utils.various import (
CmdException,
LoggingHandlerContext,
get_pgconf_as_psycopg_dsn,
logger,
make_log_path,
)
from .utils.various import CmdException, LoggingHandlerContext, logger, make_log_path


class InterlisImporterExporterStopped(Exception):
Expand Down Expand Up @@ -116,40 +100,52 @@ def interlis_import(self, xtf_file_input, show_selection_dialog=False, logs_next
self._progress_done(35, "Disable symbolgy triggers...")
self._import_disable_symbology_triggers()

# Import from the temporary ili2pg model
self._progress_done(40, "Converting to TEKSI Wastewater...")
tww_session = self._import_from_intermediate_schema(import_model)

if show_selection_dialog:
self._progress_done(90, "Import objects selection...")
import_dialog = InterlisImportSelectionDialog()
import_dialog.init_with_session(tww_session)
QApplication.restoreOverrideCursor()
if import_dialog.exec_() == import_dialog.Rejected:
tww_session.rollback()
tww_session.close()
self._import_enable_symbology_triggers()
raise InterlisImporterExporterStopped()
QApplication.setOverrideCursor(Qt.WaitCursor)
else:
self._progress_done(90, "Commit session...")
tww_session.commit()
tww_session.close()
try:
# Import from the temporary ili2pg model
self._progress_done(40, "Converting to TEKSI Wastewater...")
tww_session = self._import_from_intermediate_schema(import_model)

if show_selection_dialog:
self._progress_done(90, "Import objects selection...")
import_dialog = InterlisImportSelectionDialog()
import_dialog.init_with_session(tww_session)
QApplication.restoreOverrideCursor()
if import_dialog.exec_() == import_dialog.Rejected:
tww_session.rollback()
tww_session.close()
raise InterlisImporterExporterStopped()
QApplication.setOverrideCursor(Qt.WaitCursor)
else:
self._progress_done(90, "Commit session...")
tww_session.commit()
tww_session.close()

# Update main_cover and main_wastewater_node
self._progress_done(95, "Update main cover and refresh materialized views...")
self._import_update_main_cover_and_refresh_mat_views()
# Update main_cover and main_wastewater_node
self._progress_done(95, "Update main cover and refresh materialized views...")
self._import_update_main_cover_and_refresh_mat_views()

# Validate subclasses after import
self._check_subclass_counts()
# Validate subclasses after import
self._check_subclass_counts()

# Update organisations
self._progress_done(96, "Set organisations filter...")
self._import_manage_organisations()
# Update organisations
self._progress_done(96, "Set organisations filter...")
self._import_manage_organisations()

# Reenable symbology triggers
self._progress_done(97, "Reenable symbology triggers...")
self._import_enable_symbology_triggers()
# Reenable symbology triggers
self._progress_done(97, "Reenable symbology triggers...")
self._import_enable_symbology_triggers()

except Exception as exception:
# Make sure to re-enable triggers in case an exception occourred
try:
self._import_enable_symbology_triggers()
except Exception as enable_trigger_exception:
logger.error(
f"Symbology triggers couldn't be re-enabled because an exception occourred: '{enable_trigger_exception}'"
)

# Raise the original exception for further error handling
raise exception

self._progress_done(100)
logger.info("INTERLIS import finished.")
Expand Down Expand Up @@ -267,67 +263,30 @@ def _import_from_intermediate_schema(self, import_model):
return interlisImporterToIntermediateSchema.session_tww

def _import_manage_organisations(self):
connection = psycopg.connect(get_pgconf_as_psycopg_dsn(), **DEFAULTS_CONN_ARG)
if PSYCOPG_VERSION == 2:
connection.set_session(autocommit=True)
cursor = connection.cursor()

logger.info("Update organisation tww_active")
cursor.execute("SELECT tww_app.set_organisations_active();")

connection.commit()
connection.close()
DatabaseUtils.execute("SELECT tww_app.set_organisations_active();")

def _import_update_main_cover_and_refresh_mat_views(self):
connection = psycopg.connect(get_pgconf_as_psycopg_dsn(), **DEFAULTS_CONN_ARG)
if PSYCOPG_VERSION == 2:
connection.set_session(autocommit=True)
cursor = connection.cursor()

logger.info("Update wastewater structure fk_main_cover")
cursor.execute("SELECT tww_app.wastewater_structure_update_fk_main_cover('', True);")
with DatabaseUtils.PsycopgConnection() as connection:
cursor = connection.cursor()

logger.info("Update wastewater structure fk_main_wastewater_node")
cursor.execute(
"SELECT tww_app.wastewater_structure_update_fk_main_wastewater_node('', True);"
)
logger.info("Update wastewater structure fk_main_cover")
cursor.execute("SELECT tww_app.wastewater_structure_update_fk_main_cover('', True);")

logger.info("Refresh materialized views")
cursor.execute("SELECT tww_app.network_refresh_network_simple();")
logger.info("Update wastewater structure fk_main_wastewater_node")
cursor.execute(
"SELECT tww_app.wastewater_structure_update_fk_main_wastewater_node('', True);"
)

connection.commit()
connection.close()
logger.info("Refresh materialized views")
cursor.execute("SELECT tww_app.network_refresh_network_simple();")

def _import_disable_symbology_triggers(self):
connection = psycopg.connect(get_pgconf_as_psycopg_dsn(), **DEFAULTS_CONN_ARG)
if PSYCOPG_VERSION == 2:
connection.set_session(autocommit=True)
cursor = connection.cursor()

logger.info("Disable symbology triggers")
cursor.execute("SELECT tww_sys.disable_symbology_triggers();")

connection.commit()
connection.close()
DatabaseUtils.disable_symbology_triggers()

def _import_enable_symbology_triggers(self):
connection = psycopg.connect(get_pgconf_as_psycopg_dsn(), **DEFAULTS_CONN_ARG)
if PSYCOPG_VERSION == 2:
connection.set_session(autocommit=True)
cursor = connection.cursor()

logger.info("Enable symbology triggers")
cursor.execute("SELECT tww_sys.enable_symbology_triggers();")

logger.info("update_wastewater_node_symbology for all datasets - please be patient")
cursor.execute("SELECT tww_app.update_wastewater_node_symbology(NULL, True);")
logger.info("update_wastewater_structure_label for all datasets - please be patient")
cursor.execute("SELECT tww_app.update_wastewater_structure_label(NULL, True);")
logger.info("update_wn_symbology_by_overflow for all datasets - please be patient")
cursor.execute("SELECT tww_app.update_wn_symbology_by_overflow(NULL, True);")

connection.commit()
connection.close()
DatabaseUtils.enable_symbology_triggers()
DatabaseUtils.update_symbology()

def _export_labels_file(
self,
Expand Down Expand Up @@ -470,31 +429,31 @@ def _export_xtf_files(self, file_name_base, export_models):
def _clear_ili_schema(self, recreate_schema=False):
logger.info("CONNECTING TO DATABASE...")

connection = psycopg.connect(get_pgconf_as_psycopg_dsn(), **DEFAULTS_CONN_ARG)
if PSYCOPG_VERSION == 2:
connection.set_session(autocommit=True)
cursor = connection.cursor()
with DatabaseUtils.PsycopgConnection() as connection:
cursor = connection.cursor()

if not recreate_schema:
# If the schema already exists, we just truncate all tables
cursor.execute(
f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{config.ABWASSER_SCHEMA}';"
)
if cursor.rowcount > 0:
logger.info(f"Schema {config.ABWASSER_SCHEMA} already exists, we truncate instead")
if not recreate_schema:
# If the schema already exists, we just truncate all tables
cursor.execute(
f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{config.ABWASSER_SCHEMA}';"
f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{config.ABWASSER_SCHEMA}';"
)
for row in cursor.fetchall():
cursor.execute(f"TRUNCATE TABLE {config.ABWASSER_SCHEMA}.{row[0]} CASCADE;")
return
if cursor.rowcount > 0:
logger.info(
f"Schema {config.ABWASSER_SCHEMA} already exists, we truncate instead"
)
cursor.execute(
f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{config.ABWASSER_SCHEMA}';"
)
for row in cursor.fetchall():
cursor.execute(
f"TRUNCATE TABLE {config.ABWASSER_SCHEMA}.{row[0]} CASCADE;"
)
return

logger.info(f"DROPPING THE SCHEMA {config.ABWASSER_SCHEMA}...")
cursor.execute(f'DROP SCHEMA IF EXISTS "{config.ABWASSER_SCHEMA}" CASCADE ;')
logger.info(f"CREATING THE SCHEMA {config.ABWASSER_SCHEMA}...")
cursor.execute(f'CREATE SCHEMA "{config.ABWASSER_SCHEMA}";')
connection.commit()
connection.close()
logger.info(f"DROPPING THE SCHEMA {config.ABWASSER_SCHEMA}...")
cursor.execute(f'DROP SCHEMA IF EXISTS "{config.ABWASSER_SCHEMA}" CASCADE ;')
logger.info(f"CREATING THE SCHEMA {config.ABWASSER_SCHEMA}...")
cursor.execute(f'CREATE SCHEMA "{config.ABWASSER_SCHEMA}";')

def _create_ili_schema(
self, models, ext_columns_no_constraints=False, create_basket_col=False
Expand Down Expand Up @@ -576,32 +535,28 @@ def _check_subclass_count(self, schema_name, parent_name, child_list):
logger.info(f"INTEGRITY CHECK {parent_name} subclass data...")
logger.info("CONNECTING TO DATABASE...")

connection = psycopg.connect(get_pgconf_as_psycopg_dsn(), **DEFAULTS_CONN_ARG)
if PSYCOPG_VERSION == 2:
connection.set_session(autocommit=True)
cursor = connection.cursor()

cursor.execute(f"SELECT obj_id FROM {schema_name}.{parent_name};")
parent_rows = cursor.fetchall()
if len(parent_rows) > 0:
parent_count = len(parent_rows)
logger.info(f"Number of {parent_name} datasets: {parent_count}")
for child_name in child_list:
cursor.execute(f"SELECT obj_id FROM {schema_name}.{child_name};")
child_rows = cursor.fetchall()
logger.info(f"Number of {child_name} datasets: {len(child_rows)}")
parent_count = parent_count - len(child_rows)
connection.commit()
connection.close()

if parent_count == 0:
logger.info(
f"OK: number of subclass elements of class {parent_name} OK in schema {schema_name}!"
)
else:
logger.error(
f"ERROR: number of subclass elements of {parent_name} NOT CORRECT in schema {schema_name}: checksum = {parent_count} (positive number means missing entries, negative means too many subclass entries)"
)
with DatabaseUtils.PsycopgConnection() as connection:
cursor = connection.cursor()

cursor.execute(f"SELECT obj_id FROM {schema_name}.{parent_name};")
parent_rows = cursor.fetchall()
if len(parent_rows) > 0:
parent_count = len(parent_rows)
logger.info(f"Number of {parent_name} datasets: {parent_count}")
for child_name in child_list:
cursor.execute(f"SELECT obj_id FROM {schema_name}.{child_name};")
child_rows = cursor.fetchall()
logger.info(f"Number of {child_name} datasets: {len(child_rows)}")
parent_count = parent_count - len(child_rows)

if parent_count == 0:
logger.info(
f"OK: number of subclass elements of class {parent_name} OK in schema {schema_name}!"
)
else:
logger.error(
f"ERROR: number of subclass elements of {parent_name} NOT CORRECT in schema {schema_name}: checksum = {parent_count} (positive number means missing entries, negative means too many subclass entries)"
)

def _init_model_classes(self, model):
ModelInterlis = ModelInterlisSia405Abwasser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from ...utils.plugin_utils import logger
from .. import config, utils
from ..utils.various import logger


class InterlisExporterToIntermediateSchemaError(Exception):
Expand Down
Loading

0 comments on commit 25054e6

Please sign in to comment.