[Low-Code CDK] Parse incoming manifest objects into Pydantic models #20747
Conversation
I like it!
Thanks for sharing an early draft — it really helps with understanding the full picture.
This seems like a good approach, assuming we solve the problem of the fields that need to be duplicated. Maybe worth validating early to make sure we don't need to revisit this solution.
  self._debug = debug
- self._factory = DeclarativeComponentFactory()
+ self._factory = DeclarativeComponentFactory()  # Legacy factory used to instantiate declarative components from the manifest
nit: rename to _legacy_factory if this will get merged in with the constructor, to avoid confusion
    raise FileNotFoundError(f"Failed to read manifest component json schema required for validation: {e}")

try:
    validate(self._source_config, declarative_component_schema)
nit: we should validate using the legacy validation until we delete the code (unless we're 110% confident they're equivalent)
Why is that? I'm wondering because I feel like the earlier we enable this validation, the more feedback we will get to know if it behaves as we expect?
I meant validate using both as extra validation. I don’t feel strongly enough about it to push more though
We did validate it in part 1, but it's trivial to leave in and doesn't hurt, so I'll remove the conditional here to continue running both validations, but keep the rest of the conditionals to gate construction.
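The dual-validation idea discussed above can be sketched as follows. This is a hedged illustration, not the actual CDK code: the two validator functions are hypothetical stand-ins for the jsonschema validations against the handwritten schema and the auto-generated `declarative_component_schema`.

```python
from typing import Any, Mapping

# Hypothetical stand-ins for the two schema validators. In the real code these
# would be jsonschema.validate() calls against the legacy handwritten schema and
# the auto-generated declarative component schema, respectively.
def validate_against_legacy_schema(manifest: Mapping[str, Any]) -> None:
    if "check" not in manifest:
        raise ValueError("legacy schema: missing 'check'")

def validate_against_generated_schema(manifest: Mapping[str, Any]) -> None:
    if "streams" not in manifest:
        raise ValueError("generated schema: missing 'streams'")

def validate_source(manifest: Mapping[str, Any]) -> list:
    """Run both validations unconditionally and collect every failure,
    instead of gating one validation behind a feature flag."""
    errors = []
    for validator in (validate_against_legacy_schema, validate_against_generated_schema):
        try:
            validator(manifest)
        except ValueError as exc:
            errors.append(str(exc))
    return errors
```

Running both validators keeps the legacy behavior as a safety net while still surfacing any divergence in the new schema early.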
full_config["version"] = self._source_config["version"]
full_config["check"] = self._source_config["check"]
streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
if len(streams) > 0:
question: what should we do / how should we fail if there are no streams?
Good question. In the context of this code, we're just appending the constructed declarative streams back to the object for validation. We'll end up deleting this code in the long run though so I don't think we need to change anything here.
That being said, we should be able to update the json schema to enforce a minimum of 1 element and avoid having a specific conditional check in our code.
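In JSON-Schema terms, the constraint described above is a small addition to the generated schema's streams definition. A sketch (the `$ref` target name is assumed for illustration):

```yaml
streams:
  type: array
  items:
    "$ref": "#/definitions/DeclarativeStream"
  minItems: 1
```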
Annoyingly, it looks like there's a bug in pydantic that causes an error when we try to use field constraints on generics:

E ValueError: On field "streams" the following field constraints are set but not enforced: min_items.
E For more details see https://pydantic-docs.helpmanual.io/usage/schema/#unenforced-field-constraints

There's a bug filed with the exact use case: pydantic/pydantic#3358. In the meantime, I'll have to get rid of the field constraint because, the way the model generator works, it will always emit classes in the format that triggers the error until the bug is fixed.
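Until that pydantic issue is fixed, one workaround is to drop the constraint from the generated model and enforce the minimum with an explicit check instead. A stdlib-only sketch (function and message are hypothetical, not the CDK's code):

```python
from typing import Any, Mapping, Sequence

def require_min_streams(streams: Sequence[Mapping[str, Any]], minimum: int = 1) -> None:
    # Mirrors what min_items on the generated model field would enforce if
    # pydantic supported the constraint on generic list fields
    # (see pydantic/pydantic#3358).
    if len(streams) < minimum:
        raise ValueError(
            f"A valid manifest must declare at least {minimum} stream(s), got {len(streams)}"
        )
```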
I like the way you structured this change. Good job!
Awesome @brianjlai! Just a couple questions/comments.
@@ -128,7 +157,7 @@ def _validate_source(self):
      if "version" in self._source_config:
          full_config["version"] = self._source_config["version"]
      full_config["check"] = self._source_config["check"]
-     streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
+     streams = [self._legacy_factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
Should we be flagging this on self.construct_using_pydantic_models so that we're validating the streams constructed by self._constructor.create_component? Or is this blocked on implementing create_declarative_stream (in which case maybe a TODO here would be warranted)?
This is just a no-op for renaming the variable. I don't think we need to gate the validation behind anything because in the first part of the refactor we decided to validate the schema against both the auto-generated one (which is this block of code) as well as against the YAML handwritten one. This should already be working as is, and the pydantic parsing we're doing in this PR is done after validations as we actually instantiate components.
This is a good idea. Catherine and I have been working on the follow-up to this PR where we actually create the declarative components, and I validated that we can solve the duplicative fields the way we want. For example, if we want to avoid duplicating base_urls or primary keys between layers, at a parent layer we have access to the current model and its sub-components, so we can pull out specific fields and pass them to children via kwargs. I verified this for base_url.
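The parent-to-child kwargs propagation described above can be sketched like this. All names here are hypothetical toys; the real factory methods construct declarative components from their pydantic models:

```python
from dataclasses import dataclass

# Toy component standing in for a declarative requester.
@dataclass
class Requester:
    url_base: str
    path: str

def create_requester(component: dict, **kwargs) -> Requester:
    # A field missing on the child is filled in from what the parent passed down.
    return Requester(
        url_base=component.get("url_base", kwargs.get("url_base", "")),
        path=component["path"],
    )

def create_retriever(component: dict) -> Requester:
    # The parent layer sees its own fields and its sub-components, so it can
    # forward url_base to children via kwargs instead of requiring the manifest
    # to repeat the field at every layer.
    return create_requester(component["requester"], url_base=component["url_base"])
```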
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest, evaluated_manifest, "")
propagated_source_config = ManifestComponentTransformer().propagate_types_and_options("", resolved_source_config, {})
self._new_source_config = propagated_source_config
self._legacy_source_config = resolved_source_config
In the immediate term, it's a little ugly to retain both the legacy manifest and the new manifest (the key difference being that the new manifest goes through the transformer to propagate options and assign types). But we will be getting rid of _legacy_source_config in the final part of the refactor anyway. Doing it this way just helps make it very explicit which lines of code will get deleted in that PR.
The final flow after we finish will look like this:
evaluated_manifest = {}
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest, evaluated_manifest, "")
propagated_source_config = ManifestComponentTransformer().propagate_types_and_options("", resolved_source_config, {})
self._source_config = propagated_source_config
self._debug = debug
self._constructor = ModelToComponentFactory()
self._validate_source()
  self._validate_source()

  # Stopgap to protect the top-level namespace until it's validated through the schema
- unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
+ unknown_fields = [key for key in self._legacy_source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
Is this check needed, or would it be enough to validate the ConnectorManifest with additionalProperties = false? It would be nice to only define the top-level fields once.
Yep, I will add it to the handwritten declarative schema. But while we are keeping the old validation flow, we should leave this block; during cleanup we will remove it. Will update this with a comment.
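The schema-level replacement for the stopgap would look something like this in the handwritten schema. A sketch only: the exact field names and `$ref` targets are assumed from context.

```yaml
ConnectorManifest:
  type: object
  additionalProperties: false   # rejects unknown top-level fields, replacing the manual check
  required:
    - version
    - check
    - streams
  properties:
    version:
      type: string
    check:
      "$ref": "#/definitions/CheckStream"
    streams:
      type: array
```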
🚢
note: I won't be cutting a release of this because the changes are gated anyway; I will release this as part of #21050
…20747)

* initial work to parse manifest objects into pydantic models
* pr feedback and some other validations
* rerun manifest schema generation
* remove field constraint due to bug
* add a few tests for the model to component factory
* add catch
* fix a bug where propagated schema doesn't work with old factory
* add additionalProperties: false to the handwritten schema
* regenerate pydantic models
* fix test
Closes #20045
What
Now that we have Pydantic model classes generated from the declarative_component_schema.yaml, we need to translate the incoming manifest object into those models. This is a precursor to constructing declarative components from these models (Phase 4).
Note: This is intentionally gated off and won't impact anything, since we'll need to have the full object creation complete before we can release this.
How
We can leverage Pydantic's existing parser to transform a Mapping into its respective Pydantic model and sub-component models. The method to do that is BaseModel.parse_obj().

Without turning these models into concrete declarative components, this code path is not usable, so I've also gated it behind a field on the ManifestDeclarativeSource called construct_using_pydantic_models, which will remain off at all times until we are ready to release the changes.

Lastly, I added some boilerplate for how the new factory will be structured, and added construction of the CheckStream and Spec components because they are very simple. I didn't include DeclarativeStream and just created a stub, because that would require building out the entire set of constructors, which will be done in Phase 4.

Recommended reading order
1. manifest_declarative_source.py
2. model_to_component_factory.py
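Conceptually, what BaseModel.parse_obj() does here can be sketched with a stdlib dataclass analogue: walk a plain mapping and recursively instantiate typed model objects. This is a toy illustration, not pydantic's implementation, and the model shapes below are simplified versions of the generated classes.

```python
from collections.abc import Mapping
from dataclasses import dataclass, fields, is_dataclass
from typing import Any

def parse_obj(model: type, data: Mapping) -> Any:
    """Toy analogue of pydantic's BaseModel.parse_obj(): build `model` from a
    plain mapping, descending recursively into nested dataclass fields."""
    kwargs = {}
    for f in fields(model):
        value = data[f.name]
        if is_dataclass(f.type) and isinstance(value, Mapping):
            value = parse_obj(f.type, value)
        kwargs[f.name] = value
    return model(**kwargs)

# Illustrative models loosely mirroring the generated component shapes.
@dataclass
class CheckStream:
    stream_names: list

@dataclass
class ConnectorManifest:
    version: str
    check: CheckStream
```

Pydantic's real parser additionally handles validation, type coercion, unions, and aliases; the point here is only the recursive mapping-to-model translation that this PR applies to incoming manifests.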