Skip to content

Commit

Permalink
Source Notion: skip streams with invalid_start_cursor error (#27062)
Browse files Browse the repository at this point in the history
* Source Notion: skip streams with invalid_start_cursor error

* Source Notion: update docs

* Source Stripe: bump version
  • Loading branch information
artem1205 authored Jun 7, 2023
1 parent 83b968a commit f78e2e5
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-notion/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_notion ./source_notion
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.8
LABEL io.airbyte.version=1.0.9
LABEL io.airbyte.name=airbyte/source-notion
8 changes: 4 additions & 4 deletions airbyte-integrations/connectors/source-notion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ We split dependencies between two groups, dependencies that are:
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
2. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
3. Create a Pull Request.
4. Pat yourself on the back for being an awesome contributor.
5. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6e00b415-b02e-4160-bf02-58176a0ae687
dockerImageTag: 1.0.8
dockerImageTag: 1.0.9
dockerRepository: airbyte/source-notion
githubIssueLabel: source-notion
icon: notion.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

import pydantic
import requests
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffException
from airbyte_cdk.utils import AirbyteTracedException

from .utils import transform_properties

Expand All @@ -37,7 +36,8 @@ def __init__(self, config: Mapping[str, Any], **kwargs):
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

def check_invalid_start_cursor(self, response: requests.Response):
@staticmethod
def check_invalid_start_cursor(response: requests.Response):
if response.status_code == 400:
message = response.json().get("message", "")
if message.startswith("The start_cursor provided is invalid: "):
Expand Down Expand Up @@ -147,7 +147,8 @@ def read_records(self, sync_mode: SyncMode, stream_state: Mapping[str, Any] = No
except UserDefinedBackoffException as e:
message = self.check_invalid_start_cursor(e.response)
if message:
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error)
self.logger.error(f"Skipping stream {self.name}, error message: {message}")
return
raise e

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
Expand Down Expand Up @@ -261,7 +262,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
yield record

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
# if reached recursive limit, don't read any more
# if reached recursive limit, don't read anymore
if len(self.block_id_stack) > MAX_BLOCK_DEPTH:
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def transform_properties(record: Mapping[str, Any], dict_key: str = "properties") -> Mapping[str, Any]:
"""
Transfrom nested `properties` object.
Transform nested `properties` object.
Move unique named entities into `name`, `value` to handle normalization.
EXAMPLE INPUT:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

from airbyte_cdk.models import SyncMode
from pytest import fixture
Expand Down Expand Up @@ -189,3 +189,17 @@ def test_recursive_read(blocks, requests_mock):
inputs = {"sync_mode": SyncMode.incremental}
stream.block_id_stack = [root]
assert list(stream.read_records(**inputs)) == [record3, record2, record1, record4]


def test_invalid_start_cursor(parent, requests_mock, caplog):
stream = parent
error_message = "The start_cursor provided is invalid: wrong_start_cursor"
search_endpoint = requests_mock.post("https://api.notion.com/v1/search", status_code=400,
json={"object": "error", "status": 400, "code": "validation_error",
"message": error_message})

inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}}
with patch.object(stream, "backoff_time", return_value=0.1):
list(stream.read_records(**inputs))
assert search_endpoint.call_count == 6
assert f"Skipping stream pages, error message: {error_message}" in caplog.messages
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def test_users_request_params(patch_base_class):
assert stream.request_params(**inputs) == expected_params


def test_user_stream_handles_pagination_correclty(requests_mock):
def test_user_stream_handles_pagination_correctly(requests_mock):
"""
Test shows that Users stream uses pagination as per Notion API docs.
"""
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/notion.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The connector is restricted by Notion [request limits](https://developers.notion

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------|
| 1.0.9 | 2023-06-08 | [27062](https://github.com/airbytehq/airbyte/pull/27062) | Skip streams with `invalid_start_cursor` error |
| 1.0.8 | 2023-06-07 | [27073](https://github.com/airbytehq/airbyte/pull/27073) | Add empty results handling for stream `Blocks` |
| 1.0.7 | 2023-06-06 | [27060](https://github.com/airbytehq/airbyte/pull/27060) | Add skipping 404 error in `Blocks` stream |
| 1.0.6 | 2023-05-18 | [26286](https://github.com/airbytehq/airbyte/pull/26286) | Add `parent` field to `Blocks` stream |
Expand Down

0 comments on commit f78e2e5

Please sign in to comment.