Skip to content

Commit

Permalink
🚨🚨✨ Source Instagram: Add primary keys for UserLifetimeInsights and U…
Browse files Browse the repository at this point in the history
…serInsights; add airbyte_type to timestamp fields (#32500)
  • Loading branch information
tolik0 authored Nov 17, 2023
1 parent a745a37 commit cc7f019
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 27 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6acf6b55-4f1e-4fca-944e-1a3caef8aba8
dockerImageTag: 1.0.16
dockerImageTag: 2.0.0
dockerRepository: airbyte/source-instagram
githubIssueLabel: source-instagram
icon: instagram.svg
Expand All @@ -19,6 +19,13 @@ data:
oss:
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
2.0.0:
message:
This release introduces a default primary key for the streams UserLifetimeInsights and UserInsights.
Additionally, the format of timestamp fields has been updated in the UserLifetimeInsights, UserInsights, Media and Stories streams to include timezone information.
upgradeDeadline: "2023-12-03"
suggestedStreams:
streams:
- media
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
},
"timestamp": {
"type": ["null", "string"],
"format": "date-time"
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"username": {
"type": ["null", "string"]
Expand Down Expand Up @@ -94,7 +95,8 @@
},
"timestamp": {
"type": ["null", "string"],
"format": "date-time"
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"username": {
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
},
"timestamp": {
"type": ["null", "string"],
"format": "date-time"
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"username": {
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
},
"date": {
"type": ["null", "string"],
"format": "date-time"
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"follower_count": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
},
"date": {
"type": ["null", "string"],
"format": "date-time"
"format": "date-time",
"airbyte_type": "timestamp_with_timezone"
},
"metric": {
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from cached_property import cached_property
from facebook_business.adobjects.igmedia import IGMedia
from facebook_business.exceptions import FacebookRequestError
Expand All @@ -19,6 +20,24 @@
from .common import remove_params_from_url


class DatetimeTransformerMixin:
transformer: TypeTransformer = TypeTransformer(TransformConfig.CustomSchemaNormalization)

@staticmethod
@transformer.registerCustomTransform
def custom_transform_datetime_rfc3339(original_value, field_schema):
"""
Transform datetime string to RFC 3339 format
"""
if original_value and field_schema.get("format") == "date-time" and field_schema.get("airbyte_type") == "timestamp_with_timezone":
# Parse the ISO format timestamp
dt = pendulum.parse(original_value)

# Convert to RFC 3339 format
return dt.to_rfc3339_string()
return original_value


class InstagramStream(Stream, ABC):
"""Base stream class"""

Expand Down Expand Up @@ -121,10 +140,10 @@ def read_records(
yield self.transform(record)


class UserLifetimeInsights(InstagramStream):
class UserLifetimeInsights(DatetimeTransformerMixin, InstagramStream):
"""Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user/insights"""

primary_key = None
primary_key = ["business_account_id", "metric", "date"]
LIFETIME_METRICS = ["audience_city", "audience_country", "audience_gender_age", "audience_locale"]
period = "lifetime"

Expand Down Expand Up @@ -156,7 +175,7 @@ def request_params(
return params


class UserInsights(InstagramIncrementalStream):
class UserInsights(DatetimeTransformerMixin, InstagramIncrementalStream):
"""Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user/insights"""

METRICS_BY_PERIOD = {
Expand All @@ -176,7 +195,7 @@ class UserInsights(InstagramIncrementalStream):
"lifetime": ["online_followers"],
}

primary_key = None
primary_key = ["business_account_id", "date"]
cursor_field = "date"

# For some metrics we can only get insights not older than 30 days, it is Facebook policy
Expand Down Expand Up @@ -295,7 +314,7 @@ def _state_has_legacy_format(self, state: Mapping[str, Any]) -> bool:
return False


class Media(InstagramStream):
class Media(DatetimeTransformerMixin, InstagramStream):
"""Children objects can only be of the media_type == "CAROUSEL_ALBUM".
And children object does not support INVALID_CHILDREN_FIELDS fields,
so they are excluded when trying to get child objects to avoid the error
Expand Down Expand Up @@ -403,7 +422,7 @@ def _get_insights(self, item, account_id) -> Optional[MutableMapping[str, Any]]:
raise error


class Stories(InstagramStream):
class Stories(DatetimeTransformerMixin, InstagramStream):
"""Docs: https://developers.facebook.com/docs/instagram-api/reference/ig-user/stories"""

def read_records(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def config_fixture():
def some_config_fixture(account_id):
return {"start_date": "2021-01-23T00:00:00Z", "access_token": "unknown_token"}


@fixture(scope="session", name="some_config_future_date")
def some_config_future_date_fixture(account_id):
return {"start_date": "2030-01-23T00:00:00Z", "access_token": "unknown_token"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ def test_check_connection_empty_config(api):
assert not ok
assert error_msg


def test_check_connection_invalid_config_future_date(api, some_config_future_date):
ok, error_msg = SourceInstagram().check_connection(logger, config=some_config_future_date)

assert not ok
assert error_msg


def test_check_connection_no_date_config(api, some_config):
some_config.pop("start_date")
ok, error_msg = SourceInstagram().check_connection(logger, config=some_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airbyte_cdk.models import SyncMode
from facebook_business import FacebookAdsApi, FacebookSession
from source_instagram.streams import (
DatetimeTransformerMixin,
InstagramStream,
Media,
MediaInsights,
Expand All @@ -32,15 +33,11 @@ def test_clear_url(config):


def test_state_outdated(api, config):
assert UserInsights(api=api, start_date=config["start_date"])._state_has_legacy_format(
{"state": MagicMock()}
)
assert UserInsights(api=api, start_date=config["start_date"])._state_has_legacy_format({"state": MagicMock()})


def test_state_is_not_outdated(api, config):
assert not UserInsights(api=api, start_date=config["start_date"])._state_has_legacy_format(
{"state": {}}
)
assert not UserInsights(api=api, start_date=config["start_date"])._state_has_legacy_format({"state": {}})


def test_media_get_children(api, requests_mock, some_config):
Expand Down Expand Up @@ -208,9 +205,9 @@ def test_user_lifetime_insights_read(api, config, user_insight_data, requests_mo
@pytest.mark.parametrize(
"values,expected",
[
({"end_time": "test_end_time", "value": "test_value"}, {"date": "test_end_time", "value": "test_value"}),
({"end_time": "2020-05-04T07:00:00+0000", "value": "test_value"}, {"date": "2020-05-04T07:00:00+0000", "value": "test_value"}),
({"value": "test_value"}, {"date": None, "value": "test_value"}),
({"end_time": "test_end_time"}, {"date": "test_end_time", "value": None}),
({"end_time": "2020-05-04T07:00:00+0000"}, {"date": "2020-05-04T07:00:00+0000", "value": None}),
({}, {"date": None, "value": None}),
],
ids=[
Expand Down Expand Up @@ -363,3 +360,22 @@ def test_exit_gracefully(api, config, requests_mock, caplog):
assert not records
assert requests_mock.call_count == 6 # 4 * 1 per `metric_to_period` map + 1 `summary` request + 1 `business_account_id` request
assert "Stopping syncing stream 'user_insights'" in caplog.text


@pytest.mark.parametrize(
"original_value, field_schema, expected",
[
("2020-01-01T12:00:00Z", {"format": "date-time", "airbyte_type": "timestamp_with_timezone"}, "2020-01-01T12:00:00+00:00"),
("2020-05-04T07:00:00+0000", {"format": "date-time", "airbyte_type": "timestamp_with_timezone"}, "2020-05-04T07:00:00+00:00"),
(None, {"format": "date-time", "airbyte_type": "timestamp_with_timezone"}, None),
("2020-01-01T12:00:00", {"format": "date-time", "airbyte_type": "timestamp_without_timezone"}, "2020-01-01T12:00:00"),
("2020-01-01T14:00:00", {"format": "date-time"}, "2020-01-01T14:00:00"),
("2020-02-03T12:00:00", {"type": "string"}, "2020-02-03T12:00:00"),
],
)
def test_custom_transform_datetime_rfc3339(original_value, field_schema, expected):
# Call the static method
result = DatetimeTransformerMixin.custom_transform_datetime_rfc3339(original_value, field_schema)

# Assert the result matches the expected output
assert result == expected
9 changes: 9 additions & 0 deletions docs/integrations/sources/instagram-migrations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Instagram Migration Guide

## Upgrading to 2.0.0

This release adds a default primary key for the streams UserLifetimeInsights and UserInsights, and updates the format of timestamp fields in the UserLifetimeInsights, UserInsights, Media and Stories streams to include timezone information.

To ensure uninterrupted syncs, users should:
- Refresh the source schema
- Reset affected streams
1 change: 1 addition & 0 deletions docs/integrations/sources/instagram.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ AirbyteRecords are required to conform to the [Airbyte type](https://docs.airbyt

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------|
| 2.0.0 | 2023-11-17 | [32500](https://github.com/airbytehq/airbyte/pull/32500) | Add primary keys for UserLifetimeInsights and UserInsights; add airbyte_type to timestamp fields |
| 1.0.16 | 2023-11-17 | [32627](https://github.com/airbytehq/airbyte/pull/32627) | Fix start_date type; fix docs |
| 1.0.15 | 2023-11-14 | [32494](https://github.com/airbytehq/airbyte/pull/32494) | Marked start_date as optional; set max retry time to 10 minutes; add suggested streams |
| 1.0.14 | 2023-11-13 | [32423](https://github.com/airbytehq/airbyte/pull/32423) | Capture media_product_type column in media and stories stream |
Expand Down

0 comments on commit cc7f019

Please sign in to comment.