Skip to content

Commit 10b61d2

Browse files
pnadolny13, pre-commit-ci[bot], edgarrmondragon
authored
feat: Standard configurable load methods (#1893)
* initial implementation of standard load methods, sql connector implementation * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * enum to values * fix enum comparisons * adds test for sqlite overwrite load method * Address issues * drop table instead of truncating * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Edgar Ramírez Mondragón <edgar@meltano.com>
1 parent 50c4725 commit 10b61d2

File tree

6 files changed

+95
-0
lines changed

6 files changed

+95
-0
lines changed

cookiecutter/target-template/{{cookiecutter.target_id}}/{{cookiecutter.library_name}}/sinks.py

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class {{ cookiecutter.destination_name }}Connector(SQLConnector):
3535
allow_column_rename: bool = True # Whether RENAME COLUMN is supported.
3636
allow_column_alter: bool = False # Whether altering column types is supported.
3737
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
38+
allow_overwrite: bool = False # Whether overwrite load method is supported.
3839
allow_temp_tables: bool = True # Whether temp tables are supported.
3940

4041
def get_sqlalchemy_url(self, config: dict) -> str:

samples/sample_target_sqlite/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class SQLiteConnector(SQLConnector):
1919
allow_temp_tables = False
2020
allow_column_alter = False
2121
allow_merge_upsert = True
22+
allow_overwrite: bool = True
2223

2324
def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str:
2425
"""Generates a SQLAlchemy URL for SQLite."""

singer_sdk/connectors/sql.py

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from singer_sdk import typing as th
1919
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
2020
from singer_sdk.exceptions import ConfigValidationError
21+
from singer_sdk.helpers.capabilities import TargetLoadMethods
2122

2223
if t.TYPE_CHECKING:
2324
from sqlalchemy.engine.reflection import Inspector
@@ -40,6 +41,7 @@ class SQLConnector:
4041
allow_column_rename: bool = True # Whether RENAME COLUMN is supported.
4142
allow_column_alter: bool = False # Whether altering column types is supported.
4243
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
44+
allow_overwrite: bool = False # Whether overwrite load method is supported.
4345
allow_temp_tables: bool = True # Whether temp tables are supported.
4446
_cached_engine: Engine | None = None
4547

@@ -775,6 +777,16 @@ def prepare_table(
775777
as_temp_table=as_temp_table,
776778
)
777779
return
780+
if self.config["load_method"] == TargetLoadMethods.OVERWRITE:
781+
self.get_table(full_table_name=full_table_name).drop(self._engine)
782+
self.create_empty_table(
783+
full_table_name=full_table_name,
784+
schema=schema,
785+
primary_keys=primary_keys,
786+
partition_keys=partition_keys,
787+
as_temp_table=as_temp_table,
788+
)
789+
return
778790

779791
for property_name, property_def in schema["properties"].items():
780792
self.prepare_column(

singer_sdk/helpers/capabilities.py

+34
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,40 @@
108108
).to_dict()
109109

110110

111+
class TargetLoadMethods(str, Enum):
112+
"""Load methods a target can be configured with via ``load_method``."""
113+
114+
# always write all input records, whether or not the record already exists
115+
APPEND_ONLY = "append-only"
116+
117+
# update existing records and insert new records
118+
UPSERT = "upsert"
119+
120+
# delete all existing records and insert all input records
121+
OVERWRITE = "overwrite"
122+
123+
124+
# Schema fragment (as a dict) declaring the `load_method` string setting.
# Allowed values are the TargetLoadMethods members; the default is append-only.
# NOTE(review): the description below reads "whether that records already
# exists or not" -- rewording it would change the emitted schema text, so it
# is left as-is here.
TARGET_LOAD_METHOD_CONFIG = PropertiesList(
125+
Property(
126+
"load_method",
127+
StringType(),
128+
description=(
129+
"The method to use when loading data into the destination. "
130+
"`append-only` will always write all input records whether that records "
131+
"already exists or not. `upsert` will update existing records and insert "
132+
"new records. `overwrite` will delete all existing records and insert all "
133+
"input records."
134+
),
135+
allowed_values=[
136+
TargetLoadMethods.APPEND_ONLY,
137+
TargetLoadMethods.UPSERT,
138+
TargetLoadMethods.OVERWRITE,
139+
],
140+
default=TargetLoadMethods.APPEND_ONLY,
141+
),
142+
).to_dict()
143+
144+
111145
class DeprecatedEnum(Enum):
112146
"""Base class for capabilities enumeration."""
113147

singer_sdk/target_base.py

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from singer_sdk.helpers.capabilities import (
2020
ADD_RECORD_METADATA_CONFIG,
2121
BATCH_CONFIG,
22+
TARGET_LOAD_METHOD_CONFIG,
2223
TARGET_SCHEMA_CONFIG,
2324
CapabilitiesEnum,
2425
PluginCapabilities,
@@ -597,6 +598,7 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
597598
target_jsonschema["properties"][k] = v
598599

599600
_merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema)
601+
_merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema)
600602

601603
capabilities = cls.capabilities
602604

tests/samples/test_target_sqlite.py

+45
Original file line numberDiff line numberDiff line change
@@ -508,3 +508,48 @@ def test_hostile_to_sqlite(
508508
"hname_starts_with_number",
509509
"name_with_emoji_",
510510
}
511+
512+
513+
def test_overwrite_load_method(
514+
sqlite_target_test_config: dict,
515+
):
"""With load_method="overwrite", a second sync replaces earlier rows."""
516+
# Switch the shared test config to the overwrite load method
sqlite_target_test_config["load_method"] = "overwrite"
517+
target = SQLiteTarget(config=sqlite_target_test_config)
518+
# Random table name: the last group of a fresh UUID, prefixed with zzz_tmp_
test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}"
519+
schema_msg = {
520+
"type": "SCHEMA",
521+
"stream": test_tbl,
522+
"schema": {
523+
"type": "object",
524+
"properties": {"col_a": th.StringType().to_dict()},
525+
},
526+
}
527+
528+
# First batch of tap output: the schema plus a single record ("123")
tap_output_a = "\n".join(
529+
json.dumps(msg)
530+
for msg in [
531+
schema_msg,
532+
{"type": "RECORD", "stream": test_tbl, "record": {"col_a": "123"}},
533+
]
534+
)
535+
# Open a direct SQLite connection to inspect what the target wrote
# NOTE(review): db and cursor are never closed in this test
536+
db = sqlite3.connect(sqlite_target_test_config["path_to_db"])
537+
cursor = db.cursor()
538+
539+
# First sync: the table should hold exactly the first record
target_sync_test(target, input=StringIO(tap_output_a), finalize=True)
540+
cursor.execute(f"SELECT col_a FROM {test_tbl} ;") # noqa: S608
541+
records = [res[0] for res in cursor.fetchall()]
542+
assert records == ["123"]
543+
544+
# Second batch: same schema, a different single record ("456")
tap_output_b = "\n".join(
545+
json.dumps(msg)
546+
for msg in [
547+
schema_msg,
548+
{"type": "RECORD", "stream": test_tbl, "record": {"col_a": "456"}},
549+
]
550+
)
551+
# Second sync with a new target instance: only the new record may remain
target = SQLiteTarget(config=sqlite_target_test_config)
552+
target_sync_test(target, input=StringIO(tap_output_b), finalize=True)
553+
cursor.execute(f"SELECT col_a FROM {test_tbl} ;") # noqa: S608
554+
records = [res[0] for res in cursor.fetchall()]
555+
assert records == ["456"]

0 commit comments

Comments
 (0)