AIP44 Fix DAG serialization #34042
Conversation
@@ -438,7 +438,7 @@ def serialize(
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
         elif isinstance(var, DAG):
-            return SerializedDAG.serialize_dag(var)
+            return cls._encode(SerializedDAG.serialize_dag(var), type_=DAT.DAG)
There is no need to encode this; we expect it to have a unique top-level "dag" key, as shown in
"dag": {
Similarly, we don't encode tasks or each operator, because we know the structure they will have within ["dag"]["tasks"].
Check L445 and L447.
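The two shapes under discussion can be sketched as follows. This is a hedged illustration: the "__type"/"__var" envelope and the top-level "dag" key mirror the serialization format discussed in this thread, but the `encode` helper and `Encoding` stub here are simplified stand-ins, not the actual Airflow code.

```python
from enum import Enum


class Encoding(str, Enum):
    # Simplified stand-in for airflow.serialization.enums.Encoding
    TYPE = "__type"
    VAR = "__var"


def encode(var, type_):
    """Wrap a serialized value in a type envelope (sketch of cls._encode)."""
    return {Encoding.VAR: var, Encoding.TYPE: type_}


# Shape serialize_dag() returns on its own: a bare dict with a "dag" key.
bare = {"dag": {"dag_id": "example"}}

# Shape after wrapping with _encode, as the diff above proposes.
wrapped = encode(bare, type_="dag")
```

Whether the bare or the wrapped shape is correct is exactly what this review thread debates: the bare form relies on the known top-level "dag" key, while the wrapped form carries explicit type information.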
When DAG serialization was built, it was designed to use Airflow-internal knowledge to avoid inflating the final blob and to optimize wherever possible. This is the reason we don't store defaults, or attributes that are None, for DAG and operator objects.
Also, be careful about backwards-incompatible changes for this.
And if AIP-44 absolutely needs this sort of change, it will probably be more involved than just the DAG object.
Right, but:
- if I serialize a DAG (with BaseSerialization.serialize) and then try to deserialize it with BaseSerialization.deserialize, it fails - it never gets to
https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L551
because
var = encoded_var[Encoding.VAR]
type_ = encoded_var[Encoding.TYPE]
fails (the keys do not exist).
- also we may need to send the DAG as part of a bigger structure (e.g. https://github.com/apache/airflow/blob/main/airflow/dag_processing/processor.py#L882)
In the code I cannot find any usage of BaseSerialization.serialize on a DAG (at least when searching for something like BaseSerialization.serialize.*dag).
We are currently working on making sure all other object types are serializable.
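The failure mode described in the first bullet can be reproduced in miniature. This is a hedged sketch, not the actual Airflow implementation: the string keys and the `deserialize` stub simply mirror the two lookups quoted above.

```python
ENCODING_VAR, ENCODING_TYPE = "__var", "__type"


def deserialize(encoded_var: dict):
    # Mirrors the two lookups at serialized_objects.py#L551 quoted above:
    # both raise KeyError if the envelope keys were never written.
    var = encoded_var[ENCODING_VAR]
    type_ = encoded_var[ENCODING_TYPE]
    return type_, var


# A bare serialize_dag()-style dict has no "__var"/"__type" envelope...
bare_dag = {"dag": {"dag_id": "example"}}

# ...so deserialization cannot even dispatch on the type.
try:
    deserialize(bare_dag)
    failed = False
except KeyError:
    failed = True
```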
Yeah, I think @mhenc is right - this part of the code does not seem to have been used before. It's not DAG serialization that gets affected here; it's serializing the whole DAG as part of a bigger structure, and I have not seen any place in the code where we did that before.
-    id: int
+    id: Optional[int]
This field should not be None
I thought that too, but it is None during creation (before the object is written to the DB) - it is assigned by SQLAlchemy after you run
session.add(obj)
session.commit()
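The lifecycle being described can be sketched without a database. This is a hedged illustration: `DagRunStub` and `fake_commit` are invented stand-ins for the ORM model and the session flush, showing only that the primary key is None until the database assigns it.

```python
from dataclasses import dataclass
from itertools import count
from typing import Optional


@dataclass
class DagRunStub:
    # Hypothetical stand-in for the ORM model under discussion:
    # the primary key is unset until the row is flushed.
    id: Optional[int] = None


_autoincrement = count(1)  # pretend database sequence


def fake_commit(obj: DagRunStub) -> None:
    """Simulate session.add(obj); session.commit() assigning the key."""
    obj.id = next(_autoincrement)


run = DagRunStub()
pre_commit_id = run.id  # None before "commit"
fake_commit(run)        # database assigns the autoincrement key
```

This is the window in which `id` is genuinely None; the reviewer's counterpoint below is that a DagRun is never serialized while still inside it.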
But when do we ever serialize an unsaved DagRun?
Fair point :)
We may need it for the TriggerDag operator - but that is outside the scope of this PR.
So currently it's only for tests (which I can fix) and for consistency - some *Pydantic objects will have id as Optional (e.g. Job in #34026), so we may keep it Optional here for consistency.
But if you believe it's better to leave it as "int", I can revert this change.
I’m not sure Job should have an optional ID either. Maybe make this non-optional for now; we can punt on it until it’s necessary for a model. It’s always easier to remove a restriction than to add one.
Actually, Job must have it as Optional - in
#34026
we call "add_job", which is exactly the case where the id is empty.
Removing it from DagRunPydantic.
2 tasks failing,
Is the
Yes, by accident after conflict resolution. Removed.
@kaxil ?
@kaxil 🙏
oof, looking now
DAG serialization using BaseSerialization doesn't encode it with '__type'/'__var' fields, making it not-deserializable (no information about the type).
This object is needed by Internal API to be fully serializable/deserializable.
Also added more tests for the BaseSerialization.serialize/deserialize methods to cover all supported object types (and fixed a few minor issues).
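The round trip the fix enables can be sketched as follows. This is a hedged illustration, not the real Airflow API: `serialize_dag_wrapped` and `deserialize` are invented helpers showing only that, once the DAG dict is wrapped in the '__type'/'__var' envelope, deserialization can dispatch on the type tag.

```python
TYPE, VAR = "__type", "__var"


def serialize_dag_wrapped(dag_dict: dict) -> dict:
    """Sketch of the fixed behavior: wrap the DAG dict in a type envelope."""
    return {VAR: dag_dict, TYPE: "dag"}


def deserialize(encoded: dict):
    """Dispatch on the type tag, as the envelope now makes possible."""
    var, type_ = encoded[VAR], encoded[TYPE]
    if type_ == "dag":
        return var  # the real code would rebuild a DAG object here
    raise TypeError(f"unknown serialized type: {type_}")


payload = serialize_dag_wrapped({"dag_id": "example"})
restored = deserialize(payload)
```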