Prevent large objects from being stored in RenderedTaskInstanceFields #28199

Closed · 1 of 2 tasks

PatrickfBraz opened this issue Dec 7, 2022 · 11 comments · Fixed by #38094

PatrickfBraz commented Dec 7, 2022

Apache Airflow version

Airflow v2.4.1

What happened

To give our teams more flexibility and ease when implementing DAGs and tasks in our Airflow instance, we decided to implement a custom XCom backend. With it, the database stores only a reference to each object; the objects themselves are pickled and saved in Google Cloud Storage (GCS).

We followed all the recommendations found in this documentation, including implementing the orm_deserialize_value method to create a short, lightweight representation of the objects saved in GCS.
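
For context, a minimal sketch of what such a backend looks like. This is not our actual production code: the bucket name, object key layout, and the gcs-xcom:// marker are made-up placeholders, and serialize_value simply absorbs the extra keyword arguments newer Airflow versions pass in.

import pickle
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.google.cloud.hooks.gcs import GCSHook

BUCKET = "my-xcom-bucket"  # placeholder bucket name
PREFIX = "gcs-xcom://"     # marker distinguishing references from plain values


class GCSXComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        # Pickle the object to GCS and store only a reference string in the DB.
        object_name = f"xcom/{uuid.uuid4()}.pickle"
        GCSHook().upload(bucket_name=BUCKET, object_name=object_name, data=pickle.dumps(value))
        return BaseXCom.serialize_value(PREFIX + object_name)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith(PREFIX):
            # Download and unpickle the real object for task execution.
            blob = GCSHook().download(bucket_name=BUCKET, object_name=value[len(PREFIX):])
            return pickle.loads(blob)
        return value

    def orm_deserialize_value(self):
        # Lightweight representation for the UI: return the reference string
        # without ever downloading the (potentially huge) object.
        return BaseXCom.deserialize_value(self)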

The custom backend works perfectly and has been in production for a few months. Recently there has been a strong push on the team to implement new DAGs with the TaskFlow API and slowly refactor the old ones. However, while implementing a new DAG that works with pandas DataFrames, we noticed that execution failed not in the task that generated the DataFrame but in the tasks that consumed it. While debugging, we discovered that the failures were caused by the DataFrames being saved (or the save being attempted) in the rendered_task_instance_fields table.

We believed that only arguments provided as templates were actually rendered and saved in this table, but apparently TaskFlow passes information between tasks through templated fields (I don't know exactly how this works internally). One would also expect, as with the UI tab that renders the XComs, that the orm_deserialize_value method would be called, but that doesn't seem to be the case.

Sample code:

import pendulum
import pandas as pd
from airflow.decorators import dag, task

now = pendulum.now()


@dag(start_date=now, schedule="@daily", catchup=False)
def sample():
    @task()
    def get_dataframe():
        # Returns a DataFrame, which goes through the custom XCom backend.
        return pd.read_csv(
            "https://gist.githubusercontent.com/chriddyp/feaa84b34854e53fb72a/raw/dbba00aeafb981f0f50014030d1b6ad0399d957d/example-data.csv"
        )

    @task()
    def load_dataframe(df: pd.DataFrame):
        # Receives the DataFrame as an argument, i.e. through op_kwargs.
        print(df.head())

    load_dataframe(get_dataframe())


sample()

There is no problem executing the DAG:

[screenshot: successful DAG run]

The following XCom information is rendered on the UI:

[screenshot: XCom tab]

Checking the XCom table on the local database used by Airflow:

[screenshot: XCom table]

Everything is OK so far. However, when checking the rendered template tab:

[screenshot: rendered template tab]

Confirming that this was really the object saved in the database:

[screenshot: rendered_task_instance_fields table]

What you think should happen instead

I expected that even when the object is deserialized for rendering, the method that returns the lightweight representation of the saved object (orm_deserialize_value) would be called. The representation it creates is enough for debugging purposes and doesn't burden the database with bulky objects.

How to reproduce

  1. Implement a new custom XCom backend that accepts pandas DataFrames. Here you can find an example. NOTE: don't forget to implement orm_deserialize_value to create a lighter representation of the DataFrame.
  2. Reference your new backend in the core config, e.g. AIRFLOW__CORE__XCOM_BACKEND=path.to.custom.XComBackend
  3. Set up the local environment with the docker-compose file: https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml
  4. Execute the sample code above.
  5. Look at the XComs and rendered template tabs.

Operating System

NAME="Ubuntu" VERSION="20.04.5 LTS (Focal Fossa)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 20.04.5 LTS" VERSION_ID="20.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" VERSION_CODENAME=focal UBUNTU_CODENAME=focal

Versions of Apache Airflow Providers

In the production environment, we are using:

apache-airflow-providers-postgres>=4.0.0
apache-airflow-providers-apache-beam>=4.0.0
apache-airflow-providers-cncf-kubernetes>=4.1.0
apache-airflow-providers-datadog>=3.0.0
apache-airflow-providers-google>=8.0.0
apache-airflow-providers-http>=3.0.0
apache-airflow-providers-microsoft-mssql>=3.0.0
apache-airflow-providers-mongo>=3.0.0
apache-airflow-providers-mysql>=3.0.0
apache-airflow-providers-odbc>=3.0.0
apache-airflow-providers-sftp>=3.0.0
apache-airflow-providers-ssh>=3.0.0
apache-airflow-providers-airbyte>=3.0.0
apache-airflow-upgrade-check==1.4.0

Deployment

Other 3rd-party Helm chart

Deployment details

We manage a fork of the official Airflow chart which we customize for our use case. Deployment is done on a v1.21 Kubernetes cluster hosted on Google Kubernetes Engine (GKE)

Anything else

This is not a critical problem, since using operators in the classic way already lets us use the custom XCom backend without issues. However, TaskFlow offers a much more readable and friendly way of producing code. Since we want to democratize DAG implementation across different teams on the same Airflow instance, TaskFlow would be of great use to us.

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

PatrickfBraz added the area:core and kind:bug labels Dec 7, 2022

boring-cyborg bot commented Dec 7, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk added this to the Airflow 2.5.1 milestone Dec 7, 2022
uranusjr (Member) commented Dec 8, 2022

Hmm, I’m not sure how it’d be actionable. Perhaps we need to add more hooks to the XCom backend interface for this.

uranusjr modified the milestones: Airflow 2.5.1 → Airflow 2.6.0 Dec 8, 2022
uranusjr added the kind:feature label and removed the kind:bug label Dec 8, 2022
uranusjr changed the title from "Large objects managed by custom XCom backend are being rendered and saved in rendered_task_instance_fields table" to "Prevent large objects from being stored in RenderedTaskInstanceFields" Dec 8, 2022
potiuk (Member) commented Dec 8, 2022

Hmm, I’m not sure how it’d be actionable. Perhaps we need to add more hooks to the XCom backend interface for this.

Maybe we should simply not display the args if they come from XCom and we have a custom backend? Or use orm_deserialize_value in this case to display them? I guess it is possible to determine where the op_args/kwargs come from?

uranusjr (Member) commented Dec 8, 2022

Using orm_deserialize is mostly what I’m thinking, except if it’s not viable we could add a separate interface (I think we need some kind of backward compat measure so a new function may be needed anyway). Not displaying the args at all when a custom XCom backend is involved seems a bit unfriendly to the user.

potiuk (Member) commented Dec 8, 2022

Yeah, that would be quite unfriendly. Using orm_deserialize is best, and I think entirely in line with the intended use of the method.

I'd even say the current behaviour is a bug, because the orm_deserialize_value method was clearly meant to handle exactly these kinds of cases, according to its description:

There is also an orm_deserialize_value method that is called whenever the XCom objects are rendered for UI or reporting purposes; if you have large or expensive-to-retrieve values in your XComs, you should override this method to avoid calling that code (and instead return a lighter, incomplete representation) so the UI remains responsive.

dilex42 commented Apr 25, 2023

Hi. I have the same problem, and it's kind of frustrating, as I think it completely ruins the TaskFlow idea of presenting DAGs as a composition of tasks.
As per your Medium article and issue #8059, custom XCom backends were designed to let users pass large objects between tasks for cleaner code. And it works great when a task returns something. The problem arises later, when you try to pass that returned value to another task: all of a sudden we face another attempt to write this large object to the database. So in reality the whole "pass objects from task to task" concept isn't working, and that's why I completely agree that this issue should be classified as a bug.

Next, I am wondering what possible solutions there could be:

  1. Is it necessary to deserialize XComs before the task is serialized to the DB? While I am not an avid user of the rendered template tab, I understand why some would like to see templates_dict rendered. But what's the use of rendered args, and especially rendered XComs? You can always check them in the XCom section or in your custom storage. And I believe tasks that have yet to run show something like XComArgs(DecoratedPython)... in their rendered args, so couldn't it stay that way in other states as well?
  2. Perhaps you would be open to letting users customize what is serialized for each task? Currently it's ['templates_dict', 'op_args', 'op_kwargs'], but maybe we could pass a parameter to change that behaviour and exclude args from certain tasks.
  3. That being said, I agree that using orm_deserialize seems like a good idea, as it provides the necessary customization opportunities and is kind of why it was invented, if I understand correctly. Am I wrong in hoping that this change is only one line of code and should be pretty straightforward, or are there a lot of places where this would need to change? If you can point me to the specific code in question, I don't mind trying to make a PR.

Also, for people with the same problem: I'm currently using a workaround of explicitly pulling the desired XCom inside the task, as in the sketch below. It's not ideal, but it's clean enough and works.
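
A minimal sketch of that workaround, reusing the task ids from the sample DAG above; get_current_context is Airflow's documented way to reach the TaskInstance from inside a TaskFlow task:

from airflow.decorators import task
from airflow.operators.python import get_current_context


@task()
def load_dataframe():
    # Pull the XCom explicitly at runtime instead of accepting it as an
    # argument: nothing large lands in op_args/op_kwargs, so nothing large
    # gets rendered into rendered_task_instance_fields.
    ti = get_current_context()["ti"]
    df = ti.xcom_pull(task_ids="get_dataframe")
    print(df.head())

Since there is no longer a data dependency between the tasks, the ordering has to be declared explicitly, e.g. get_dataframe() >> load_dataframe().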

Thanks for your time, and sorry if it sounds harsh; I'm just a little frustrated 😞

potiuk (Member) commented Apr 26, 2023

Thanks for your time, and sorry if it sounds harsh; I'm just a little frustrated 😞

Why not make a PR and try to fix it? That is what we usually do. When you create a PR and try to fix something, you usually find a good way of doing it, and by doing it and showing what you propose, it becomes clear what you are proposing. Discussing a concrete proposed improvement is always a good idea.

Airflow is created by almost 2,500 people - mostly people who felt frustrated with something and then implemented a fix or feature. This is how it works here.

eladkal removed this from the Airflow 2.6.1 milestone Apr 28, 2023
eladkal removed this from the Airflow 2.6.4 milestone Aug 1, 2023
Joffreybvn (Contributor) commented Aug 30, 2023

Hello,

I am having a similar issue and I'm willing to make a PR. Same story as @PatrickfBraz: giving "full TaskFlow" and Airflow-native features a try, instead of doing everything with KubernetesPodOperator / PythonOperator. Using Airflow 2.7.0.

I have a custom XCom backend. My DAG consists of HttpOperator -> SQLExecuteQueryOperator: it pulls data from an API, passes it via XCom, and saves it into a database. I use a SQL template which pulls from XCom and renders a big query:

{%- set data = ti.xcom_pull(task_ids='pull_data_from_api') -%}
{%- for entry in data -%}
INSERT INTO "SCHEMA_NAME"."TABLE_NAME" VALUES ('{{entry|tojson}}');
{% endfor %}

Resulting in (this is an example with mockup data):

[screenshot: rendered SQL with inlined data]

The database I'm pushing to has limitations: I have to push the data using multiple queries. As a workaround, I could do an executemany with params in a PythonOperator, as in the sketch below. But as soon as that chunk of code turned into a custom reusable operator, I would have a templated parameter which would again save all the data into the database.
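
A rough sketch of that executemany workaround; Postgres and the my_db connection id are placeholders standing in for whatever database is actually behind the SQLExecuteQueryOperator:

import json

from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task()
def insert_rows():
    data = get_current_context()["ti"].xcom_pull(task_ids="pull_data_from_api")
    hook = PostgresHook(postgres_conn_id="my_db")
    # The connection context manager commits the transaction on success.
    # One parameterized statement per entry replaces the giant rendered
    # multi-statement SQL template.
    with hook.get_conn() as conn, conn.cursor() as cur:
        cur.executemany(
            'INSERT INTO "SCHEMA_NAME"."TABLE_NAME" VALUES (%s)',
            [(json.dumps(entry),) for entry in data],
        )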

I also saw the AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK parameter. But rendered templates are great (especially for beginners); I don't want to disable them, I want to keep them visible in the UI.


So, if I understand this discussion well, a PR to begin with could render the template twice:

  • Once for the actual execution of the task, with the real data - not stored in the DB.
  • Once to be stored in the DB and shown in the UI, based on the orm_deserialize_value method, plus a mechanism to detect whether the template actually needs to be rendered twice (by checking for a custom XCom backend).

Is that right?

uranusjr (Member) commented

  • One for the actual execution of the task, with real data - not stored in the db
  • One to be stored in the db/shown in the UI, based on orm_deserialize method + A mechanism to detect if it actually needs to be rendered two times (checking custom XCom backend)

Instead of checking a custom backend, we can just check whether the custom backend actually re-implements orm_deserialize_value or not, and only render a second time if that’s the case.
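
A sketch of that check, using the existing resolve_xcom_backend helper that loads the configured backend class:

from airflow.models.xcom import BaseXCom, resolve_xcom_backend


def backend_overrides_orm_deserialize() -> bool:
    # If the configured backend replaces orm_deserialize_value, the class
    # attribute is a different function object than BaseXCom's.
    backend = resolve_xcom_backend()
    return backend.orm_deserialize_value is not BaseXCom.orm_deserialize_value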

ephraimbuddy (Contributor) commented

Hi @uranusjr @potiuk, I'm looking at this issue, but it doesn't seem like using orm_deserialize_value would solve it - or am I missing something? The code that causes this value to be written is here:

self.rendered_fields = {
    field: serialize_template_field(getattr(self.task, field))
    for field in self.task.template_fields
}

and I'm thinking it should be solved there. Thoughts?

ephraimbuddy self-assigned this Mar 4, 2024
uranusjr (Member) commented Mar 5, 2024

The context is that when we render the templates (which are saved to RTIF later), the rendering process also implicitly resolves XComs, causing big values to be loaded (and thus saved to RTIF). We do want the values to be loaded for execution, so one possible solution would be to render the templates twice if a custom XCom backend is detected (by checking whether orm_deserialize_value is overridden): once normally for execution, and once without loading the large values for storage. I think handling this in serialize_template_field might be more difficult; by the time we reach it, all templates have been rendered and we may have lost a lot of the context on whether we want a value to be abbreviated or not.

But thinking about this again, maybe that is fine…? A custom XCom backend is just one special case (where the problem is more significant). It could be argued that we should reduce all large values anyway, regardless of where they come from, and handling that in serialize_template_field would be reasonable.
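
For illustration, a rough sketch of what reducing values in serialize_template_field could look like. This is not the merged implementation: the limit and the "Truncated" message below are stand-ins for the configurable behaviour the PR referenced below introduces.

MAX_LENGTH = 4096  # stand-in for a configurable limit


def serialize_template_field(template_field):
    """Return a size-bounded representation of a rendered template field."""
    if template_field is None:
        return None
    rendered = template_field if isinstance(template_field, str) else repr(template_field)
    if len(rendered) > MAX_LENGTH:
        # Keep only a prefix so huge values (e.g. whole DataFrames) never
        # reach the rendered_task_instance_fields table.
        return "Truncated. " + rendered[:MAX_LENGTH]
    return template_field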

ephraimbuddy added a commit to astronomer/airflow that referenced this issue Mar 25, 2024
There's no control over the size of objects stored in the rendered task instance
fields. This PR adds control and enables users to customize the size of
data that can be stored in this field.

closes: apache#28199
ephraimbuddy added a commit that referenced this issue Mar 25, 2024
* Prevent large objects from being stored in the RTIF

There's no control over the size of objects stored in the rendered task instance
fields. This PR adds control and enables users to customize the size of
data that can be stored in this field.

closes: #28199

* fixup! Prevent large objects from being stored in the RTIF

* Use len and check the size of the serialized

* Add test to already existing test

* Remove xcom db clearing

* Update airflow/config_templates/config.yml

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Apply review suggestions

* Add test for redacting values and add significant item

* Prefix with Truncated line

* Ensure secrets are masked

* fixup! Ensure secrets are masked

* Check the template field length in separate branches for jsonable and nonjsonable

* add tests for rendered dataframe

* Apply suggestions from code review

* update code and tests

* fixup! update code and tests

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>