Skip to content

Commit

Permalink
šŸ› Lowcode: ListStreamSlicer and SubstreamSlicer should get the streamā€¦
Browse files Browse the repository at this point in the history
ā€¦_slice from the arguments (#18574)

* fix list stream slicer

* get the stream slice from the parameters

* delete unused code

* Add comments

* missing test

* newline

* with pytest.raise

* reset to master

* newline

* Update __init__.py

* Update __init__.py

* šŸŽ‰ New Source: ConvertKit (#18455)

* Init source omnisend

* Removed unnecessary files

* Init source convertkit

* Added forms, sequences streams

* Added tags stream

* Added subscribers, broadcasts streams

* Added documentation

* Removed unnecessary files

* Updated pull request information in documentation

* Added sample config

* Updated invalid config

* Formatting, removed abnormal state, uncommented full_refresh acceptance test

* Added pagination for subscribers stream

* fix: add source definition for convertkit

* auto-bump connector version

Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* šŸŽ‰New Source: Google Webfonts [low-code cdk] (#18496)

* New source: Google webfonts

* chore: Add Docs

* chore: update changelog

* chore: resolved given comments for PR

* chore: unwanted files removed

* fix: generate and add source definitions

Co-authored-by: sajarin <sajarindider@gmail.com>

* Clean up build.gradle. (#18555)

Upstream has fixed this bug so we no longer need to host it internally. Remove this to clean up build.gradle.

* šŸŽ‰ New Destination: Typesense (#18349)

* Initial boilerplate

* šŸŽ‰ New Destination: Typesense

* remove .java-version

* fix doc

* add typesense to dest def

* add release stage

* add requirement to main

* auto-bump connector version

* add changelog

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* šŸŽ‰ New Source: Waiteraid [low-code cdk] (#18165)

* added source waiteraid

* šŸŽ‰ New Source: Waiteraid

* add searchBookings stream

* add P/R number

* add SUMMARY entry

* add docs/integrations/README.md entry

* add builds.md entry

* add docs to each endpoint

* fix schema
~

* Update airbyte-integrations/builds.md

added web address

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>

* Delete catalog.json

* Delete TODO.md

* Update spec.yaml

* add waiteraid to source def seed

* auto-bump connector version

Co-authored-by: Sebastian Brickel <sebastianbrickel@Sebastians-MacBook-Air.local>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* Replace `recipesLink` link with `tutorialsLink` (#18616)

* Protocol Change: `AirbyteControlMessage.ConnectorConfig` (#17907)

* Protocol Change: AirbyteConfigMessage

* update PR link in docs

* Lint

* Update python files

* Update docs/understanding-airbyte/airbyte-protocol.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update docs/understanding-airbyte/airbyte-protocol.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* `AirbyteConfigMessage` -> `AirbyteConnectorConfigMessage`

* AirbyteOrchestratorMessage

* Update docs

* `AirbyteControlConnectorConfigMessage`

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Remove workflow version check (#18613)

* Improved the Oracle cloud deployment guide (#18615)

* Update OCI VM deployment guide

* Update OCI deployment guide

* Update on oci-vm.md

* bump

Co-authored-by: Dainius Salkauskas <dainiuxazz@gmail.com>
Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Balasubramanian T K <btk.codedev@gmail.com>
Co-authored-by: Davin Chia <davinchia@gmail.com>
Co-authored-by: Cirdes <cirdes@linkana.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: sbrickel-nimble <113671803+sbrickel-nimble@users.noreply.github.com>
Co-authored-by: Sebastian Brickel <sebastianbrickel@Sebastians-MacBook-Air.local>
Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Jonathan Pearlin <jonathan@airbyte.io>
Co-authored-by: Victor Ikechukwu <vickyikechukwu13@gmail.com>
  • Loading branch information
15 people authored Oct 28, 2022
1 parent 44d78bc commit cd1f492
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 36 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.4.2
Low-code: Fix off by one error with the stream slicers

## 0.4.1
Low-code: Fix a few bugs with the stream slicers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ def __post_init__(self, options: Mapping[str, Any]):
self._cursor = None

def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
# This method is called after the records are processed.
slice_value = stream_slice.get(self.cursor_field.eval(self.config))
if slice_value and slice_value in self.slice_values:
self._cursor = slice_value
else:
raise ValueError(f"Unexpected stream slice: {slice_value}")

def get_stream_state(self) -> StreamState:
return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}
Expand All @@ -56,37 +59,45 @@ def get_request_params(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.request_parameter)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

def get_request_headers(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.header)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.header, stream_slice)

def get_request_body_data(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.body_data)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.body_data, stream_slice)

def get_request_body_json(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.body_json)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.body_json, stream_slice)

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
return [{self.cursor_field.eval(self.config): slice_value} for slice_value in self.slice_values]

def _get_request_option(self, request_option_type: RequestOptionType):
if self.request_option and self.request_option.inject_into == request_option_type and self._cursor:
return {self.request_option.field_name: self._cursor}
def _get_request_option(self, request_option_type: RequestOptionType, stream_slice: StreamSlice):
if self.request_option and self.request_option.inject_into == request_option_type and stream_slice:
slice_value = stream_slice.get(self.cursor_field.eval(self.config))
if slice_value:
return {self.request_option.field_name: slice_value}
else:
return {}
else:
return {}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __post_init__(self, options: Mapping[str, Any]):
self._options = options

def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
# This method is called after the records are processed.
cursor = {}
for parent_stream_config in self.parent_stream_configs:
slice_value = stream_slice.get(parent_stream_config.stream_slice_field)
Expand All @@ -64,39 +65,43 @@ def get_request_params(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.request_parameter)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

def get_request_headers(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.header)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.header, stream_slice)

def get_request_body_data(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.body_data)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.body_data, stream_slice)

def get_request_body_json(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._get_request_option(RequestOptionType.body_json)
# Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
return self._get_request_option(RequestOptionType.body_json, stream_slice)

def _get_request_option(self, option_type: RequestOptionType):
def _get_request_option(self, option_type: RequestOptionType, stream_slice: StreamSlice):
params = {}
if self._cursor:
if stream_slice:
for parent_config in self.parent_stream_configs:
if parent_config.request_option and parent_config.request_option.inject_into == option_type:
key = parent_config.stream_slice_field
value = self._cursor.get(key)
value = stream_slice.get(key)
if value:
params.update({key: value})
return params
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.4.1",
version="0.4.2",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,14 @@ def test_update_cursor(test_name, stream_slice, expected_state):
),
]
slicer = CartesianProductStreamSlicer(stream_slicers=stream_slicers, options={})
slicer.update_cursor(stream_slice, None)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state

if expected_state:
slicer.update_cursor(stream_slice, None)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
else:
with pytest.raises(ValueError):
slicer.update_cursor(stream_slice, None)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -169,12 +174,12 @@ def test_request_option(
],
options={},
)
slicer.update_cursor({"owner_resource": "customer", "repository": "airbyte"}, None)
stream_slice = {"owner_resource": "customer", "repository": "airbyte"}

assert expected_req_params == slicer.get_request_params()
assert expected_headers == slicer.get_request_headers()
assert expected_body_json == slicer.get_request_body_json()
assert expected_body_data == slicer.get_request_body_data()
assert expected_req_params == slicer.get_request_params(stream_slice=stream_slice)
assert expected_headers == slicer.get_request_headers(stream_slice=stream_slice)
assert expected_body_json == slicer.get_request_body_json(stream_slice=stream_slice)
assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice)


def test_request_option_before_updating_cursor():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ def test_list_stream_slicer(test_name, slice_values, cursor_field, expected_slic
)
def test_update_cursor(test_name, stream_slice, last_record, expected_state):
slicer = ListStreamSlicer(slice_values=slice_values, cursor_field=cursor_field, config={}, options={})
slicer.update_cursor(stream_slice, last_record)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
if expected_state:
slicer.update_cursor(stream_slice, last_record)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
else:
with pytest.raises(ValueError):
slicer.update_cursor(stream_slice, last_record)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -111,11 +115,10 @@ def test_request_option(test_name, request_option, expected_req_params, expected
slicer = ListStreamSlicer(slice_values=slice_values, cursor_field=cursor_field, config={}, request_option=request_option, options={})
stream_slice = {cursor_field: "customer"}

slicer.update_cursor(stream_slice)
assert expected_req_params == slicer.get_request_params(stream_slice)
assert expected_headers == slicer.get_request_headers()
assert expected_body_json == slicer.get_request_body_json()
assert expected_body_data == slicer.get_request_body_data()
assert expected_req_params == slicer.get_request_params(stream_slice=stream_slice)
assert expected_headers == slicer.get_request_headers(stream_slice=stream_slice)
assert expected_body_json == slicer.get_request_body_json(stream_slice=stream_slice)
assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice)


def test_request_option_before_updating_cursor():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,9 @@ def test_request_option(
],
options={},
)
slicer.update_cursor({"first_stream_id": "1234", "second_stream_id": "4567"}, None)
stream_slice = {"first_stream_id": "1234", "second_stream_id": "4567"}

assert expected_req_params == slicer.get_request_params()
assert expected_headers == slicer.get_request_headers()
assert expected_body_json == slicer.get_request_body_json()
assert expected_body_data == slicer.get_request_body_data()
assert expected_req_params == slicer.get_request_params(stream_slice=stream_slice)
assert expected_headers == slicer.get_request_headers(stream_slice=stream_slice)
assert expected_body_json == slicer.get_request_body_json(stream_slice=stream_slice)
assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice)

0 comments on commit cd1f492

Please sign in to comment.