Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Asana: Fix StoriesCompact stream, Add Stories stream #31084

Merged
merged 18 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-asana/BOOTSTRAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ Some streams depend on:

- workspaces (Teams, Users, CustomFields, Projects, Tags, Users streams);
- projects (Events, SectionsCompact, Sections, Tasks streams);
- tasks (Events, Stories stream);
- tasks (Events, StoriesCompact stream);
- storiescompact (Stories stream)
- teams (TeamMemberships stream).

Each record can be uniquely identified by a `gid` key.
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-asana/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-asana
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ acceptance_tests:
empty_streams:
- name: custom_fields
bypass_reason: "This stream is not available on the account we're currently using. Please follow https://github.com/airbytehq/airbyte/issues/19662."
- name: events
bypass_reason: "This stream is not available on our current account."
full_refresh:
# tests:
# - config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "stories_compact",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "stories",
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-asana/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d0243522-dccf-4978-8ba0-37ed47a0bdbf
dockerImageTag: 0.3.0
dockerImageTag: 0.4.0
dockerRepository: airbyte/source-asana
documentationUrl: https://docs.airbyte.com/integrations/sources/asana
githubIssueLabel: source-asana
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,43 @@
{
"type": ["null", "object"],
"properties": {
"gid": {
"type": ["null", "string"]
},
"resource_type": {
"type": ["null", "string"]
},
"created_at": {
"type": ["null", "string"],
"format": "date-time"
},
"gid": { "type": ["null", "string"] },
"resource_type": { "type": ["null", "string"] },
"created_at": { "type": ["null", "string"], "format": "date-time" },
"created_by": {
"type": ["null", "object"],
"properties": {
"gid": {
"type": ["null", "string"]
},
"resource_type": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
}
"gid": { "type": ["null", "string"] },
"resource_type": { "type": ["null", "string"] },
"name": { "type": ["null", "string"] }
}
},
"resource_subtype": {
"type": ["null", "string"]
},
"text": {
"type": ["null", "string"]
"resource_subtype": { "type": ["null", "string"] },
"text": { "type": ["null", "string"] },
"type": { "type": ["null", "string"] },
"html_text": { "type": ["null", "string"] },
"is_pinned": { "type": ["null", "boolean"] },
"sticker_name": { "type": ["null", "string"] },
"is_editable": { "type": ["null", "boolean"] },
"is_edited": { "type": ["null", "boolean"] },
"liked": { "type": ["null", "boolean"] },
"likes": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"gid": { "type": ["null", "string"] },
"user": {
"type": ["null", "object"],
"properties": {
"gid": { "type": ["null", "string"] },
"resource_type": { "type": ["null", "string"] },
"name": { "type": ["null", "string"] }
}
}
}
}
},
"type": {
"type": ["null", "string"]
}
"num_likes": { "type": ["null", "integer"] }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"type": ["null", "object"],
"properties": {
"gid": {
"type": ["null", "string"]
},
"resource_type": {
"type": ["null", "string"]
},
"created_at": {
"type": ["null", "string"],
"format": "date-time"
},
"created_by": {
"type": ["null", "object"],
"properties": {
"gid": {
"type": ["null", "string"]
},
"resource_type": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
}
}
},
"resource_subtype": {
"type": ["null", "string"]
},
"text": {
"type": ["null", "string"]
},
"type": {
"type": ["null", "string"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Sections,
SectionsCompact,
Stories,
StoriesCompact,
Tags,
Tasks,
TeamMemberships,
Expand Down Expand Up @@ -57,14 +58,15 @@ def _get_authenticator(config: dict) -> Union[TokenAuthenticator, AsanaOauth2Aut
)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
args = {"authenticator": self._get_authenticator(config)}
args = {"authenticator": self._get_authenticator(config), "test_mode": config["test_mode"]}
streams = [
AttachmentsCompact(**args),
Attachments(**args),
CustomFields(**args),
Projects(**args),
SectionsCompact(**args),
Sections(**args),
StoriesCompact(**args),
Stories(**args),
Tags(**args),
Tasks(**args),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
}
]
},
"test_mode": {
"type": "boolean",
"title": "Test Mode",
"description": "This flag is used for testing purposes for certain streams that return a lot of data. This flag is not meant to be enabled for prod.",
"airbyte_hidden": true
},
"organization_export_ids": {
"title": "Organization Export IDs",
"description": "Globally unique identifiers for the organization exports",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@


from abc import ABC
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Type
from itertools import islice
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Type, Union

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
from requests.auth import AuthBase

ASANA_ERRORS_MAPPING = {
402: "This stream is available to premium organizations and workspaces only",
Expand All @@ -24,11 +27,16 @@ class AsanaStream(HttpStream, ABC):
# Asana pagination could be from 1 to 100.
page_size = 100
raise_on_http_errors = True
test_mode = False

@property
def AsanaStreamType(self) -> Type:
return self.__class__

def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, test_mode: bool = False):
super().__init__(authenticator=authenticator)
self.test_mode = test_mode

def should_retry(self, response: requests.Response) -> bool:
if response.status_code in ASANA_ERRORS_MAPPING.keys():
self.logger.error(
Expand Down Expand Up @@ -101,6 +109,7 @@ def read_slices_from_records(self, stream_class: AsanaStreamType, slice_field: s
"""
stream = stream_class(authenticator=self.authenticator)
stream_slices = stream.stream_slices(sync_mode=SyncMode.full_refresh)

for stream_slice in stream_slices:
for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
yield {slice_field: record["gid"]}
Expand Down Expand Up @@ -186,9 +195,48 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:


class Events(AsanaStream):
primary_key = "created_at"
sync_token = None

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return "events"

def read_records(self, *args, **kwargs):
# Check if sync token is available
if self.sync_token is not None:
# Pass the sync token as a request parameter
kwargs["next_page_token"] = {"sync": self.sync_token}

yield from super().read_records(*args, **kwargs)

# After reading records, update the sync token
self.sync_token = self.get_latest_sync_token()

def get_latest_sync_token(self) -> str:
latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token

if latest_sync_token is None:
return None

return latest_sync_token["sync"] # Extract the sync token value

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if response.status_code == 412: # Check if response is a 412 error
response_json = response.json()
if "sync" in response_json: # Check if new sync token is available
self.sync_token = response_json["sync"]
else:
self.sync_token = None
self.logger.warning("Sync token expired. Fetch the full dataset for this query now.")
else:
response_json = response.json()

# Check if response has new sync token
if "sync" in response_json:
self.sync_token = response_json["sync"]

yield from response_json.get("data", [])

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
decoded_response = response.json()
last_sync = decoded_response.get("sync")
Expand Down Expand Up @@ -226,6 +274,8 @@ def path(self, **kwargs) -> str:


class SectionsCompact(ProjectRelatedStream):
use_cache = True

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
project_gid = stream_slice["project_gid"]
return f"projects/{project_gid}/sections"
Expand All @@ -248,13 +298,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield from section_data


class Stories(AsanaStream):
class StoriesCompact(AsanaStream):
use_cache = True

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
task_gid = stream_slice["task_gid"]
return f"tasks/{task_gid}/stories"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
yield from self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid")
# This streams causes tests to timeout (> 2hrs), so we limit stream slices to 100 to make tests less noisy
if self.test_mode:
yield from islice(self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid"), 100)
else:
yield from self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid")


class Stories(AsanaStream):
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
story_gid = stream_slice["story_gid"]
return f"stories/{story_gid}"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
# This streams causes tests to timeout (> 2hrs), so we limit stream slices to 100 to make tests less noisy
if self.test_mode:
yield from islice(self.read_slices_from_records(stream_class=StoriesCompact, slice_field="story_gid"), 100)
else:
yield from self.read_slices_from_records(stream_class=StoriesCompact, slice_field="story_gid")

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
section_data = response_json.get("data", {})
if isinstance(section_data, dict): # Check if section_data is a dictionary
yield section_data
elif isinstance(section_data, list): # Check if section_data is a list
yield from section_data


class Tags(WorkspaceRequestParamsRelatedStream):
Expand Down
29 changes: 15 additions & 14 deletions docs/integrations/sources/asana.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,18 @@ The connector is restricted by normal Asana [requests limitation](https://develo

## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------- |
| 0.3.0 | 2023-10-24 | [31634](https://github.com/airbytehq/airbyte/pull/31634) | Add OrganizationExports stream |
| 0.2.0 | 2023-10-17 | [31090](https://github.com/airbytehq/airbyte/pull/31090) | Add Attachments stream |
| 0.1.9 | 2023-10-16 | [31089](https://github.com/airbytehq/airbyte/pull/31089) | Add Events stream |
| 0.1.8 | 2023-10-16 | [31009](https://github.com/airbytehq/airbyte/pull/31009) | Add SectionsCompact stream |
| 0.1.7 | 2023-05-29 | [26716](https://github.com/airbytehq/airbyte/pull/26716) | Remove authSpecification from spec.json, use advancedAuth instead |
| 0.1.6 | 2023-05-26 | [26653](https://github.com/airbytehq/airbyte/pull/26653) | Fix order of authentication methods |
| 0.1.5 | 2022-11-16 | [19561](https://github.com/airbytehq/airbyte/pull/19561) | Added errors handling, updated SAT with new format |
| 0.1.4 | 2022-08-18 | [15749](https://github.com/airbytehq/airbyte/pull/15749) | Add cache to project stream |
| 0.1.3 | 2021-10-06 | [6832](https://github.com/airbytehq/airbyte/pull/6832) | Add oauth init flow parameters support |
| 0.1.2 | 2021-09-24 | [6402](https://github.com/airbytehq/airbyte/pull/6402) | Fix SAT tests: update schemas and invalid_config.json file |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add entrypoint and bump version for connector |
| 0.1.0 | 2021-05-25 | [3510](https://github.com/airbytehq/airbyte/pull/3510) | New Source: Asana |
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------- |
| 0.4.0 | 2023-10-24 | [31084](https://github.com/airbytehq/airbyte/pull/31084) | Add StoriesCompact stream |
| 0.3.0 | 2023-10-24 | [31634](https://github.com/airbytehq/airbyte/pull/31634) | Add OrganizationExports stream |
| 0.2.0 | 2023-10-17 | [31090](https://github.com/airbytehq/airbyte/pull/31090) | Add Attachments stream |
| 0.1.9 | 2023-10-16 | [31089](https://github.com/airbytehq/airbyte/pull/31089) | Add Events stream |
| 0.1.8 | 2023-10-16 | [31009](https://github.com/airbytehq/airbyte/pull/31009) | Add SectionsCompact stream |
| 0.1.7 | 2023-05-29 | [26716](https://github.com/airbytehq/airbyte/pull/26716) | Remove authSpecification from spec.json, use advancedAuth instead |
| 0.1.6 | 2023-05-26 | [26653](https://github.com/airbytehq/airbyte/pull/26653) | Fix order of authentication methods |
| 0.1.5 | 2022-11-16 | [19561](https://github.com/airbytehq/airbyte/pull/19561) | Added errors handling, updated SAT with new format |
| 0.1.4 | 2022-08-18 | [15749](https://github.com/airbytehq/airbyte/pull/15749) | Add cache to project stream |
| 0.1.3 | 2021-10-06 | [6832](https://github.com/airbytehq/airbyte/pull/6832) | Add oauth init flow parameters support |
| 0.1.2 | 2021-09-24 | [6402](https://github.com/airbytehq/airbyte/pull/6402) | Fix SAT tests: update schemas and invalid_config.json file |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add entrypoint and bump version for connector |
| 0.1.0 | 2021-05-25 | [3510](https://github.com/airbytehq/airbyte/pull/3510) | New Source: Asana |
Loading