Skip to content

Commit

Permalink
Support incremental updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Xabilahu committed Oct 26, 2022
1 parent fc4add9 commit c6dcea5
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ tests:
extra_fields: no
exact_order: no
extra_records: yes
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"todo-stream-name": {
"todo-field-name": "todo-abnormal-value"
"nasa-apod": {
"date": "9999-12-31"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
"name": "nasa_apod",
"json_schema": {},
"supported_sync_modes": [
"full_refresh"
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": [
"date"
],
"source_defined_primary_key": [
[
"date"
]
]
},
"sync_mode": "full_refresh",
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"todo-stream-name": {
"todo-field-name": "value"
"nasa-apod": {
"date": "2022-10-15"
}
}
64 changes: 61 additions & 3 deletions airbyte-integrations/connectors/source-nasa/source_nasa/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream

date_format = "%Y-%m-%d"
Expand Down Expand Up @@ -45,9 +45,47 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield from r


class NasaApod(NasaStream):
class NasaApod(NasaStream, IncrementalMixin):

cursor_field = "date"
primary_key = "date"
start_date_key = "start_date"
end_date_key = "end_date"

def __init__(self, config: Mapping[str, any], **kwargs):
super().__init__(config)
self.start_date = datetime.strptime(config.pop(self.start_date_key), date_format) if self.start_date_key in config else datetime.now()
self.end_date = datetime.strptime(config.pop(self.end_date_key), date_format) if self.end_date_key in config else datetime.now()
self.sync_mode = SyncMode.full_refresh
self._cursor_value = self.start_date

@property
def state(self) -> Mapping[str, Any]:
return {self.cursor_field: self._cursor_value.strftime(date_format)}

@state.setter
def state(self, value: Mapping[str, Any]):
self._cursor_value = datetime.strptime(value[self.cursor_field], date_format)

def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, Any]]:
"""
Returns a list of each day between the start date and end date.
The return value is a list of dicts {'date': date_string}.
"""
dates = []
while start_date <= self.end_date:
dates.append({self.cursor_field: start_date.strftime(date_format)})
start_date += timedelta(days=1)
return dates

def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
if stream_state and self.cursor_field in stream_state and datetime.strptime(stream_state[self.cursor_field], date_format) > self.end_date:
return []
if sync_mode == SyncMode.full_refresh:
return [self.start_date]

start_date = datetime.strptime(stream_state[self.cursor_field], date_format) if stream_state and self.cursor_field in stream_state else self.start_date
return self._chunk_date_range(start_date)

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
Expand All @@ -57,7 +95,27 @@ def path(
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return self.config
request_dict = {
**self.config,
**super().request_params(stream_state, stream_slice, next_page_token)
}
if self.sync_mode == SyncMode.full_refresh:
request_dict[self.start_date_key] = self.start_date.strftime(date_format)
request_dict[self.end_date_key] = self.end_date.strftime(date_format)
else:
request_dict[self.primary_key] = stream_slice[self.cursor_field]
return request_dict

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
self.sync_mode = kwargs.get("sync_mode", SyncMode.full_refresh)
if self._cursor_value and self._cursor_value > self.end_date:
yield []
else:
for record in super().read_records(*args, **kwargs):
if self._cursor_value:
latest_record_date = datetime.strptime(record[self.cursor_field], date_format)
self._cursor_value = max(self._cursor_value, latest_record_date)
yield record


# Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ connectionSpecification:
type: string
description: >-
Indicates the start of a date range. All images in the range from `start_date` to
`end_date` will be returned in a JSON array.
`end_date` will be returned in a JSON array. Must be after 1995-06-16, the first day
an APOD picture was posted. There are no images for tomorrow available through this API.
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
examples:
- "2022-10-20"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from datetime import datetime, timedelta
from airbyte_cdk.models import SyncMode
from pytest import fixture
from source_nasa.source import NasaApod

config = {"api_key": "foobar"}


@fixture
def patch_incremental_base_class(mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(NasaApod, "path", "v0/example_endpoint")
mocker.patch.object(NasaApod, "primary_key", "test_primary_key")
mocker.patch.object(NasaApod, "__abstractmethods__", set())


def test_cursor_field(patch_incremental_base_class):
stream = NasaApod(config=config)
expected_cursor_field = "date"
assert stream.cursor_field == expected_cursor_field


def test_stream_slices(patch_incremental_base_class):
stream = NasaApod(config=config)
start_date = datetime.now() - timedelta(days=3)
inputs = {"sync_mode": SyncMode.incremental, "cursor_field": ["date"], "stream_state": {"date": start_date.strftime("%Y-%m-%d")}}
expected_stream_slice = [{"date": (start_date + timedelta(days=x)).strftime("%Y-%m-%d")} for x in range(4)]
assert stream.stream_slices(**inputs) == expected_stream_slice


def test_supports_incremental(patch_incremental_base_class, mocker):
mocker.patch.object(NasaApod, "cursor_field", "dummy_field")
stream = NasaApod(config=config)
assert stream.supports_incremental


def test_source_defined_cursor(patch_incremental_base_class):
stream = NasaApod(config=config)
assert stream.source_defined_cursor


def test_stream_checkpoint_interval(patch_incremental_base_class):
stream = NasaApod(config=config)
expected_checkpoint_interval = None
assert stream.state_checkpoint_interval == expected_checkpoint_interval
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from http import HTTPStatus
from unittest.mock import MagicMock

Expand All @@ -21,9 +22,9 @@ def patch_base_class(mocker):


def test_request_params(patch_base_class):
stream = NasaApod(config=config)
stream = NasaApod(config={**config, "start_date": "2022-09-10"})
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
expected_params = {"api_key": api_key_value}
expected_params = {"api_key": api_key_value, "start_date": "2022-09-10", "end_date": datetime.now().strftime("%Y-%m-%d")}
assert stream.request_params(**inputs) == expected_params


Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/sources/nasa.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ If there are more endpoints you'd like Airbyte to support, please [create an iss
| Feature | Supported? |
|:------------------|:-----------|
| Full Refresh Sync | Yes |
| Incremental Sync | No |
| Incremental Sync | Yes |
| SSL connection | No |
| Namespaces | No |

Expand Down

0 comments on commit c6dcea5

Please sign in to comment.