Skip to content

Commit

Permalink
[Low-Code CDK] Construct declarative components from Pydantic models (#…
Browse files Browse the repository at this point in the history
…21050)

* initial work to parse manifest objects into pydantic models

* pr feedback and some other validations

* rerun manifest schema generation

* remove field constraint due to bug

* initial work to construct most components from greenhouse

* custom components parse subcomponent fields correctly and adding a few more component constructors

* construct components from gnews

* first pass at posthog.yaml

* Handle nested custom components with list values.
Also includes updates to posthog.yaml, including autoformatting changes.

* adding constructors for slicers, filters, and transformations and a few bug fixes

* make sed work across multiple OS

* add NoAuth component

* fix handling of custom components with nested list

* Autogenerate `TYPE_NAME_TO_MODEL` mapping

* Handle default kwargs not defined on model for custom components

* Re-add `options` for CartesianProductStreamSlicer for backwards compat
with custom stream slicers

* add basic unit tests for the model component factory

* add back defaults and extra parameters like options to retain compatibility with legacy flow and backwards compatibility

* Remove `_get_defaults`; using actual default values on classes instead

* Add backoff strategy component creation functions

* add back defaults and extra parameters like options to retain compatibility with legacy flow and backwards compatibility

* add lots of tests to construct components from the pydantic models and a few bug fixes

* add a few tests for the model to component factory

* add catch

* fix a bug where propagated schema doesn't work with old factory

* clean up a few files

* add type inference for custom components, more tests and some refactoring of the model factory

* template, docs, manifest updates, pr feedback and some cleanup

* pr feedback and polish schema a bit

* fix tests from the latest rebase of master

* fix the last few bugs I found and adjust a few sources that weren't perfectly compatible with the new component flow

* fix CheckStream bug cleanup and a few small tweaks and polish

* add additional test to cover bug case

* fix formatting

* 🤖 Bump minor version of Airbyte CDK

Co-authored-by: Catherine Noll <noll.catherine@gmail.com>
Co-authored-by: brianjlai <brianjlai@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 13, 2023
1 parent 0a71b01 commit cbf9ea7
Show file tree
Hide file tree
Showing 25 changed files with 1,923 additions and 137 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.19.1
current_version = 0.20.0
commit = False

[bumpversion:file:setup.py]
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.20.0
Low-Code: Refactor low-code to use Pydantic model based manifest parsing and component creation

