Skip to content

Commit

Permalink
Merge pull request #586 from openedx/cag/partition-tables
Browse files Browse the repository at this point in the history
feat: partition xapi_events_all table
  • Loading branch information
Ian2012 authored Feb 7, 2024
2 parents cfb97ba + bce56de commit cc6cfcd
Showing 1 changed file with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Partition the event_sink.user_profile table
.. pii: Stores Open edX user profile data.
.. pii_types: user_id, name, username, location, phone_number, email_address, birth_date, biography, gender
.. pii_retirement: local_api, consumer_api
"""
from alembic import op


revision = "0032"
down_revision = "0031"
branch_labels = None
depends_on = None
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else ""
engine = "ReplicatedReplacingMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "ReplacingMergeTree"

old_user_profile_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}"

def upgrade():
# Partition event_sink.user_profile table
# 1. Rename old table
op.execute(
f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
{on_cluster}
"""
)
# 2. Create partitioned table from old data
op.execute(
f"""
CREATE TABLE IF NOT EXISTS {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
{on_cluster}
(
event_id UUID NOT NULL,
emission_time DateTime64(6) NOT NULL,
event String NOT NULL
) ENGINE {engine}
ORDER BY (emission_time, event_id)
PARTITION BY toYYYYMM(emission_time)
PRIMARY KEY (emission_time, event_id);
"""
)
# 3. Insert data from the old table into the new one
op.execute(
f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
"""
)
# 4. Drop the old table
op.execute(
f"""
DROP TABLE {old_user_profile_table}
{on_cluster}
"""
)


def downgrade():
# Un-partition the event_sink.user_profile table
# 1a. Rename old table
op.execute(
f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
{on_cluster}
"""
)

# 2. Create un-partitioned table from old data
op.execute(
f"""
CREATE OR REPLACE TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
{on_cluster}
(
event_id UUID NOT NULL,
emission_time DateTime64(6) NOT NULL,
event String NOT NULL
) ENGINE {engine}
ORDER BY (emission_time, event_id)
PRIMARY KEY (emission_time, event_id);
"""
)
# 3. Insert into new table from old one
op.execute(
f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
"""

)
# 4. Drop the old table
op.execute(
f"""
DROP TABLE {old_user_profile_table}
{on_cluster}
"""
)

0 comments on commit cc6cfcd

Please sign in to comment.