
[low code connectors] perform schema validation of the input config against the declarative language schema (#15543)

* draft: first pass at complete schema language generation and factory validator

* actually a working validator and fixes to the schema that went uncaught

* remove extra spike file

* fix formatting file

* pr feedback and a little bit of refactoring

* fix some types that were erroneously marked as invalid schema

* some comments

* add jsonschemamixin to interfaces

* update changelog

* bump version
brianjlai authored Aug 18, 2022
1 parent 1a641f6 commit ca80d37
Showing 31 changed files with 670 additions and 93 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.1.77
- Add schema validation for declarative YAML connector configs

## 0.1.76
- Bugfix: Correctly set parent slice stream for sub-resource streams

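
The validation this release adds leans on two libraries that appear throughout the diff below: `dataclasses_jsonschema`, which derives a JSON schema from a component's dataclass fields, and `jsonschema`, which checks a mapping against that schema. A minimal standalone sketch of the idea (the `ExampleComponent` class, its fields, and the URL are illustrative only, not part of the CDK):

```python
from dataclasses import dataclass

from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate


@dataclass
class ExampleComponent(JsonSchemaMixin):
    """Toy component used only to illustrate schema generation."""

    url_base: str
    page_size: int = 100


schema = ExampleComponent.json_schema()  # JSON schema derived from the dataclass fields
validate({"url_base": "https://api.example.com", "page_size": 50}, schema)  # passes
# validate({"page_size": 50}, schema) raises ValidationError because "url_base" is required
```
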
@@ -0,0 +1,26 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass

from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DeclarativeAuthenticator(JsonSchemaMixin):
"""
Interface used to associate which authenticators can be used as part of the declarative framework
"""


@dataclass
class NoAuth(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
@property
def auth_header(self) -> str:
return ""

@property
def token(self) -> str:
return ""
@@ -6,14 +6,15 @@
from typing import Any, List, Mapping, Optional, Union

import pendulum
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixin):
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
@@ -40,7 +41,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixi
options: InitVar[Mapping[str, Any]]
scopes: Optional[List[str]] = None
token_expiry_date: Optional[Union[InterpolatedString, str]] = None
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False)
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False, default=None)
access_token_name: Union[InterpolatedString, str] = "access_token"
expires_in_name: Union[InterpolatedString, str] = "expires_in"
refresh_request_body: Optional[Mapping[str, Any]] = None
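
The docstring above notes that credentials can be supplied explicitly or via interpolation into the connector config. An illustrative component definition for this authenticator, written as the Python mapping the factory consumes; the `class_name` path and the exact field names are assumptions for the sketch, not taken from this diff:

```python
# Hypothetical definition; "{{ config[...] }}" is the interpolation syntax resolved by JinjaInterpolation.
oauth_definition = {
    "class_name": "airbyte_cdk.sources.declarative.auth.oauth.DeclarativeOauth2Authenticator",
    "token_refresh_endpoint": "https://api.example.com/oauth/token",
    "client_id": "{{ config['client_id'] }}",
    "client_secret": "{{ config['client_secret'] }}",
    "refresh_token": "{{ config['refresh_token'] }}",
}
```
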
@@ -6,14 +6,15 @@
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
ApiKeyAuth sets a request header on the HTTP requests sent.
@@ -51,7 +52,7 @@ def token(self) -> str:


@dataclass
class BearerAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class BearerAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Authenticator that sets the Authorization header on the HTTP requests sent.
@@ -81,7 +82,7 @@ def token(self) -> str:


@dataclass
class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
class BasicHttpAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64
https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
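
For reference, the basic authentication scheme the docstring cites (RFC 7617) simply base64-encodes `user-id:password` into the `Authorization` header. A standalone illustration, independent of the class above:

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
    return f"Basic {token}"


assert basic_auth_header("user", "passwd") == "Basic dXNlcjpwYXNzd2Q="
```
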
@@ -36,10 +36,10 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
config: Config
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False)
_name: str = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False)
stream_cursor_field: Optional[List[str]] = None
_primary_key: str = field(init=False, repr=False, default="")
stream_cursor_field: Optional[Union[List[str], str]] = None
transformations: List[RecordTransformation] = None
checkpoint_interval: Optional[int] = None

@@ -2,15 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Union

import requests
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class Decoder(ABC):
class Decoder(JsonSchemaMixin):
"""
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
"""
@@ -7,10 +7,11 @@

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class JsonDecoder(Decoder):
class JsonDecoder(Decoder, JsonSchemaMixin):
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""
@@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class HttpSelector(ABC):
class HttpSelector(JsonSchemaMixin):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
records based on a heuristic.
@@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import List

import requests
from airbyte_cdk.sources.declarative.types import Record
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class RecordExtractor(ABC):
class RecordExtractor(JsonSchemaMixin):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response.
"""
@@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
@@ -20,7 +20,7 @@ class RecordFilter(JsonSchemaMixin):
"""

options: InitVar[Mapping[str, Any]]
config: Config = field(default=dict)
config: Config
condition: str = ""

def __post_init__(self, options: Mapping[str, Any]):
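
Taken together, the selector, extractor, and filter interfaces above describe a two-step flow: pull records out of the HTTP response, then keep only those matching a condition. A plain-Python sketch of that flow (not the CDK classes themselves):

```python
from typing import Any, Callable, List, Mapping

Record = Mapping[str, Any]


def extract_records(response_body: Mapping[str, Any], field: str = "data") -> List[Record]:
    # e.g. {"data": [{"id": 1}, {"id": 2}]} -> [{"id": 1}, {"id": 2}]
    return list(response_body.get(field, []))


def filter_records(records: List[Record], condition: Callable[[Record], bool]) -> List[Record]:
    return [record for record in records if condition(record)]


assert filter_records(extract_records({"data": [{"id": 1}, {"id": 2}]}), lambda r: r["id"] > 1) == [{"id": 2}]
```
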
101 changes: 87 additions & 14 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py
@@ -7,13 +7,18 @@
import copy
import enum
import importlib
import inspect
import typing
from dataclasses import fields
from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints

from airbyte_cdk.sources.declarative.create_partial import OPTIONS_STR, create
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY
from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate

ComponentDefinition: Union[Literal, Mapping, List]

@@ -99,13 +104,14 @@ class DeclarativeComponentFactory:
def __init__(self):
self._interpolator = JinjaInterpolation()

def create_component(self, component_definition: ComponentDefinition, config: Config):
def create_component(self, component_definition: ComponentDefinition, config: Config, instantiate: bool = True):
"""
Create a component defined by `component_definition`.
This method will also traverse and instantiate its subcomponents if needed.
:param component_definition: The definition of the object to create.
:param config: Connector's config
:param instantiate: The factory should create the component when True or instead perform schema validation when False
:return: The object to create
"""
kwargs = copy.deepcopy(component_definition)
@@ -115,20 +121,47 @@ def create_component(self, component_definition: ComponentDefinition, config: Co
class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")]
else:
raise ValueError(f"Failed to create component because it has no class_name or type. Definition: {component_definition}")
return self.build(class_name, config, **kwargs)

def build(self, class_or_class_name: Union[str, Type], config, **kwargs):
# Because configs are sometimes stored on a component a parent definition, we should remove it and rely on the config
# that is passed down through the factory instead
kwargs.pop("config", None)
return self.build(
class_name,
config,
instantiate,
**kwargs,
)

def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool = True, **kwargs):
if isinstance(class_or_class_name, str):
class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name)
else:
class_ = class_or_class_name

# create components in options before propagating them
if OPTIONS_STR in kwargs:
kwargs[OPTIONS_STR] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs[OPTIONS_STR].items()}
kwargs[OPTIONS_STR] = {
k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs[OPTIONS_STR].items()
}

updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}

if instantiate:
return create(class_, config=config, **updated_kwargs)
else:
# Because the component's data fields definitions use interfaces, we need to resolve the underlying types into the
# concrete classes that implement the interface before generating the schema
class_copy = copy.deepcopy(class_)
DeclarativeComponentFactory._transform_interface_to_union(class_copy)
schema = class_copy.json_schema()

updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()}
return create(class_, config=config, **updated_kwargs)
component_definition = {
**updated_kwargs,
**{k: v for k, v in updated_kwargs.get(OPTIONS_STR, {}).items() if k not in updated_kwargs},
"config": config,
}
validate(component_definition, schema)
return lambda: component_definition

@staticmethod
def _get_class_from_fully_qualified_class_name(class_name: str):
@@ -141,7 +174,7 @@ def _get_class_from_fully_qualified_class_name(class_name: str):
def _merge_dicts(d1, d2):
return {**d1, **d2}

def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
def _create_subcomponent(self, key, definition, kwargs, config, parent_class, instantiate: bool = True):
"""
There are 5 ways to define a component.
1. dict with "class_name" field -> create an object of type "class_name"
@@ -153,14 +186,14 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
if self.is_object_definition_with_class_name(definition):
# propagate kwargs to inner objects
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif self.is_object_definition_with_type(definition):
# If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
object_type = definition.pop("type")
class_name = CLASS_TYPES_REGISTRY[object_type]
definition["class_name"] = class_name
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif isinstance(definition, dict):
# Try to infer object type
expected_type = self.get_default_type(key, parent_class)
@@ -169,17 +202,22 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
if expected_type and not self._is_builtin_type(expected_type):
definition["class_name"] = expected_type
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
else:
return definition
elif isinstance(definition, list):
return [
self._create_subcomponent(
key, sub, self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), config, parent_class
key,
sub,
self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)),
config,
parent_class,
instantiate,
)
for sub in definition
]
else:
elif instantiate:
expected_type = self.get_default_type(key, parent_class)
if expected_type and not isinstance(definition, expected_type):
# call __init__(definition) if definition is not a dict and is not of the expected type
@@ -193,8 +231,7 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
return expected_type(definition, options=options)
except Exception as e:
raise Exception(f"failed to instantiate type {expected_type}. {e}")
else:
return definition
return definition

@staticmethod
def is_object_definition_with_class_name(definition):
@@ -238,3 +275,39 @@ def _is_builtin_type(cls) -> bool:
if not cls:
return False
return cls.__module__ == "builtins"

@staticmethod
def _transform_interface_to_union(expand_class: type):
class_fields = fields(expand_class)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
expand_class.__annotations__[field.name] = unpacked_field_types
return expand_class

@staticmethod
def unpack(field_type: type):
"""
Recursive function that takes in a field type and unpacks the underlying fields (if it is a generic) or
returns the field type if it is not in a generic container
:param field_type: The current set of field types to unpack
:return: A list of unpacked types
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# Functions as the base case since the origin is none for non-typing classes. If it is an interface then we derive
# and return the union of its subclasses or return the original type if it is a concrete class or a primitive type
if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin):
subclasses = field_type.__subclasses__()
if subclasses:
return Union[tuple(subclasses)]
return field_type
elif generic_type is list or generic_type is Union:
unpacked_types = [DeclarativeComponentFactory.unpack(underlying_type) for underlying_type in typing.get_args(field_type)]
if generic_type is list:
# For lists we extract the underlying list type and attempt to unpack it again since it could be another container
return List[Union[tuple(unpacked_types)]]
elif generic_type is Union:
# For Unions (and Options which evaluate into a Union of types and NoneType) we unpack the underlying type since it could
# be another container
return Union[tuple(unpacked_types)]
return field_type
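
With the `instantiate` flag in place, the same entry point either builds components or only validates them. A hedged usage sketch: `definition` stands for a component mapping parsed from the connector's YAML and `config` for the user-supplied connector config; neither is defined in this diff.

```python
factory = DeclarativeComponentFactory()

# Default path: build the component and its subcomponents.
component = factory.create_component(definition, config)()

# Validation-only path: interfaces are resolved to Unions of their implementations,
# a JSON schema is generated via dataclasses_jsonschema, and jsonschema.validate runs
# against the definition. The call returns the validated mapping instead of an object
# and raises ValidationError when the config does not match the schema.
validated = factory.create_component(definition, config, instantiate=False)()
```
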
