Skip to content

Commit

Permalink
Reserved keywords test and fix (#35)
Browse files Browse the repository at this point in the history
* Reserved keywords test and fix

* Properly quote column names, need to switch to sql alchemy native

* Lint

* Temp table added back

* More lints
  • Loading branch information
visch authored Nov 20, 2022
1 parent c034a79 commit 8a2b849
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 124 deletions.
148 changes: 31 additions & 117 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.13.0"
singer-sdk = "^0.14.0"
psycopg2-binary = "2.9.3"

[tool.poetry.dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sqlalchemy
from singer_sdk import SQLConnector
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, JSONB, BIGINT
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, JSONB
from sqlalchemy.types import TIMESTAMP


Expand Down
28 changes: 23 additions & 5 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ def __init__(self, *args, **kwargs):
self.temp_table_name = self.generate_temp_table_name()
super().__init__(*args, **kwargs)

def setup(self) -> None:
"""Set up Sink.
This method is called on Sink creation, and creates the required Schema and
Table entities in the target database.
"""
if self.schema_name:
self.connector.prepare_schema(self.schema_name)
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
primary_keys=self.key_properties,
as_temp_table=False,
)

def process_batch(self, context: dict) -> None:
"""Process a batch with the given batch context.
Expand Down Expand Up @@ -95,9 +110,9 @@ def merge_upsert_from_table(

# INSERT
join_condition = " and ".join(
[f"temp.{key} = target.{key}" for key in join_keys]
[f'temp."{key}" = target."{key}"' for key in join_keys]
)
where_condition = " and ".join([f"target.{key} is null" for key in join_keys])
where_condition = " and ".join([f'target."{key}" is null' for key in join_keys])

insert_sql = f"""
INSERT INTO {to_table_name}
Expand All @@ -112,7 +127,7 @@ def merge_upsert_from_table(
# UPDATE
columns = ", ".join(
[
f"{column_name}=temp.{column_name}"
f'"{column_name}"=temp."{column_name}"'
for column_name in self.schema["properties"].keys()
]
)
Expand Down Expand Up @@ -171,8 +186,7 @@ def column_representation(
) -> List[Column]:
"""Returns a sql alchemy table representation for the current schema."""
columns: list[Column] = []
conformed_properties = self.conform_schema(schema)["properties"]
for property_name, property_jsonschema in conformed_properties.items():
for property_name, property_jsonschema in schema["properties"].items():
columns.append(
Column(
property_name,
Expand All @@ -198,3 +212,7 @@ def generate_insert_statement(
metadata = MetaData()
table = Table(full_table_name, metadata, *columns)
return insert(table)

def conform_name(self, name: str, object_type: Optional[str] = None) -> str:
"""Conforming names of tables, schemas, column names."""
return name
3 changes: 3 additions & 0 deletions target_postgres/tests/data_files/reserved_keywords.singer

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions target_postgres/tests/test_standard_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,11 @@ def test_missing_value(postgres_target):
def test_large_int(postgres_target):
file_name = "large_int.singer"
singer_file_to_target(file_name, postgres_target)


def test_reserved_keywords(postgres_target):
"""Postgres has a number of resereved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
The target should work regradless of the column names"""
file_name = "reserved_keywords.singer"
singer_file_to_target(file_name, postgres_target)

0 comments on commit 8a2b849

Please sign in to comment.