## 0.19.1
Low-code: Make documentation_url in the Spec be optional

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ definitions:
type: string
value:
type: string
$options:
type: object
additionalProperties: true
AddFields:
description: Transformation which adds field to an output record. The path of the added field can be nested.
type: object
Expand Down Expand Up @@ -77,6 +80,9 @@ definitions:
type: string
header:
type: string
$options:
type: object
additionalProperties: true
BasicHttpAuthenticator:
description: Authenticator for requests authenticated with a username and optional password
type: object
Expand All @@ -88,9 +94,15 @@ definitions:
type: string
enum: [BasicHttpAuthenticator]
username:
description: The username that will be combined with the password, base64 encoded and used to make requests
type: string
password:
description: The password that will be combined with the username, base64 encoded and used to make requests
type: string
default: ""
$options:
type: object
additionalProperties: true
BearerAuthenticator:
description: Authenticator for requests authenticated with a Bearer token
type: object
Expand All @@ -103,6 +115,9 @@ definitions:
enum: [BearerAuthenticator]
api_token:
type: string
$options:
type: object
additionalProperties: true
CartesianProductStreamSlicer:
description: Stream slicer that iterates over the cartesian product of input stream slicers
type: object
Expand Down Expand Up @@ -293,6 +308,23 @@ definitions:
$options:
type: object
additionalProperties: true
CustomRequestOptionsProvider:
description: (DO NOT USE) Added for backwards compatibility but this will be deprecated in a future release
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [CustomRequestOptionsProvider]
class_name:
type: string
additionalProperties: true
$options:
type: object
additionalProperties: true
CustomRetriever:
description: Retriever component whose behavior is derived from a custom code implementation of the connector
type: object
Expand Down Expand Up @@ -409,10 +441,13 @@ definitions:
type: string
access_token_name:
type: string
default: "access_token"
expires_in_name:
type: string
default: "expires_in"
grant_type:
type: string
default: "refresh_token"
refresh_request_body:
type: object
additionalProperties: true
Expand All @@ -422,6 +457,12 @@ definitions:
type: string
token_expiry_date:
type: string
token_expiry_date_format:
description: The format of the datetime; provide it if expires_in is returned in datetime instead of seconds
type: string
$options:
type: object
additionalProperties: true
DeclarativeStream:
description: A stream whose behavior is described by a set of declarative low code components
type: object
Expand All @@ -434,15 +475,19 @@ definitions:
type: string
enum: [DeclarativeStream]
retriever:
definition: Component used to coordinate how records are extracted across stream slices and request pages
anyOf:
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
checkpoint_interval:
definition: How often the stream will checkpoint state (i.e. emit a STATE message)
type: integer
name:
definition: The stream name
type: string
default: ""
primary_key:
definition: The primary key of the stream
anyOf:
- type: string
- type: array
Expand All @@ -455,17 +500,19 @@ definitions:
type: string
default: ""
schema_loader:
definition: The schema loader used to retrieve the schema for the current stream
anyOf:
- "$ref": "#/definitions/DefaultSchemaLoader"
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
stream_cursor_field:
definition: The field of the records being read that will be used during checkpointing
anyOf:
- type: string
- type: array
items:
- type: string
transformations:
definition: A list of transformations to be applied to each output record in the
type: array
items:
anyOf:
Expand Down Expand Up @@ -531,18 +578,6 @@ definitions:
$options:
type: object
additionalProperties: true
DefaultSchemaLoader:
description: Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.
type: object
required:
- type
properties:
type:
type: string
enum: [DefaultSchemaLoader]
$options:
type: object
additionalProperties: true
DpathExtractor:
description: Record extractor that searches a decoded response over a path defined as an array of fields
type: object
Expand Down Expand Up @@ -620,7 +655,9 @@ definitions:
- POST
default: GET
request_options_provider:
"$ref": "#/definitions/InterpolatedRequestOptionsProvider"
anyOf:
- "$ref": "#/definitions/CustomRequestOptionsProvider"
- "$ref": "#/definitions/InterpolatedRequestOptionsProvider"
$options:
type: object
additionalProperties: true
Expand Down Expand Up @@ -801,8 +838,9 @@ definitions:
type: string
enum: [OffsetIncrement]
page_size:
description: The number of records to request
anyOf:
- type: number
- type: integer
- type: string
$options:
type: object
Expand All @@ -818,6 +856,7 @@ definitions:
type: string
enum: [PageIncrement]
page_size:
description: The number of records to request
type: integer
start_from_page:
type: integer
Expand Down Expand Up @@ -870,7 +909,8 @@ definitions:
type:
type: string
enum: [RecordFilter]
backoff_time_in_seconds:
condition:
description: The predicate to filter a record. Records will be removed if evaluated to False
type: string
default: ""
$options:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, con
self.logger = logging.getLogger(f"airbyte.{self.name}")

# Controls whether we build components using the manual handwritten schema and Pydantic models or the legacy flow
self.construct_using_pydantic_models = construct_using_pydantic_models
self.construct_using_pydantic_models = True

# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
manifest = dict(source_config)
Expand Down Expand Up @@ -87,7 +87,11 @@ def connection_checker(self) -> ConnectionChecker:
if "type" not in check:
check["type"] = "CheckStream"
if self.construct_using_pydantic_models:
return self._constructor.create_component(CheckStreamModel, check, dict())(source=self)
check_stream = self._constructor.create_component(CheckStreamModel, check, dict())
if isinstance(check_stream, ConnectionChecker):
return check_stream
else:
raise ValueError(f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}")
else:
return self._legacy_factory.create_component(check, dict())(source=self)

Expand Down
Loading

0 comments on commit cbf9ea7

Please sign in to comment.