
Option to deserialize JSON from last log line in BashOperator and DockerOperator before sending to XCom #27079

Closed
2 tasks done
qcha41 opened this issue Oct 16, 2022 · 5 comments · Fixed by #28930

Comments

@qcha41

qcha41 commented Oct 16, 2022

Description

In order to create an XCom value with a BashOperator or a DockerOperator, we can use the do_xcom_push option, which pushes the last line of the command logs to XCom.

It would be useful to provide an xcom_json option that deserializes this last log line, if it is a JSON string, before sending it to XCom. Its attributes could then be accessed later in other tasks with the xcom_pull() method.

Use case/motivation

See my StackOverflow post: https://stackoverflow.com/questions/74083466/how-to-deserialize-xcom-strings-in-airflow

Consider a DAG containing two tasks, Task A >> Task B (BashOperators or DockerOperators), which need to communicate through XComs.

  • Task A outputs its information as a one-line JSON string on stdout, for instance {"key1":1,"key2":3}. This line appears in Task A's logs and therefore, with do_xcom_push=True, in its return_value XCom key.

  • Task B only needs the key2 value from Task A, so the return_value XCom of Task A must be deserialized to extract this value and pass it directly to Task B, using the Jinja template {{xcom_pull('task_a')['key2']}}. Used as is, this template raises jinja2.exceptions.UndefinedError: 'str object' has no attribute 'key2', because return_value is just a string.
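The failure is not specific to Jinja: the pulled value is a plain str, so looking up a key in it fails, while the parsed dict works. A minimal plain-Python illustration (the variable names are illustrative, not Airflow identifiers):

```python
import json

# What Task A's return_value XCom holds: the raw last line of stdout, as a str
raw_xcom = '{"key1":1,"key2":3}'

# Subscripting a str with a str key fails in plain Python; Jinja reports the
# same failed lookup as the UndefinedError quoted above
try:
    raw_xcom["key2"]
except TypeError as exc:
    print(f"TypeError: {exc}")  # exact message varies by Python version

# After json.loads() the value is a dict, so the key lookup succeeds
parsed_xcom = json.loads(raw_xcom)
print(parsed_xcom["key2"])  # 3
```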

For comparison, Airflow Variables can already be deserialized in Jinja templates (e.g. {{ var.json.my_var.path }}). I would like to be able to do the same with XComs.

Current workaround:

We can create a custom operator (inheriting from BashOperator or DockerOperator) whose execute method:

  1. calls the original execute method,
  2. intercepts the last log line of the task, which execute returns,
  3. tries to json.loads() it into a Python dictionary,
  4. and finally returns the result (now a dictionary rather than a string).

With this operator, the earlier Jinja template {{ xcom_pull('task_a')['key2'] }} works in Task B, since the XCom value is now a Python dictionary.

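import json

from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator


class BashOperatorExtended(BashOperator):
    def execute(self, context):
        # Run the command; the return value is the last line of its output
        output = super().execute(context)
        try:
            # Turn a JSON line such as '{"key1":1,"key2":3}' into a dict
            output = json.loads(output)
        except (TypeError, ValueError):
            # Not JSON (or no output): keep the raw value
            pass
        return output


class DockerOperatorExtended(DockerOperator):
    def execute(self, context):
        # Same logic for the container's last log line
        output = super().execute(context)
        try:
            output = json.loads(output)
        except (TypeError, ValueError):
            pass
        return output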

But creating a new operator just for that purpose is not really satisfying.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

qcha41 added the kind:feature (Feature Requests) label Oct 16, 2022
@uranusjr
Member

> But creating a new operator just for that purpose is not really satisfying.

People should be encouraged more to create new, ad-hoc operators, if you ask me. Classes are first-class in Python, and an Airflow DAG is Python, so it’s most productive to use Python. If we try to put every possible customisation in operators, the end result would be no different from, say, using a YAML file, and would defeat the purpose of using Python in the first place.
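In that spirit, the duplicated execute override from the workaround can be written once as a mixin and combined with any operator whose execute() returns the value to push. A minimal sketch: JsonOutputMixin is a hypothetical name, and FakeBashOperator only mimics an operator so the snippet runs without Airflow.

```python
import json
from typing import Any


class JsonOutputMixin:
    """Hypothetical mixin: parse the operator's return value as JSON when possible."""

    def execute(self, context: Any) -> Any:
        # Delegate to the next execute() in the MRO (e.g. BashOperator.execute)
        output = super().execute(context)
        try:
            return json.loads(output)
        except (TypeError, ValueError):
            # Not a JSON string (or None): push the raw value unchanged
            return output


class FakeBashOperator:
    """Stand-in for an Airflow operator: returns a fixed 'last line of stdout'."""

    def __init__(self, last_line: Any) -> None:
        self.last_line = last_line

    def execute(self, context: Any) -> Any:
        return self.last_line


# The mixin comes first so that its execute() wraps the operator's execute()
class JsonFakeBashOperator(JsonOutputMixin, FakeBashOperator):
    pass


print(JsonFakeBashOperator('{"key1":1,"key2":3}').execute({}))  # {'key1': 1, 'key2': 3}
print(JsonFakeBashOperator("not json").execute({}))  # not json
```

With Airflow installed, the same mixin would presumably be combined as `class JsonBashOperator(JsonOutputMixin, BashOperator): pass`, so each extra operator costs one line instead of a copied execute method.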

@qcha41
Author

qcha41 commented Oct 17, 2022

I agree, but here we are talking about a feature that already exists implicitly in other operators. For instance, a PythonOperator can push a Python dictionary, which can then be used directly as a dictionary with the xcom_pull() method.
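The difference comes from what each task returns, not from XCom itself. Assuming a JSON-based XCom backend (Airflow 2's default when XCom pickling is disabled), a value keeps its Python type on the round trip: a returned dict comes back as a dict, while a BashOperator's last log line comes back as a str. A minimal simulation using only the json module; xcom_round_trip is a hypothetical stand-in for push followed by pull.

```python
import json
from typing import Any


def xcom_round_trip(value: Any) -> Any:
    """Hypothetical stand-in for xcom_push + xcom_pull through a JSON backend."""
    return json.loads(json.dumps(value))


# PythonOperator: the callable returns a dict, so a dict comes back
python_result = xcom_round_trip({"key1": 1, "key2": 3})

# BashOperator/DockerOperator: the pushed value is the last log line, a str
bash_result = xcom_round_trip('{"key1":1,"key2":3}')

print(type(python_result).__name__, python_result["key2"])  # dict 3
print(type(bash_result).__name__)  # str
```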

@potiuk
Member

potiuk commented Oct 24, 2022

> The previous jinja template {{ xcom_pull('task_a')['key2'] }} is now working in task B, since the XCom value is now a Python dictionary.

Actually, now that I think of it, this could be made into a common "AbstractOperator" feature. We could add a "deserialize_output" parameter so that any operator can use it. I think we should even deserialize it using YAML, because then we would automatically handle both YAML and JSON (YAML 1.2 is a superset of JSON: every valid JSON document is also valid YAML).
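A rough sketch of how such an opt-in parameter could behave, using a toy base class rather than Airflow's real AbstractOperator/BaseOperator. The deserialize_output name follows the proposal; ToyBaseOperator, EchoOperator and run_and_get_xcom_value are hypothetical, and JSON is used instead of YAML only to stay within the standard library.

```python
import json
from typing import Any


class ToyBaseOperator:
    """Toy base class standing in for an operator base that pushes execute()'s result."""

    def __init__(self, deserialize_output: bool = False) -> None:
        # Disabled by default, so existing operators keep pushing raw values
        self.deserialize_output = deserialize_output

    def execute(self, context: Any) -> Any:
        raise NotImplementedError

    def run_and_get_xcom_value(self, context: Any) -> Any:
        # Hypothetical hook: returns the value that would be pushed as return_value
        output = self.execute(context)
        if self.deserialize_output and isinstance(output, (str, bytes, bytearray)):
            try:
                return json.loads(output)
            except ValueError:
                pass  # not valid JSON: fall back to the raw value
        return output


class EchoOperator(ToyBaseOperator):
    """Toy operator whose output mimics the last line of a command's stdout."""

    def __init__(self, line: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.line = line

    def execute(self, context: Any) -> Any:
        return self.line


line = '{"key1":1,"key2":3}'
print(EchoOperator(line).run_and_get_xcom_value({}))  # {"key1":1,"key2":3}
print(EchoOperator(line, deserialize_output=True).run_and_get_xcom_value({}))  # {'key1': 1, 'key2': 3}
```

Swapping json.loads for a YAML loader (for example PyYAML's yaml.safe_load) would also accept YAML output; since PyYAML implements YAML 1.1 rather than 1.2, JSON edge cases would be worth checking before relying on the superset property.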

WDYT @uranusjr? I think having it as a common operator feature (disabled by default) would be quite powerful and could make a number of existing operators much easier to work with.

@potiuk
Member

potiuk commented Oct 31, 2022

@uranusjr? Any thoughts?

@uranusjr
Member

If the goal is to make Jinja2 templating simpler (there’s no issue with TaskFlow), the simplest way may be to add a built-in macro for this?

{{ json_loads(xcom_pull('task_a'))['key2'] }}
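Such a macro can already be tried without a built-in: an Airflow DAG accepts a user_defined_macros mapping that exposes extra names to its templates. A minimal sketch, assuming the macro is named json_loads as above and is simply json.loads; in an actual template the XCom is pulled with ti.xcom_pull(task_ids=...). USER_DEFINED_MACROS and TASK_B_COMMAND are illustrative names.

```python
import json

# Extra names exposed to Jinja templates; in a DAG file this dict would be
# passed as DAG(..., user_defined_macros=USER_DEFINED_MACROS)
USER_DEFINED_MACROS = {"json_loads": json.loads}

# bash_command for Task B: parse Task A's return_value, then extract key2
TASK_B_COMMAND = "echo {{ json_loads(ti.xcom_pull(task_ids='task_a'))['key2'] }}"

# What the macro does to Task A's pushed last line
print(USER_DEFINED_MACROS["json_loads"]('{"key1":1,"key2":3}')["key2"])  # 3
```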


3 participants