Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream returns AirbyteMessages #18572

Merged
merged 69 commits into from
Nov 8, 2022
Merged

Stream returns AirbyteMessages #18572

merged 69 commits into from
Nov 8, 2022

Conversation

girarda
Copy link
Contributor

@girarda girarda commented Oct 27, 2022

What

  • This is PR 1/2 for emitting requests and responses as AirbyteMessages.
  • This PR is scoped to refactoring the Stream and the Source such that the Stream returns AirbyteMessage instead of Mapping.
  • This follow up PR updates the retriever and the HttpStream so requests and responses are emitted as AirbyteLogMessages

  • Update the Stream interface to yield AirbyteMessage
  • This allows streams to yield TraceMessages natively

How

  • Move conversion logic from Mapping[str, Any] -> AirbyteRecord to a reusable function
  • Add read_records_as_messages to the Stream inteface
  • read_records_as_messages simply calls read_records` and pipes them to the reusable function
  • Update the AbstractSource to call read_records_as_messages instead of read_records
  • read_records isn't modified so child classes can still overwrite is when needed

Testing

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
  2. airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
  3. airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

Tests

Unit

Put your unit tests output here.

Integration

Put your integration tests output here.

Acceptance

Put your acceptance tests output here.

@github-actions github-actions bot added the CDK Connector Development Kit label Oct 27, 2022
@girarda girarda changed the title method yielding airbytemessage Stream returns AirbyteMessages Oct 27, 2022
yield message
if message.type == MessageType.RECORD:
record = message.record
stream_state = stream_instance.get_updated_state(stream_state, record.data)
Copy link
Contributor Author

@girarda girarda Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same logic moved inside the if block because we only want to update the state and record counter on records

@@ -99,6 +116,7 @@ def read_records(
This method should be overridden by subclasses to read records based on the inputs
"""

@lru_cache(maxsize=None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -425,8 +425,8 @@ def test_source_config_no_transform(abstract_source, catalog):
records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
assert len(records) == 2 * 5
assert [r.record.data for r in records] == [{"value": 23}] * 2 * 5
assert http_stream.get_json_schema.call_count == 1
assert non_http_stream.get_json_schema.call_count == 1
assert http_stream.get_json_schema.call_count == 5
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assert verified that we used the cached transformer and schema instead of calling get_json_schema.
We now call get_json_schema more often, but the value is cached so the behavior is effectively the same

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense! but side question, could we add a test into core.py that calls get_json_schema() and asserts that some part of:

ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.name)

only gets invoked once. like given that it's cached, we'd only expect self.__class__ or the underlying get_schema() to only get called once

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 0cbd28e

@girarda girarda temporarily deployed to more-secrets November 5, 2022 00:33 Inactive
# AirbyteRecordMessage: An AirbyteRecordMessage
# AirbyteLogMessage: A log message
# AirbyteTraceMessage: A trace message
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not attached to this typedef, but the full union definition is quite cumbersome

stream_instance = self._stream_to_instance_map[stream_name]
return stream_instance.transformer, stream_instance.get_json_schema()

def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extracted to a function so it can be reused by the SimpleRetriever

@github-actions github-actions bot removed the area/connectors Connector related issues label Nov 5, 2022
Copy link
Contributor

@sherifnada sherifnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚢

from airbyte_cdk.sources.utils.transform import TypeTransformer
from airbyte_cdk.sources.utils.record_helper import \
stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did a formatter change? i don't recall seeing this before

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope. Ran gradlew format. I'll look into what's wrong with my setup

Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm!

@girarda girarda merged commit 1a608f8 into master Nov 8, 2022
@girarda girarda deleted the alex/read_with_logs branch November 8, 2022 05:23
letiescanciano added a commit that referenced this pull request Nov 8, 2022
…nent

* master:
  🪟 🎉 Enable frontend validation for <1hr syncs (cloud) #19028
  Stream returns AirbyteMessages (#18572)
  🎉 New Source - Recruitee [low-code SDK] (#18671)
  🎉 New source: Breezometer [low-code cdk] (#18650)
  Check disabled connections after protocol update (#18990)
  Simple default replication worker refactor (#19002)
  🎉 New Source: Visma e-conomic (#18595)
  🎉 New Source: Fastbill (#18593)
  Bmoric/extract state api (#18980)
  🎉 New Source: Zapier Supported Storage (#18442)
  🎉 New source: Klarna (#18385)
  `AirbyteEstimateTraceMessage` (#18875)
  Extract source definition api (#18977)
  [low-code cdk] Allow for spec file to be defined in the yaml manifest instead of an external file (#18411)
  🐛 Source HubSpot: fix property scopes (#18624)
  Ensure that only 6-character hex values are passed to monaco editor (#18943)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants