Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pydantic support to serde #31565

Merged
merged 3 commits into from
May 26, 2023
Merged

Conversation

bolkedebruin
Copy link
Contributor

This adds support of serialization and deserialization of pydantic models. Pydantic takes precedence over dataclasses.

cc: @potiuk @mhenc @uranusjr

This adds support of serialization and deserialization
of pydantic models. Pydantic takes precedence over
dataclasses.
Comment on lines 312 to 313
"""Return True if the class is a pydantic model."""
return hasattr(cls, "validate") and hasattr(cls, "json") and hasattr(cls, "dict")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not use isinstance here? Pydantic models use subclasses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isinstance is slower than checking for attributes, it will need to walk the tree of objects to find out whether it is an instance. It is one of the reasons the new serde code is faster than the old serialization code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using isinstance after the attribute checks? The problem I have with the current check is those attribute names are quite common and can easily hit false positives. If we check the attributes first, the probability of the object passed to isinstance is a Pydantic model should be high, and since Pydantic models mostly inherit from BaseModel directly, the talk should end very quickly (only two steps).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import timeit

def askisinstance(foo=object()):
return isinstance(foo, BaseModel)

def askhasattr(foo=object()):
return hasattr(foo, "validate") and hasattr(foo, "dict") and hasattr(foo, "json")

#%%
timeit.timeit('testfunc()', 'from main import askisinstance as testfunc')
timeit.timeit('testfunc()', 'from main import askhasattr as testfunc')

instance

0.19626916700508446

hasattr

0.06582462496589869

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer some distinguishing elements which is also more pythonic. Do you really think that the combination of those fields is so common? If required, I can also check for "fields" and "fields_set"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to check for different attributes set by pydantic metaclass.

@bolkedebruin bolkedebruin merged commit 85910b7 into apache:main May 26, 2023
1 check passed
@bolkedebruin bolkedebruin deleted the serde_pydantic branch May 26, 2023 09:52
@potiuk
Copy link
Member

potiuk commented May 26, 2023

Comment/question:

Just checking what was the intention here, because it won't (yet) make it usable for AIP-44. Maybe it's not complete solution yet, but the idea we had was that in some cases when we want to get (say) DagRun class, it will get serialized as DAGRunPydantic (by running fromOrm(dagRun) automatically). so in this case we are not trying to serialize PyDantic object, but we are trying to serialize ORM object as a Pydantic version of it.

So in our case in a number of cases we will have:

  1. DagRun -> serialize-> DagRunPydantic(serialized) -> deserialize -> DagRunPydantic.

and in some other (less important for us) cases:

  1. DagRunPydantic -> serialize-> DagRunPydantic(serialized) -> deserialize -> DagRunPydantic.

So if I understand correctly with this case we really handle the "deserialize" part in 1. and whole case 2. - we still will need to add the part of serde that properly serializes the DagRun (and few other ORM classes).

Is that the right understanding :)?

@bolkedebruin
Copy link
Contributor Author

Replying from my phone. Expect some brevity.

A couple of thoughts on from side.

The change here allows arbitrary pydantic objects to be serialized contrary the current implementation in serialized_objects. So it covers everything in the above examples from DagRunPydantic onwards.

Furthermore I think the approach to get e.g. dagrunpydantic from a dagrun is somewhat flawed for several reasons.

  1. It's a lot of code duplication which needs to be maintained and there is need for it (see below)
  2. due to 1 the schema is weak and needs to be maintained manually. This defeats the purpose of using pydantic
  3. There is no versioning
  4. Using two classes (dagrun and DagRunPydantic) requires two instance checks more or less everywhere it seems (DagRun | DagRunPydantic) while there seems no real reason to do so.

Most of the issues can be overcome by inspecting the sqlalchemy model and deriving the pydantic model from that. There is even a plugin or that (pydantic-sqlalchemy, I think). Having that incorporated into the base DagRun class (and the others) solves all above issues improving maintainability and robustness and it removes the need for all those custom pydantic classes which basically are yet again another way of serialization.

@potiuk
Copy link
Member

potiuk commented May 29, 2023

Using two classes (dagrun and DagRunPydantic) requires two instance checks more or less everywhere it seems (DagRun | DagRunPydantic) while there seems no real reason to do so.

TL;DR; I tried different approaches (also pydantic-sqlalchemy) and I could not find a better solution for the needs of Internal DB-API. While there is some duplication, I believe it is necessary to keep the DB/NoDB dualism as is a result of earlier Airlfow architectural decisions that not only extended some of our DB models, but also exposed them to DAG Authors as part of Airflow's public interface.

Apologies for the length of it. There are good reasons to keep the dualism. I spend quite some time not only thinking about it but evaluating and trying different solutions and I am out of ideas how to do it better. If someone could actually do it - based on the below understanding of the problems we have with the DB-less mode, I would appreciate. But I am afraid "just use pydantic-sqlalchemy" or just "discover fields" is just scratching the surface of the problem and when you actually try to do it (I actually tried both and failed). Also I am a bit less available in the coming week, so maybe that will give time for others to digest it (if they have the courage to read id) and think of another solution.

So please do not treat it is as "preaching" @bolkedebruin - I understand it might "seem" as simple as serializing/deserializing fields of ORM objects, but (due to Airlfow earler decisions made long time ago and backwards-compatibility needs), I believe this is just a first step and it does not solve the problem of DB-less mode. So by writing the below I am really asking for help to try to find out a better solution that I came up with. Because I tried and failed. But maybe others can do it better. So if anything, this explanation is more of a "cry for help" - I am desperate to make it easier and more automated, but I failed to find a way so far and if anything, I need someone who comes up with idea how to do it and proves the idea by a POC based on the below.

What's the problem with pydantic-sqlalchemy and similar manual field synchronisation

The Dualism of DagRun | DagRunPydantic was very deliberate and a way to build of set of common fields and methods, that should be considered as "public interface" of those, especially when passed by context to the DAG Author. I am happy if is done differently than I proposed, so I would love to hear/see a solution to this (but for now I think pydantic-sqlalchemy approach does not solve it - even if pydantic-sqlalchemy was maintained.

Of course - I looked at pydantic-sqlalchemy at my orm <> noorm journey, when attempted to solve the problem (see my detailed explanation here where I answered to the same question asked by @pierrejeambrun #29776 (comment) in case you have not seen it)

But I am happy to repeat it agan and even expand it a bit more after what I've learned by not only looking at the code but also making the changes so far (and some of that already successfully applied with BaseJob refactoring):

First thing IMHO, we should not use pydantic-sqlalchemy because it just scratches the surface of what is needed:

  1. last commit and release there were >2 years ago (though the code there is very few lines - indeed it uses inspection and code is rather straightforward)
  2. it is deliberately mentioned as experimental by the author

But those two are not as important as the two below:

  1. it does not care about relationships between the objects and you have to write a custom code to rebuild relationships between then objects.
  2. it only cares about fields in the DB, not the methods in the DB objects of ours which IMHO need to be kept due to backwards compaatibility - including some DB calls (see later examples)

The points 3) and 4) specifically caused that I tried and rejected it as much more complex if we try to automate it rather than repeat the declaration of a handful of classes in their Pydantic form (and my personal attempt to automatically convert the objects I did suffered from the same problem).

Re 3) - lack of automated relationships:

See the example in the Readme, where you have to manually create special "UserWithAddress" class when you want to represent the relationships: . In our case we would have to manually recreate TaskInstanceWithDagRunWithConsumedDatasetEventsDatasetsConsumingProducingTasksAndOutletsLinkedByReferences (this is not complete) if we were to serialize TaskInstance object. Because yes TaskInstance has relation to DagRun, and DagRun has relation to Datasets it consumes.

Why do we need to recreate those? Because currently and DAG developer can do (not really useful and not exactly the syntax - the events and tasks should be iterated, but this should explain the point):

context.ti.dag_run.consumed_dataset_events[0].dataset.producing_tasks[0].is_orphaned

This works now, and should continue to work after the change in db-less mode. We not only use those relationships internally, but also we deliberately pass TI and DagRun in the context, and accessing those objects is the only way to get information about datasets.

If you use sqlalchemy-pydantic, you need to manually recreate those relations. This means that you are not only equally prone to maintenance issues, but also you have no way to verify that any future version works before and after Pydantic serialization - because you might add another class there, and be absolutely no aware that you forgot to add relationship there. Of course one could attempt to automatically discover that DagRun object has the relationship to Datasets - but there are a number of ways this can be done and you you need to scan all other models and build such relationships (back-references). It's complex, prone to error. And this case is fully handled by one, line in the DAGRunPydantic that was written "by hand". The TaskInstancePydantic.from_orm(TaskInstance) just works with it (we even have a tests confirming it) test_serializing_pydantic_dataset_event.

    consumed_dataset_events: List[DatasetEventPydantic]

