Skip to content

Commit

Permalink
Fix handling of function calls in materialized view queries (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
loren authored Jun 1, 2023
1 parent ba05ff3 commit 752171f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sinker"
version = "0.1.0"
version = "0.1.1"
description = "Synchronize Postgres to Elasticsearch"
authors = ["Loren Siebert <loren@paradigm.xyz>"]
license = "MIT/Apache-2.0"
Expand Down
28 changes: 13 additions & 15 deletions src/sinker/sinker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
import os
import re
from typing import Iterable, Dict, Any

import psycopg
Expand All @@ -18,11 +17,11 @@
PGCHUNK_SIZE,
SCHEMA_TABLE_DELIMITER,
)
from .utils import generate_schema_tables

logger = logging.getLogger(__name__)

BACKFILL_CURSOR_NAME = "backfill"
TABLE_RE = re.compile(r"from\s\"?(\S+)\b", re.I)


class Sinker:
Expand Down Expand Up @@ -106,27 +105,26 @@ def setup_pg(self):
plpgsql: str = f"{schema_view_name}_fn"
create_function: str = q.CREATE_FUNCTION.format(plpgsql, SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name)
ddl_list.append(create_function)
schema_tables: list[Any] = TABLE_RE.findall(view_select_query)
for schema_table in schema_tables:
for schema_table in generate_schema_tables(view_select_query):
schema, _, table = schema_table.rpartition(SCHEMA_TABLE_DELIMITER)
schema = schema or DEFAULT_SCHEMA
trigger_name: str = f"{SINKER_SCHEMA}_{self.view}_{schema}_{table}"
create_trigger: str = q.CREATE_TRIGGER.format(trigger_name, schema, table, plpgsql)
ddl_list.append(create_trigger)
# The last table is the top-level table that gets DELETE events with an ID in the replication slot.
# The materialized views do not contain the ID of the doc being deleted,
# so we'll use this table's delete events as a proxy.
# lsn,xid,data
# 0/24EDA4D8,17393,BEGIN 17393
# 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'"
# 0/24EDC1B0,17393,COMMIT 17393
# 0/24EDC228,17394,BEGIN 17394
# 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data)
# 0/24EF4718,17394,COMMIT 17394
self.parent_table = schema_table
create_todo_entry: str = q.CREATE_TODO_ENTRY.format(SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name)
ddl_list.append(create_todo_entry)
psycopg.connect(autocommit=True).execute("; ".join(ddl_list))
# The last table is the top-level table that gets DELETE events with an ID in the replication slot.
# The materialized views do not contain the ID of the doc being deleted,
# so we'll use this table's delete events as a proxy.
# lsn,xid,data
# 0/24EDA4D8,17393,BEGIN 17393
# 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'"
# 0/24EDC1B0,17393,COMMIT 17393
# 0/24EDC228,17394,BEGIN 17394
# 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data)
# 0/24EF4718,17394,COMMIT 17394
self.parent_table = schema_tables[-1]

def refresh_view(self) -> str:
logger.info(f"Refreshing the {self.view} materialized view")
Expand Down
16 changes: 16 additions & 0 deletions src/sinker/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import re

from typing import Iterable

# Captures the token that follows a standalone FROM keyword (case-insensitive).
# ``\b`` keeps identifiers that merely end in "from" (e.g. ``valid_from``) from
# matching, ``\s+`` allows newlines/indentation between FROM and the name, and
# the optional ``"`` drops the opening quote of a quoted identifier.
TABLE_RE = re.compile(r"\bfrom\s+\"?(\S+)\b", re.I)

# An opening parenthesis, optionally preceded by whitespace. Found right after
# a candidate name it means the candidate is being called, e.g. ``unnest (x)``.
_CALL_PAREN_RE = re.compile(r"\s*\(")


def generate_schema_tables(view_select_query: str) -> Iterable[str]:
    """
    Given a view select query, yield the tables that are referenced in the query,
    in the order their FROM clauses appear in the text.
    Skip anything that looks like a function call or a parenthesised subquery.

    This is a regex heuristic, not a SQL parser: any token following a FROM
    keyword is treated as a table candidate, so expressions such as
    ``extract(year from created_at)`` are still reported.

    :param view_select_query: The select query from the view
    :return: An iterator over the (optionally schema-qualified) table names
    """
    for match in TABLE_RE.finditer(view_select_query):
        table_candidate = match.group(1)
        # "unnest(emails" or "(select": the parenthesis is glued to the token.
        if "(" in table_candidate:
            continue
        # "unnest (emails)": the parenthesis follows after some whitespace.
        if _CALL_PAREN_RE.match(view_select_query, match.end()):
            continue
        yield table_candidate
12 changes: 12 additions & 0 deletions tests/test_generate_schema_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from sinker.utils import generate_schema_tables


def test_generate_schema_tables():
    """The ``FROM unnest(...)`` function call is skipped; only "person" is reported."""
    query = """select id,
json_build_object(
'name', "name",
'emailDomains',(select array_agg(split_part(email, '@', 2)) FROM unnest(emails) as email),
) as "person"
from "person"
"""
    expected_tables = ["person"]
    found_tables = list(generate_schema_tables(query))
    assert found_tables == expected_tables

0 comments on commit 752171f

Please sign in to comment.