Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle replication key not found in stream schema #1927

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
006d7db
Handle replication key not found in stream schema
mjsqu Aug 27, 2023
4ddf1c8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 27, 2023
89ddbb1
Add missing Raises: to is_timestamp_replication_key docstring
mjsqu Aug 27, 2023
be1ef20
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 27, 2023
67c8122
Split raise docstring over to new line for is_timestamp_replication_k…
mjsqu Aug 27, 2023
924a50d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 27, 2023
516021a
Correct indentation for raises docstring
mjsqu Aug 27, 2023
7db509e
Add test_stream_invalid_replication_key
mjsqu Aug 28, 2023
90a8c43
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
9e9f77e
Import InvalidReplicationKeyException name
mjsqu Aug 28, 2023
532b5af
pre-commit check updates - line length and 'useless' expression
mjsqu Aug 28, 2023
8ccdc87
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
ff7de54
Rename 'unused' variable with underscore prefix - variable is used to…
mjsqu Aug 28, 2023
a863374
Merge branch 'main' into bugfix/improve_err_msg_replication_key_not_p…
edgarrmondragon Aug 28, 2023
34906b5
Improve error message
mjsqu Aug 28, 2023
4415bcc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
8b1da60
Update exception message in invalid replication key test
mjsqu Aug 28, 2023
0ca1e27
Make linter happy
edgarrmondragon Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class FatalAPIError(Exception):
"""Exception raised when a failed request should not be considered retriable."""


class InvalidReplicationKeyException(Exception):
"""Exception to raise if the replication key is not in the stream properties."""


class InvalidStreamSortException(Exception):
"""Exception to raise if sorting errors are found while syncing the records."""

Expand Down
8 changes: 8 additions & 0 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from singer_sdk.exceptions import (
AbortedSyncFailedException,
AbortedSyncPausedException,
InvalidReplicationKeyException,
InvalidStreamSortException,
MaxRecordsLimitException,
)
Expand Down Expand Up @@ -211,10 +212,17 @@ def is_timestamp_replication_key(self) -> bool:

Returns:
True if the stream uses a timestamp-based replication key.

Raises:
InvalidReplicationKeyException: If the schema does not contain the
replication key.
"""
if not self.replication_key:
return False
type_dict = self.schema.get("properties", {}).get(self.replication_key)
if type_dict is None:
msg = f"Field '{self.replication_key}' is not in schema for stream '{self.name}'" # noqa: E501
raise InvalidReplicationKeyException(msg)
return is_datetime_type(type_dict)

def get_starting_replication_key_value(
Expand Down
21 changes: 21 additions & 0 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import requests

from singer_sdk._singerlib import Catalog, MetadataMapping
from singer_sdk.exceptions import (
InvalidReplicationKeyException,
)
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.jsonpath import _compile_jsonpath, extract_jsonpath
from singer_sdk.pagination import first
Expand Down Expand Up @@ -275,6 +278,24 @@ def test_stream_starting_timestamp(
assert get_starting_value(None) == expected_starting_value


def test_stream_invalid_replication_key(tap: SimpleTestTap):
"""Validate an exception is raised if replication_key not in schema."""

class InvalidReplicationKeyStream(SimpleTestStream):
replication_key = "INVALID"

stream = InvalidReplicationKeyStream(tap)

with pytest.raises(
InvalidReplicationKeyException,
match=(
f"Field '{stream.replication_key}' is not in schema for stream "
f"'{stream.name}'"
),
):
_check = stream.is_timestamp_replication_key


@pytest.mark.parametrize(
"path,content,result",
[
Expand Down