Re 4) lack of method synchronization in the "automated" objects.

The original assumptions that we will always have a DB access from worker has taken a toll here. We made the choice (simlarly as in case of BaseJob before) that we added custom methods to the ORM classes - and then exposed those ORM classess to the DAG authors. This was not a strange thing to do when we always had DB access from the the local workers, but now part of the logic for task instances, dag run, datasets lives inside the ORM classes and there is no automatically convert the logic when automatically converting the ORM classes. Those are few classes that suffer from that - DagRun and TaskInstance particularly. But the problem is that our DAG Authors already know and use those methods and we need to keep those methods. Otherwise MANY dags will start breaking out there.

For example the DAG Author can currently do:

context.ti.get_num_running_task_instances()

This method is actually a DB method that runs a query dynamically when it is called. How would you make sure that "automatically" converted TaskInstancePydantic works the same way when passed in context? I would love to hear a solution to that (without introducing some kind of dualism, duplicating the code or applying special cases there.

There are quite a few similar methods (get_previous_dagrun, get_previous_execution_date or get_current_state anf king of all of them get_template_context - which retrieves a LOT of information including dataset events, outlets., inlets etc. number of others). DAG authors use them and they are even encouraged to do so. This is all part of our "public_interface" officially. See for example this description of https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html#airflow.models.taskinstance.TaskInstance.current_state

Which is linked from https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html which is now canonical place which defines what can be used by DAG Authors.

How current solution attempts to solve it

The dualism of ORM/Pydantic classes is one of the ways to solve it. It also utilises MyPy to both find out what still needs to be done and to inform the user what they can use (TaskInstancePydantic should have a complete set of public interface that is usable by the DAGAuthor when completed - parallel to TaskInstance, thus making TaskInstance | TaskInstancePydantic a useful union to have.

What we plan to do, is to make sure that all such methods that can be used by dag author are explicitly defined in TaskInstancePydantic (and other classes) and they are proxies to the "real" methods. This has already been succesfully applied to a number of methods in Job. We had the same problem there, and I solved it by splitting the Job into ORM Job/JobPydantic that only contains fields and moving the methods out of `Job' ORM objects when they were there initially.

Unfortunately we cannot "remove" those methods from TaskInstance - they need to stay there because our users use them. We need to find a way how you can have a Pydantic class where when some methods are called will actually reach out to the DB of ours and run the query. So what we will need to do is similarly to BaseJob, move code of those methods out of the class and run a proxy call - either via internal API call (For TaskInstancePydantic) or directly (for TaskInstance).

Also - there is a very nice property of having "TaskInstance | TaskInstancePydantic" dualism actually. They implicitly make the "db-less" mode as future-proof as possible. Having MyPy checks via TaskInstance | TaslInstancePydantic is a great way to make sure that we do not "break" db-less mode - it will make sure that we will not add something that is not properly handled by the "db-less" mode. If someone adds a new table or new public DB method to TaskInstance as reference, it will run fine in DB-less mode, but if you don't add it to TaskInstancePydantic, it will fail MyPy. so you need to make sure TaskInstancePydantic in this case will make a proxy call over internal API. This "future-proofness" is actually improving maintenance of the solution, not decreasing it. In this case having duplication is not a problem. but forgetting that you should also handle a new DB call in local task job is. So in this case the need to have duplication is to our advantage actually, because author of such a new method will have to also think what to do in case of "db-less" mode.

But maybe there is another way?

I would absolutely love to have other solution that would be less complex and have less duplication - but I have no better idea how to do it. Maybe we can find one together?

If we can have a solution for both 3) and 4) without that duplication (and with future-proof approach), I am more than happy to use it and I am all ears. But for now I cannot see one. But maybe you or someone else could do a POC of doing it differently and showing that it can be done simpler without the duplication. I tried several things (including pydantic-sqlalchemy) but could not find a different solution (even if there is a degree of duplication there).

@bolkedebruin
Copy link
Contributor Author

@potiuk thanks for the explanation, that wasn't clear from the implementation. Maybe it is time for a "For Developers" section / persona in the Airflow documentation.

Couple of notes/questions. It might require some discussion (mail/talk) to reach common understanding:

  1. On the implementation side I have a hard time following what you are describing. The pydantic models in serialization are straight copies from the ORM counterparts. Are you referring to future implementations?
  2. I don't think pydantic-sqlalchemy as is was suitable for Airflow, but I think it could be extended or re-used so that it would work for us. For example, the relationship challenge could probably easily be solved by using a proxy and can be detected automatically.
  3. On your setup.py download URL #4 to me this seems a quite common thing that happens in RPC context. The context.ti.get_num_running_task_instances() just calls the Rest API instead of the ORM API to obtain what is required. Maybe I am looking at this with a too simple view?
  4. I would have loved that we took the chance when moving to a Managed API that we would drop certain things that really shouldn't be available. So slowly moving away from the ORM model to the API model.
  5. In Add Pydantic-powered ORM models serialization for internal API. #29776 I would have preferred not using the json method from pydantic, but to use the dict method. Which allows us freedom of choice down the line and use a different encoder (JSON or whatever) in the future. Basically, any serialization should go through serde in my opinion as it does the right thing (TM ;-), like versioning and including class information) and if it doesn't there is a centralized place where we can fix it. The choice for pydantic in Add Pydantic-powered ORM models serialization for internal API. #29776 was due to @dataclass or @attr not serializing classes en related entities. But that is exactly what serde does for you. The only advantage pydantic currently has, is that it does this recursively and it keeps the structure which @attr and @dataclass don't do (that is a limitation of those classes not of serde). However, the current implementation of the ORMless models does not require that.
  6. Your mypy reference makes sense. But can't this be solved as well with inheritance? E.g. have a ORM implementation and API implementation that both inherit from an common ancestor?
  7. In case of code duplication, why not use generation at compile time instead of runtime if the above cannot be done? From the ORM models we can define what is required and generate the pydantic models at compile time. BTW: the way pydantic models seem to be used now are closely related to what protobuf does imho.

What it boils down to is that I do think that your 3 and 4 can be solved by using (generated) proxies that call a Rest API. I would be surprised if you didnt look at that yet so what am I missing?

@potiuk
Copy link
Member

potiuk commented Jun 1, 2023

  1. On the implementation side I have a hard time following what you are describing. The pydantic models in serialization are straight copies from the ORM counterparts. Are you referring to future implementations?

Yes. The idea is to add methods that could be called when the Pydantic model is used on the client side. The assumption here is that we want to have non-ORM models. that will behave same ways as the DB models when we call their methods and those (public) methods should implement the same behaviour as their ORM counter-parts.

  1. I don't think pydantic-sqlalchemy as is was suitable for Airflow, but I think it could be extended or re-used so that it would work for us. For example, the relationship challenge could probably easily be solved by using a proxy and can be detected automatically.

It would be rather complex and brittle as I see. Especially that some of those methods we are going to call will be straight using fields we already have in the pydantic class and some will have to make a remote call. I tried dynamically generatic "non-ORM" classes that would do that, but that gets complex very quickly, and so far my assesment is that explicit copying of the method invocation in the Pydantic version of the class is simply simpler. And due to the public nature and small-ish number of those - except having to extract common code to call an having to proxy the call to those common implementations, the maintenance overhead is very low. We aren't going to change the signatures of those methods (they are public same as fields) and the duplication will limit to declaration of the fields and methods.

The assumption is here that we should not create the SQLAlchemy DB models when we have no database (this will happen in the client mode). Instead the serialization will return a Pydantic version of those and we need to bring mostly the same interface there.

So from my earlier trials, being explicit and duplicating those is the simplest and most straightforward ways of doing it.

  1. On your setup.py download URL #4 to me this seems a quite common thing that happens in RPC context. The context.ti.get_num_running_task_instances() just calls the Rest API instead of the ORM API to obtain what is required. Maybe I am looking at this with a too simple view?

Yes - in many cases it will be lilke that, but in some other cases we will just use the fields we already have. A number of those methods do not require the RPC call at all, they can just provide the same result by passing the TaskInstance | TaskInstancePydantic object to it. Simple example: email_alert(self, exception, task: BaseOperator) will not need to do RPC call to send email. So while we could potentially make "any" call to the TaskPydantic method execute a remote call, the idea is to do it only when necessary.

  1. I would have loved that we took the chance when moving to a Managed API that we would drop certain things that really shouldn't be available. So slowly moving away from the ORM model to the API model.

Me too. I would absolutely love it. But that - unfortunately means heavily braking changes for those few models that we decided to expose I am afraid.

  1. In Add Pydantic-powered ORM models serialization for internal API. #29776 I would have preferred not using the json method from pydantic, but to use the dict method. Which allows us freedom of choice down the line and use a different encoder (JSON or whatever) in the future. Basically, any serialization should go through serde in my opinion as it does the right thing (TM ;-), like versioning and including class information) and if it doesn't there is a centralized place where we can fix it. The choice for pydantic in Add Pydantic-powered ORM models serialization for internal API. #29776 was due to @dataclass or @attr not serializing classes en related entities. But that is exactly what serde does for you. The only advantage pydantic currently has, is that it does this recursively and it keeps the structure which @attr and @dataclass don't do (that is a limitation of those classes not of serde). However, the current implementation of the ORMless models does not require that.

Yes. From serialization point of view this is fine - we can send the model's data. And this is fine, I have no problem with how it is implemented now, it's just we need to somehow tell the serialized object on the client side to expose most of the same methods that the ORM objects expose, and to the right proxying in those cases. For now I think explicitly having *Pydantic object and explicitly implemented methods that will do the proxying will be far easier to maintain than trying to take the ORM model and dynamically generate a corresponding class on the client side that will do the proxying.

I believe if we try to do it automatically, we can have complex, dynamic code to maintain that will be difficult to understand and follow. Compare it with "straight" proxying the methods with some signature and field duplication. I believe the latter is much simpler to maintain in the long term, especially when helped with MyPy static checks and Task | TaskPydantic union used in places where both can be used.

  1. Your mypy reference makes sense. But can't this be solved as well with inheritance? E.g. have a ORM implementation and API implementation that both inherit from an common ancestor?

I think this is a bit more pythonic way to do the inheritance - if we have common ancestor to SqlAlchemy model - the duplication goes there. You will have to have ORM fields defined in the model and same fields declared in the "ancestor". IMHO the TaskInstance | TaskInstancePydantic is a form of that. We can even introduce a new Type "SerializableTaskInstance = TaskInstance| TaskInstancePydantic` and use it instead. You also mentiond In many cases when TaskInstance is used (in core) there is absolutlely no need to use TaskInstance | TaskInstacePydantic. This will only be needed for methods that LocalJobRunner uses them (so in case when the client potentially has ORM/non-ORM variant). Which is also a nice indication that this code can run in both contexts.

I'd love to do it differently as well but I do not know how.

  1. In case of code duplication, why not use generation at compile time instead of runtime if the above cannot be done? From the ORM models we can define what is required and generate the pydantic models at compile time. BTW: the way pydantic models seem to be used now are closely related to what protobuf does imho.

What it boils down to is that I do think that your 3 and 4 can be solved by using (generated) proxies that call a Rest API. I would be surprised if you didnt look at that yet so what am I missing?

Yes. I thought (and tried) that but I could not see a simple way of doing it. My attempts to do so end up in a way that will be difficult to follow by anyone, as opposed to having the two types of objects (TaskInstance ORM, TaskInstacePydantic - non-ORM) explicitly defined (even with some duplication and manual proxying of existing public methods).

But yes, maybe I am wrong, maybe it's a misjudgmenet. I would love to see a POC, simple variant of that to compare if we think it can be done simpler.

@eladkal eladkal added this to the Airflow 2.7.0 milestone Jun 8, 2023
@ephraimbuddy ephraimbuddy added the type:improvement Changelog: Improvements label Jul 6, 2023
Copy link

@ribeiromarilene ribeiromarilene left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ich gebe eine Bewertung 5 Weill die ganze arbeiten mit mir und Geduld und besonders die Unterstützung von der Personal ❤️

@@ -156,7 +156,7 @@ def serialize(o: object, depth: int = 0) -> U | None:

# pydantic models are recursive
if _is_pydantic(cls):
data = o.dict() # type: ignore[call-overload]
data = o.dict() # type: ignore[attr-defined]

This comment was marked as spam.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants