Skip to content

Commit b1b97b0

Browse files
mjsqupre-commit-ci[bot]edgarrmondragon
authored
fix: Handle replication key not found in stream schema (#1927)
* Handle replication key not found in stream schema * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add missing Raises: to is_timestamp_replication_key docstring * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Split raise docstring over to new line for is_timestamp_replication_key docstring * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Correct indentation for raises docstring * Add test_stream_invalid_replication_key * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Import InvalidReplicationKeyException name * pre-commit check updates - line length and 'useless' expression * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Rename 'unused' variable with underscore prefix - variable is used to intentionally raise and exception * Improve error message Co-authored-by: Edgar R. M. <edgarrm358@gmail.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update exception message in invalid replication key test Co-authored-by: Edgar R. M. <edgarrm358@gmail.com> * Make linter happy --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Edgar R. M <edgar@meltano.com> Co-authored-by: Edgar R. M. <edgarrm358@gmail.com>
1 parent 4eac6f4 commit b1b97b0

File tree

3 files changed

+33
-0
lines changed

3 files changed

+33
-0
lines changed

singer_sdk/exceptions.py

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ class FatalAPIError(Exception):
1717
"""Exception raised when a failed request should not be considered retriable."""
1818

1919

20+
class InvalidReplicationKeyException(Exception):
21+
"""Exception to raise if the replication key is not in the stream properties."""
22+
23+
2024
class InvalidStreamSortException(Exception):
2125
"""Exception to raise if sorting errors are found while syncing the records."""
2226

singer_sdk/streams/core.py

+8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from singer_sdk.exceptions import (
2020
AbortedSyncFailedException,
2121
AbortedSyncPausedException,
22+
InvalidReplicationKeyException,
2223
InvalidStreamSortException,
2324
MaxRecordsLimitException,
2425
)
@@ -211,10 +212,17 @@ def is_timestamp_replication_key(self) -> bool:
211212
212213
Returns:
213214
True if the stream uses a timestamp-based replication key.
215+
216+
Raises:
217+
InvalidReplicationKeyException: If the schema does not contain the
218+
replication key.
214219
"""
215220
if not self.replication_key:
216221
return False
217222
type_dict = self.schema.get("properties", {}).get(self.replication_key)
223+
if type_dict is None:
224+
msg = f"Field '{self.replication_key}' is not in schema for stream '{self.name}'" # noqa: E501
225+
raise InvalidReplicationKeyException(msg)
218226
return is_datetime_type(type_dict)
219227

220228
def get_starting_replication_key_value(

tests/core/test_streams.py

+21
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import requests
1111

1212
from singer_sdk._singerlib import Catalog, MetadataMapping
13+
from singer_sdk.exceptions import (
14+
InvalidReplicationKeyException,
15+
)
1316
from singer_sdk.helpers._classproperty import classproperty
1417
from singer_sdk.helpers.jsonpath import _compile_jsonpath, extract_jsonpath
1518
from singer_sdk.pagination import first
@@ -275,6 +278,24 @@ def test_stream_starting_timestamp(
275278
assert get_starting_value(None) == expected_starting_value
276279

277280

281+
def test_stream_invalid_replication_key(tap: SimpleTestTap):
282+
"""Validate an exception is raised if replication_key not in schema."""
283+
284+
class InvalidReplicationKeyStream(SimpleTestStream):
285+
replication_key = "INVALID"
286+
287+
stream = InvalidReplicationKeyStream(tap)
288+
289+
with pytest.raises(
290+
InvalidReplicationKeyException,
291+
match=(
292+
f"Field '{stream.replication_key}' is not in schema for stream "
293+
f"'{stream.name}'"
294+
),
295+
):
296+
_check = stream.is_timestamp_replication_key
297+
298+
278299
@pytest.mark.parametrize(
279300
"path,content,result",
280301
[

0 commit comments

Comments
 (0)