
🐛 Source Instagram: Read previous state format and upgrade it #4805

Merged · 6 commits · Jul 19, 2021

Changes from 2 commits
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.6
+LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-instagram
@@ -23,9 +23,10 @@
#

from datetime import datetime
-from typing import Any, List, Mapping, Tuple, Type
+from typing import Any, List, Mapping, Tuple, MutableMapping, Iterator

-from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode
+from airbyte_cdk import AirbyteLogger
+from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode, ConfiguredAirbyteCatalog, AirbyteMessage
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from pydantic import BaseModel, Field
@@ -70,7 +71,16 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:

        return ok, error_msg

-    def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
+    def read(
+        self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
+    ) -> Iterator[AirbyteMessage]:
+        for stream in self.streams(config):
+            state_key = str(stream.name)
+            if state and state_key in state and hasattr(stream, "upgrade_state_to_latest_format"):
+                state[state_key] = stream.upgrade_state_to_latest_format(state[state_key])
+        return super().read(logger, config, catalog, state)
+
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Discovery method, returns available streams

        :param config: A Mapping of the user input configuration as defined in the connector spec.
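The overridden read() gives every stream a chance to rewrite its own slice of the incoming state before the regular sync starts. Below is a rough, hypothetical sketch of that flow; the stream name, cursor key, and state values are invented for illustration and are not part of the PR, only the "upgrade, then delegate" pattern mirrors the change above.

# Hypothetical sketch of the read() hook; names and values are made up.
from typing import Any, MutableMapping

class DummyStream:
    name = "media_insights"  # illustrative stream name only

    def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        return dict(state)  # a real stream would rewrite a legacy layout here

state = {"media_insights": {"cursor": "2021-07-18T00:00:00+00:00"}}
for stream in (DummyStream(),):
    state_key = str(stream.name)
    if state and state_key in state and hasattr(stream, "upgrade_state_to_latest_format"):
        state[state_key] = stream.upgrade_state_to_latest_format(state[state_key])
# super().read(logger, config, catalog, state) then only sees upgraded state entries.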
@@ -22,6 +22,7 @@
# SOFTWARE.
#

+import copy
from abc import ABC
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
@@ -54,6 +55,10 @@ def fields(self) -> List[str]:
        fields = list(self.get_json_schema().get("properties", {}).keys())
        return list(set(fields) - set(non_object_fields))

+    def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
+        """Upgrade state to latest format and return new state object"""
+        return copy.deepcopy(state)
+
    def request_params(
        self,
        stream_slice: Mapping[str, Any] = None,
@@ -238,7 +243,7 @@ def stream_slices(
        start_date = pendulum.parse(state_value) if state_value else self._start_date
        start_date = max(start_date, self._start_date, pendulum.now().subtract(days=self.buffer_days))
        for since in pendulum.period(start_date, self._end_date).range("days", self.days_increment):
-            until = min(since.add(days=self.days_increment), self._end_date)
+            until = since.add(days=self.days_increment)
            self.logger.info(f"Reading insights between {since.date()} and {until.date()}")
            yield {
                **stream_slice,
@@ -259,17 +264,33 @@ def request_params(
"until": stream_slice["until"],
}

def _state_has_legacy_format(self, state: Mapping[str, Any]) -> bool:
"""Tell if the format of state is outdated"""
for value in state.values():
if not isinstance(value, Mapping):
return True
return False

def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Upgrade state to latest format and return new state object"""
if self._state_has_legacy_format(state):
return {account_id: {self.cursor_field: str(cursor_value)} for account_id, cursor_value in state.items()}

return super().upgrade_state_to_latest_format(state)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""Update stream state from latest record"""
record_value = latest_record[self.cursor_field]
state_value = current_stream_state.get("business_account_id", {}).get(self.cursor_field) or record_value
account_id = latest_record.get("business_account_id")
state_value = current_stream_state.get(account_id, {}).get(self.cursor_field) or record_value
max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value))

current_stream_state[latest_record["business_account_id"]] = {
new_stream_state = copy.deepcopy(current_stream_state)
new_stream_state[account_id] = {
self.cursor_field: str(max_cursor),
}

return current_stream_state
return new_stream_state


class Media(InstagramStream):
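For this insights stream the legacy state kept the cursor string directly under each business account id, while the new format nests it under the cursor field, which is what _state_has_legacy_format and upgrade_state_to_latest_format handle. A rough before/after sketch: the account id is taken from the reviewer's log example further down, and the cursor field name "date" is an assumption made for illustration.

# Hypothetical shapes only; the cursor field name "date" is assumed.
legacy_state = {"17841404107945569": "2021-07-18T00:00:00+00:00"}
upgraded_state = {"17841404107945569": {"date": "2021-07-18T00:00:00+00:00"}}

# _state_has_legacy_format flags the first shape because its values are plain strings,
# not mappings; upgrade_state_to_latest_format then re-nests each cursor value.
assert not isinstance(legacy_state["17841404107945569"], dict)
assert isinstance(upgraded_state["17841404107945569"], dict)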
@@ -356,8 +377,10 @@ def _get_insights(self, item, account_id) -> Optional[MutableMapping[str, Any]]:
            # An error might occur if the media was posted before the most recent time that
            # the user's account was converted to a business account from a personal account
            if error.api_error_subcode() == 2108006:
-                self.logger.error(f"Insights error for business_account_id {account_id}: {error.api_error_message()}")
-
+                details = error.body().get("error", {}).get("error_user_title") or error.api_error_message()
@keu (Contributor, Author) commented on Jul 18, 2021:
This also improves the error message. Instead of

    Insights error for business_account_id 17841404107945569: Invalid parameter

it will now be

    Insights error for business_account_id 17841404107945569: Media Posted Before Business Account Conversion.

+                self.logger.error(
+                    f"Insights error for business_account_id {account_id}: {details}"
+                )
                # We receive all Media starting from the last one, and if on the next Media we get an Insight error,
                # then no reason to make inquiries for each Media further, since they were published even earlier.
                return None
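The details lookup above relies on the Graph API error payload exposing a human-readable error_user_title next to the generic message, which is also what the reviewer's comment illustrates. A hypothetical shape of such a payload for subcode 2108006 follows; the field values are illustrative, not captured from a real response.

# Illustrative error body; values are examples, not a real API response.
error_body = {
    "error": {
        "message": "Invalid parameter",
        "error_subcode": 2108006,
        "error_user_title": "Media Posted Before Business Account Conversion",
    }
}

details = error_body.get("error", {}).get("error_user_title") or error_body["error"]["message"]
print(details)  # Media Posted Before Business Account Conversion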