🎉 Source Asana: migrate to new SAT, added base HTTP errors handling #19561

Merged: 18 commits, Nov 22, 2022
@@ -109,7 +109,7 @@
- name: Asana
sourceDefinitionId: d0243522-dccf-4978-8ba0-37ed47a0bdbf
dockerRepository: airbyte/source-asana
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.com/integrations/sources/asana
icon: asana.svg
sourceType: api
@@ -1343,7 +1343,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-asana:0.1.4"
- dockerImage: "airbyte/source-asana:0.1.5"
spec:
documentationUrl: "https://docsurl.com"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-asana/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-asana
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-asana/README.md
@@ -101,7 +101,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
If your connector needs to create or destroy resources for use during acceptance tests, create fixtures for them and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
docker build . --no-cache -t airbyte/source-asana:dev \
&& python -m pytest -p source_acceptance_test.plugin
```
To run your integration tests with docker

@@ -1,21 +1,34 @@
# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-asana:dev
tests:
test_strictness_level: high
acceptance_tests:
spec:
- spec_path: "source_asana/spec.json"
tests:
- spec_path: "source_asana/spec.json"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 7200
tests:
- config_path: "secrets/config.json"
timeout_seconds: 7200
expect_records:
bypass_reason: "Bypassed until dedicated sandbox account is up and running. Please follow https://github.com/airbytehq/airbyte/issues/19662."
empty_streams:
- name: custom_fields
bypass_reason: "This stream is not available on the account we're currently using. Please follow https://github.com/airbytehq/airbyte/issues/19662."
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 7200
# tests:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# timeout_seconds: 7200
bypass_reason: "As we are using an internal account the data is not frozen and results of `two-sequential-reads` are flaky. Please follow https://github.com/airbytehq/airbyte/issues/19662."
incremental:
bypass_reason: "Incremental syncs are not supported on this connector."
@@ -2,23 +2,41 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations

from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Type
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Type

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream

ASANA_ERRORS_MAPPING = {
402: "This stream is available to premium organizations and workspaces only",
403: "Missing permissions to consume this stream",
404: "The object specified by the request does not exist",
451: "This request was blocked for legal reasons",
}


class AsanaStream(HttpStream, ABC):
url_base = "https://app.asana.com/api/1.0/"

primary_key = "gid"

# Asana pagination could be from 1 to 100.
page_size = 100
raise_on_http_errors = True

@property
def AsanaStreamType(self) -> Type:
return self.__class__

def should_retry(self, response: requests.Response) -> bool:
if response.status_code in ASANA_ERRORS_MAPPING:
self.logger.error(
f"Skipping stream {self.name}. {ASANA_ERRORS_MAPPING.get(response.status_code)}. Full error message: {response.text}"
Review thread on this line:

Contributor:
cc @erohmensing this connector implements a custom approach to skipping unavailable streams.

Contributor:
Will add it to the list 👀

Contributor:
@YowanR: because of how hard it was to get OAuth for this account...
Just to clarify, is OAuth enabled per account in Asana? Do you pay for it?

Contributor:
It is per account. I don't believe we had to pay extra for it, but the process itself was incredibly slow (this is the main reason we had this connector move out of Q3). @igrankova and @ycherniaiev can give more context here.

)
self.raise_on_http_errors = False
return False
return super().should_retry(response)

def backoff_time(self, response: requests.Response) -> Optional[int]:
delay_time = response.headers.get("Retry-After")
@@ -31,17 +49,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
if next_page:
return {"offset": next_page["offset"]}

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = {"limit": self.page_size}

params.update(self.get_opt_fields())

if next_page_token:
params.update(next_page_token)

return params
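
The paging flow in `next_page_token` and `request_params` can be illustrated with a small standalone sketch. It works on plain dictionaries rather than `requests.Response`; `next_token` and `build_params` are hypothetical helpers, the `"gid,name"` value is an invented placeholder, and reading the offset from a `next_page` key in the response body is an assumption based on the visible part of the hunk.

```python
from typing import Any, Mapping, MutableMapping, Optional

PAGE_SIZE = 100  # the connector comments that Asana page sizes range from 1 to 100


def next_token(body: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
    """Extract the paging offset from a decoded response body, if present."""
    next_page = body.get("next_page")  # assumption: key name in the response body
    if next_page:
        return {"offset": next_page["offset"]}
    return None


def build_params(
    opt_fields: Mapping[str, str],
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
    """Merge the page size, optional fields, and paging offset into one dict."""
    params: MutableMapping[str, Any] = {"limit": PAGE_SIZE}
    params.update(opt_fields)
    if next_page_token:
        params.update(next_page_token)
    return params


# First request carries no offset; the second reuses the offset from the response.
first = build_params({"opt_fields": "gid,name"})
token = next_token({"data": [], "next_page": {"offset": "abc123"}})
second = build_params({"opt_fields": "gid,name"}, token)
```

When the response carries no `next_page` object, `next_token` returns `None` and the sketch sends no `offset`, which is how the loop would end.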

def get_opt_fields(self) -> MutableMapping[str, str]:
@@ -81,7 +93,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
response_json = response.json()
yield from response_json.get("data", []) # Asana puts records in a container array "data"

def read_slices_from_records(self, stream_class: Type[AsanaStream], slice_field: str) -> Iterable[Optional[Mapping[str, Any]]]:
def read_slices_from_records(self, stream_class: AsanaStreamType, slice_field: str) -> Iterable[Optional[Mapping[str, Any]]]:
"""
General function for getting parent stream (which should be passed through `stream_class`) slice.
Generates dicts with `gid` of parent streams.
@@ -100,9 +112,7 @@ class WorkspaceRelatedStream(AsanaStream, ABC):
into the path or will pass it as a request parameter.
"""

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
workspaces_stream = Workspaces(authenticator=self.authenticator)
for workspace in workspaces_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"workspace_gid": workspace["gid"]}
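
As a rough illustration of the parent-to-slice pattern used by `stream_slices`, the sketch below turns parent records into slice dictionaries. It is not the connector's `read_slices_from_records`; `slices_from_parent` is a hypothetical helper and the sample records are invented.

```python
from typing import Any, Iterable, Iterator, Mapping


def slices_from_parent(
    records: Iterable[Mapping[str, Any]], slice_field: str
) -> Iterator[Mapping[str, Any]]:
    """Yield one slice per parent record, keyed by `slice_field`."""
    for record in records:
        yield {slice_field: record["gid"]}


# Hypothetical parent records, shaped like Asana objects that carry a `gid`.
workspaces = [{"gid": "1", "name": "Eng"}, {"gid": "2", "name": "Ops"}]
slices = list(slices_from_parent(workspaces, "workspace_gid"))
```

Each slice is then available to `path` or `request_params` of the child stream, so one child request sequence runs per parent record.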
@@ -114,10 +124,8 @@ class WorkspaceRequestParamsRelatedStream(WorkspaceRelatedStream, ABC):
So this is basically the whole point of this class - to pass `workspace` as request argument.
"""

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(**kwargs)
params["workspace"] = stream_slice["workspace_gid"]
return params
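
The simplified signatures rely on each method naming only the arguments it uses and passing the rest along via `**kwargs`. A minimal sketch of that forwarding pattern follows; `Base` and `WorkspaceScoped` are hypothetical stand-ins, not the connector's classes.

```python
from typing import Any, Mapping, MutableMapping, Optional


class Base:
    """Hypothetical base stream: only the paging token matters to it."""

    def request_params(
        self, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params: MutableMapping[str, Any] = {"limit": 100}
        if next_page_token:
            params.update(next_page_token)
        return params


class WorkspaceScoped(Base):
    """Hypothetical child: consumes stream_slice and forwards everything else."""

    def request_params(
        self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        # stream_state and next_page_token travel inside kwargs untouched.
        params = super().request_params(**kwargs)
        params["workspace"] = stream_slice["workspace_gid"]
        return params


params = WorkspaceScoped().request_params(
    stream_state={},
    stream_slice={"workspace_gid": "42"},
    next_page_token={"offset": "abc"},
)
```

The sketch depends on the caller passing arguments by keyword; positional calls would bind to different parameters in the parent and child.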

@@ -128,9 +136,7 @@ class ProjectRelatedStream(AsanaStream, ABC):
argument in request.
"""

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self.read_slices_from_records(stream_class=Projects, slice_field="project_gid")


@@ -158,9 +164,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
task_gid = stream_slice["task_gid"]
return f"tasks/{task_gid}/stories"

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid")


@@ -173,10 +177,8 @@ class Tasks(ProjectRelatedStream):
def path(self, **kwargs) -> str:
return "tasks"

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
def request_params(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(stream_slice=stream_slice, **kwargs)
params["project"] = stream_slice["project_gid"]
return params

@@ -202,9 +204,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
team_gid = stream_slice["team_gid"]
return f"teams/{team_gid}/team_memberships"

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self.read_slices_from_records(stream_class=Teams, slice_field="team_gid")


@@ -30,3 +30,24 @@ def test_next_page_token():
inputs = {"response": MagicMock()}
expected = "offset"
assert expected in stream.next_page_token(**inputs)


@pytest.mark.parametrize(
("http_status_code", "should_retry"),
[
(402, False),
(403, False),
(404, False),
(451, False),
(429, True),
],
)
def test_should_retry(http_status_code, should_retry):
"""
402, 403, 404, 451 - should not retry.
429 - should retry.
"""
response_mock = MagicMock()
response_mock.status_code = http_status_code
stream = Stories(MagicMock())
assert stream.should_retry(response_mock) == should_retry
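
The behaviour this test pins down can be sketched without the Airbyte CDK. The snippet below is a minimal stand-in, not the connector's code: `FakeResponse`, `BaseStream`, `SkippingStream`, and `retry_decisions` are hypothetical names, and the base-class retry rule (429 or any 5xx) is an assumption chosen only to give `super()` something plausible to return.

```python
from dataclasses import dataclass

# Status codes that mean "this stream cannot be read"; messages are paraphrased.
ASANA_ERRORS_MAPPING = {
    402: "This stream is available to premium organizations and workspaces only",
    403: "Missing permissions to consume this stream",
    404: "The object specified by the request does not exist",
    451: "This request was blocked for legal reasons",
}


@dataclass
class FakeResponse:
    """Hypothetical stand-in for requests.Response."""

    status_code: int
    text: str = ""


class BaseStream:
    """Hypothetical base class; the retry rule below is an assumption."""

    raise_on_http_errors = True

    def should_retry(self, response: FakeResponse) -> bool:
        return response.status_code == 429 or 500 <= response.status_code < 600


class SkippingStream(BaseStream):
    def should_retry(self, response: FakeResponse) -> bool:
        if response.status_code in ASANA_ERRORS_MAPPING:
            # Known "unavailable" status: do not retry and do not raise,
            # so the sync can move on to the next stream.
            self.raise_on_http_errors = False
            return False
        return super().should_retry(response)


def retry_decisions(codes):
    """Return {status_code: should_retry} using a fresh stream per code."""
    return {code: SkippingStream().should_retry(FakeResponse(code)) for code in codes}
```

In this sketch the `raise_on_http_errors` flag is set on the instance and stays off for the rest of that instance's lifetime once a listed status has been seen.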
1 change: 1 addition & 0 deletions docs/integrations/sources/asana.md
@@ -66,6 +66,7 @@ The connector is restricted by normal Asana [requests limitation](https://develo

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------- |
| 0.1.5 | 2022-11-16 | [19561](https://github.com/airbytehq/airbyte/pull/19561) | Added errors handling, updated SAT with new format |
| 0.1.4 | 2022-08-18 | [15749](https://github.com/airbytehq/airbyte/pull/15749) | Add cache to project stream |
| 0.1.3 | 2021-10-06 | [6832](https://github.com/airbytehq/airbyte/pull/6832) | Add oauth init flow parameters support |
| 0.1.2 | 2021-09-24 | [6402](https://github.com/airbytehq/airbyte/pull/6402) | Fix SAT tests: update schemas and invalid\_config.json file |