-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🐛 Source Harvest: Improve HTTP Availability #35541
Changes from all commits
6cc7849
066c69b
70cecc2
620ac6b
6b53541
dd3a7ca
09ab3ec
b34c63e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import logging | ||
from typing import Dict | ||
|
||
import requests | ||
from airbyte_cdk.sources import Source | ||
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy | ||
from requests import HTTPError | ||
|
||
|
||
class HarvestAvailabilityStrategy(HttpAvailabilityStrategy): | ||
""" | ||
This class is tested as part of test_source.check_connection | ||
""" | ||
|
||
def reasons_for_unavailable_status_codes( | ||
self, stream: Stream, logger: logging.Logger, source: Source, error: HTTPError | ||
) -> Dict[int, str]: | ||
reasons_for_codes: Dict[int, str] = { | ||
requests.codes.UNAUTHORIZED: "Please ensure your credentials are valid.", | ||
requests.codes.FORBIDDEN: "This is most likely due to insufficient permissions on the credentials in use.", | ||
requests.codes.NOT_FOUND: "Please ensure that your account ID is properly set. If it is the case and you are still seeing this error, please contact Airbyte support.", | ||
} | ||
return reasons_for_codes |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,16 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
|
||
from typing import Any, List, Mapping, Tuple | ||
import logging | ||
from typing import Any, List, Mapping, Optional, Tuple | ||
|
||
import pendulum | ||
from airbyte_cdk.logger import AirbyteLogger | ||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources import AbstractSource | ||
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.utils import AirbyteTracedException | ||
from airbyte_protocol.models import FailureType | ||
from source_harvest.availability_strategy import HarvestAvailabilityStrategy | ||
from source_harvest.streams import ( | ||
BillableRates, | ||
Clients, | ||
|
@@ -53,6 +54,11 @@ class SourceHarvest(AbstractSource): | |
def get_authenticator(config): | ||
credentials = config.get("credentials", {}) | ||
if credentials and "client_id" in credentials: | ||
if "account_id" not in config: | ||
raise AirbyteTracedException( | ||
"Config validation error: 'account_id' is a required property", | ||
failure_type=FailureType.config_error, | ||
) | ||
Comment on lines
+57
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. General step-back question: Do we have a generalized exception we raise if the input is not valid against the JSON Schema? (Could that handle this case if it did? I'm not actually sure, re: conditional requirements, but might work if it's a oneOf) Obviously we enforce this on the frontend in the platform but with terraform, pyairbyte etc. we should catch these things more generally if possible There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this make sense. I'm not sure if this was a possible in the whole workflow of the platform, but it is definitely possible from the source's perpective. Not sure if we should have something in the CDK to validate the config in the entrypoint before passing this to the source There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we do! We validate the config against the spec in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if the spec is correct re: what is required and what isn't, a Config Error should be raised by this validation |
||
return HarvestOauth2Authenticator( | ||
token_refresh_endpoint="https://id.getharvest.com/api/v2/oauth2/token", | ||
client_id=credentials.get("client_id"), | ||
|
@@ -63,20 +69,17 @@ def get_authenticator(config): | |
|
||
api_token = credentials.get("api_token", config.get("api_token")) | ||
if not api_token: | ||
raise Exception("Config validation error: 'api_token' is a required property") | ||
raise AirbyteTracedException( | ||
"Config validation error: 'api_token' is a required property", | ||
failure_type=FailureType.config_error, | ||
) | ||
return HarvestTokenAuthenticator(token=api_token, account_id=config["account_id"]) | ||
|
||
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: | ||
try: | ||
auth = self.get_authenticator(config) | ||
replication_start_date = pendulum.parse(config["replication_start_date"]) | ||
users_gen = Users(authenticator=auth, replication_start_date=replication_start_date).read_records( | ||
sync_mode=SyncMode.full_refresh | ||
) | ||
next(users_gen) | ||
return True, None | ||
except Exception as error: | ||
return False, f"Unable to connect to Harvest API with the provided credentials - {repr(error)}" | ||
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[str]]: | ||
auth = self.get_authenticator(config) | ||
replication_start_date = pendulum.parse(config["replication_start_date"]) | ||
users_stream = Users(authenticator=auth, replication_start_date=replication_start_date) | ||
return HarvestAvailabilityStrategy().check_availability(users_stream, logger, self) | ||
|
||
def streams(self, config: Mapping[str, Any]) -> List[Stream]: | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import unittest | ||
from unittest.mock import Mock, patch | ||
|
||
import pytest | ||
import requests | ||
from airbyte_cdk import AirbyteLogger | ||
from airbyte_cdk.models import FailureType | ||
from airbyte_cdk.utils import AirbyteTracedException | ||
from config import ConfigBuilder | ||
from requests import HTTPError | ||
from source_harvest.source import SourceHarvest | ||
|
||
|
||
def _a_response(status_code: int) -> requests.Response: | ||
response = Mock(spec=requests.Response) | ||
response.status_code = status_code | ||
response.url = "any url" | ||
response.reason = "any reason" | ||
return response | ||
|
||
|
||
class SourceTest(unittest.TestCase): | ||
|
||
def setUp(self) -> None: | ||
self._source = SourceHarvest() | ||
self._logger = Mock(spec=AirbyteLogger) | ||
self._config = ConfigBuilder().build() | ||
|
||
def test_given_config_with_client_id_without_account_id_when_check_connection_then_raise_config_error(self) -> None: | ||
config = ConfigBuilder().with_client_id("a client id").build() | ||
config.pop("account_id") | ||
|
||
with pytest.raises(AirbyteTracedException) as exception_trace: | ||
self._source.check_connection(self._logger, config) | ||
assert exception_trace.value.failure_type == FailureType.config_error | ||
|
||
def test_given_config_no_authentication_in_config_when_check_connection_then_raise_config_error(self) -> None: | ||
config = ConfigBuilder().build() | ||
config["credentials"].pop("api_token", None) | ||
config["credentials"].pop("client_id", None) | ||
|
||
with pytest.raises(AirbyteTracedException) as exception_trace: | ||
self._source.check_connection(self._logger, config) | ||
assert exception_trace.value.failure_type == FailureType.config_error | ||
|
||
@patch("source_harvest.source.Users.read_records") | ||
def test_given_400_http_error_when_check_connection_then_raise_non_config_error(self, mocked_user_read_records) -> None: | ||
""" | ||
Following https://github.com/airbytehq/airbyte/pull/35305 where no page alerts were emitted | ||
""" | ||
mocked_user_read_records.side_effect = HTTPError(response=_a_response(400)) | ||
|
||
with pytest.raises(Exception) as exception: | ||
self._source.check_connection(self._logger, self._config) | ||
assert not isinstance(exception, AirbyteTracedException) or exception.failure_type != FailureType.config_error | ||
|
||
@patch("source_harvest.source.Users.read_records") | ||
def test_given_401_http_error_when_check_connection_then_is_not_available(self, mocked_user_read_records) -> None: | ||
mocked_user_read_records.side_effect = HTTPError(response=_a_response(401)) | ||
is_available, _ = self._source.check_connection(self._logger, self._config) | ||
assert not is_available | ||
|
||
@patch("source_harvest.source.Users.read_records") | ||
def test_given_403_http_error_when_check_connection_then_is_not_available(self, mocked_user_read_records) -> None: | ||
mocked_user_read_records.side_effect = HTTPError(response=_a_response(403)) | ||
is_available, _ = self._source.check_connection(self._logger, self._config) | ||
assert not is_available | ||
|
||
@patch("source_harvest.source.Users.read_records") | ||
def test_given_404_http_error_when_check_connection_then_is_not_available(self, mocked_user_read_records) -> None: | ||
mocked_user_read_records.side_effect = HTTPError(response=_a_response(404)) | ||
is_available, _ = self._source.check_connection(self._logger, self._config) | ||
assert not is_available |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this an error we've encountered before, in that we know if we provide the wrong ID, it will give us a 404?
If not I'd take this one out of the list in case we get 404 because the stream slice was messed up or something like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While messing with the config, yes. Like mentioned in this error message, if the account ID is not set properly, this is the HTTP status code that will be provided. Hence, it felt like it made sense to put this as a config error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok! If I were only thinking about the platform workflow, I would wonder if we should put this exclusively in
check
and not in the availability strategy, because the value of the account ID in the config is not something you can change without going through check again. Compared to an account's permissions to access a certain resource, which could be revoked on the user's end. I guess a whole account could be revoked...But given that for other use cases (API etc) this could be the case that the account ID has changed in between, I'm fine leaving it. This is a wider problem though that made me try to start the conversation here.