From bce56de9554cc6985ae734836dd118ea6db173cf Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Tue, 30 Jan 2024 17:45:57 -0500 Subject: [PATCH] feat: patition xapi_events_all table --- ...0032_partition_tables_by_year_and_month.py | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0032_partition_tables_by_year_and_month.py diff --git a/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0032_partition_tables_by_year_and_month.py b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0032_partition_tables_by_year_and_month.py new file mode 100644 index 000000000..d6e2bd355 --- /dev/null +++ b/tutoraspects/templates/aspects/apps/aspects/migrations/alembic/versions/0032_partition_tables_by_year_and_month.py @@ -0,0 +1,100 @@ +""" +Partition the event_sink.user_profile table + +.. pii: Stores Open edX user profile data. +.. pii_types: user_id, name, username, location, phone_number, email_address, birth_date, biography, gender +.. pii_retirement: local_api, consumer_api +""" +from alembic import op + + +revision = "0032" +down_revision = "0031" +branch_labels = None +depends_on = None +on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else "" +engine = "ReplicatedReplacingMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "ReplacingMergeTree" + +old_user_profile_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}" + +def upgrade(): + # Partition event_sink.user_profile table + # 1. Rename old table + op.execute( + f""" + RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} + TO {old_user_profile_table} + {on_cluster} + """ + ) + # 2. Create partitioned table from old data + op.execute( + f""" + CREATE TABLE IF NOT EXISTS {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} + {on_cluster} + ( + event_id UUID NOT NULL, + emission_time DateTime64(6) NOT NULL, + event String NOT NULL + ) ENGINE {engine} + ORDER BY (emission_time, event_id) + PARTITION BY toYYYYMM(emission_time) + PRIMARY KEY (emission_time, event_id); + """ + ) + # 3. Insert data from the old table into the new one + op.execute( + f""" + INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} + SELECT * FROM {old_user_profile_table} + """ + ) + # 4. Drop the old table + op.execute( + f""" + DROP TABLE {old_user_profile_table} + {on_cluster} + """ + ) + + +def downgrade(): + # Un-partition the event_sink.user_profile table + # 1a. Rename old table + op.execute( + f""" + RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} + TO {old_user_profile_table} + {on_cluster} + """ + ) + + # 2. Create un-partitioned table from old data + op.execute( + f""" + CREATE OR REPLACE TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} + {on_cluster} + ( + event_id UUID NOT NULL, + emission_time DateTime64(6) NOT NULL, + event String NOT NULL + ) ENGINE {engine} + ORDER BY (emission_time, event_id) + PRIMARY KEY (emission_time, event_id); + """ + ) + # 3. Insert into new table from old one + op.execute( + f""" + INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} + SELECT * FROM {old_user_profile_table} + """ + + ) + # 4. Drop the old table + op.execute( + f""" + DROP TABLE {old_user_profile_table} + {on_cluster} + """ + )