Skip to content

Commit

Permalink
#15823 source marketo: one more test fix
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Aug 22, 2022
1 parent 6dace17 commit 7142e64
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ tests:
path: "integration_tests/expected_records.txt"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/incremental_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
timeout_seconds: 3600
full_refresh:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"streams": [
{
"stream": {
"name": "activities_visit_webpage",
"json_schema": {},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ def normalize_datetime(self, dt: str, format="%Y-%m-%dT%H:%M:%SZ%z"):
class IncrementalMarketoStream(MarketoStream):
cursor_field = "createdAt"

def __init__(self, *args, **kwargs):
    """Initialize the stream and prepare empty per-stream state storage."""
    super().__init__(*args, **kwargs)
    # Backing store for the ``state`` property; updated by get_updated_state()
    # as records flow through the stream.
    self._state = {}

def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> Iterable:
"""
Endpoint does not provide query filtering params, but they provide us
Expand All @@ -94,12 +98,21 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
for record in json_response:
yield from self.filter_by_state(stream_state=stream_state, record=record)

@property
def state(self):
    """Current incremental state of the stream (a mapping keyed by the cursor field)."""
    return self._state

@state.setter
def state(self, value):
    """Replace the stream's incremental state, e.g. when resuming from a saved checkpoint."""
    self._state = value

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return the stream state advanced to the newer of the stored cursor and the latest record's cursor.

    Falls back to ``self.start_date`` when either side lacks the cursor field.
    The result is also cached on ``self._state`` so the ``state`` property
    reflects the most recent checkpoint.
    """
    stored_cursor = current_stream_state.get(self.cursor_field, self.start_date)
    record_cursor = latest_record.get(self.cursor_field, self.start_date)
    # NOTE(review): max() compares the cursor values as strings; this assumes
    # ISO-8601-style timestamps (lexicographic order == chronological order),
    # which matches the "%Y-%m-%dT%H:%M:%SZ%z" format used elsewhere — confirm.
    self._state = {self.cursor_field: max(record_cursor, stored_cursor)}
    return self._state

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
"""
Expand All @@ -116,16 +129,16 @@ def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwa
"""

start_date = pendulum.parse(self.start_date)
end_date = pendulum.parse(self.end_date) if self.end_date else pendulum.now()

# Determine stream_state, if no stream_state we use start_date
if stream_state:
start_date = pendulum.parse(stream_state.get(self.cursor_field))

# use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future
start_date = min(start_date, end_date)
start_date = min(start_date, pendulum.now())
date_slices = []

end_date = pendulum.parse(self.end_date) if self.end_date else pendulum.now()
while start_date <= end_date:
# the number of days in each data chunk, beginning from start_date
end_date_slice = start_date.add(days=self.window_in_days)
Expand Down

0 comments on commit 7142e64

Please sign in to comment.