EMRServerlessStartJobOperator Expand/Expand Kwargs not Serializing properly #38005

Closed
2 tasks done
jliu0812 opened this issue Mar 9, 2024 · 2 comments · Fixed by #38022
Labels
area:providers, good first issue, kind:bug, provider:amazon


jliu0812 commented Mar 9, 2024

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.19.0

Apache Airflow version

2.8.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

Deployed with the Breeze tool.

What happened

When EmrServerlessStartJobOperator is used with Airflow's dynamic task mapping (expand or expand_kwargs), the DAG fails to serialize and the webserver shows a DAG import error. During serialization, the EmrServerlessStartJobOperator.operator_extra_links property is evaluated with the mapped task bound as self. That object is a MappedOperator, not an EmrServerlessStartJobOperator, and MappedOperator has no is_monitoring_in_job_override attribute, so the property raises an AttributeError.
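To make the failure mode concrete, here is a minimal, Airflow-free sketch. FakeMappedOperator, BrokenOperator, FixedOperator and serialize_links are hypothetical stand-ins, not Airflow APIs; serialize_links only mimics the serializer calling the property getter with the mapped object as self. Turning the helper into a staticmethod and calling it through the class is shown as one possible fix under these assumptions; it is not necessarily what #38022 does.

```python
"""Airflow-free sketch of the failure; every class and function here is hypothetical."""


class FakeMappedOperator:
    """Stand-in for MappedOperator: stores kwargs but has none of the operator's methods."""

    def __init__(self, partial_kwargs):
        self.partial_kwargs = partial_kwargs


def _has_monitoring(config_key, job_override):
    # True when `config_key` appears under "monitoringConfiguration".
    return config_key in (job_override or {}).get("monitoringConfiguration", {})


class BrokenOperator:
    def is_monitoring_in_job_override(self, config_key, job_override):
        # Instance method: only reachable when `self` really is a BrokenOperator.
        return _has_monitoring(config_key, job_override)

    @property
    def operator_extra_links(self):
        # Simplified: only models the mapped path, where kwargs live in partial_kwargs.
        overrides = self.partial_kwargs.get("configuration_overrides")
        links = []
        # Raises AttributeError when `self` is a FakeMappedOperator.
        if self.is_monitoring_in_job_override("s3MonitoringConfiguration", overrides):
            links.append("S3 logs")
        return links


class FixedOperator:
    @staticmethod
    def is_monitoring_in_job_override(config_key, job_override):
        return _has_monitoring(config_key, job_override)

    @property
    def operator_extra_links(self):
        overrides = self.partial_kwargs.get("configuration_overrides")
        links = []
        # Looked up on the class, so it works no matter what object is bound as `self`.
        if FixedOperator.is_monitoring_in_job_override("s3MonitoringConfiguration", overrides):
            links.append("S3 logs")
        return links


def serialize_links(operator_class, mapped_op):
    # Accessing the property on the class returns the property object;
    # __get__ then runs its getter with the mapped stand-in bound as `self`.
    return operator_class.operator_extra_links.__get__(mapped_op)


if __name__ == "__main__":
    mapped = FakeMappedOperator(
        {
            "configuration_overrides": {
                "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://test/logs"}}
            }
        }
    )
    try:
        serialize_links(BrokenOperator, mapped)
    except AttributeError as err:
        print("broken:", err)
    print("fixed:", serialize_links(FixedOperator, mapped))  # fixed: ['S3 logs']
```

The broken variant reproduces the same kind of AttributeError as the report, while the fixed variant never touches attributes that only exist on the real operator instance.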

What you think should happen instead

DAG should import successfully without any errors.

How to reproduce

The following non-mapped use of EmrServerlessStartJobOperator works:

from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessStartJobOperator,
)

DAG_ID = "example_emr_serverless"
emr_serverless_app_id = "01234abcd"
role_arn = "arn:test"


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
):
    start_job = EmrServerlessStartJobOperator(
        task_id="start_emr_serverless_job",
        application_id=emr_serverless_app_id,
        execution_role_arn=role_arn,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "test.jar",
                "entryPointArguments": ["--arg", "1"],
                "sparkSubmitParameters": "--conf sample",
            }
        },
        configuration_overrides={
            "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://test/logs"}}
        },
    )

Whereas the following mapped use of EmrServerlessStartJobOperator, built with partial() and expand(), fails to serialize:

from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessStartJobOperator,
)

DAG_ID = "example_emr_serverless"
emr_serverless_app_id = "01234abcd"
role_arn = "arn:test"


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
):
    start_job = EmrServerlessStartJobOperator.partial(
        task_id="start_emr_serverless_job",
        application_id=emr_serverless_app_id,
        execution_role_arn=role_arn,
        configuration_overrides={
            "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://test/logs"}}
        },
    ).expand(
        job_driver=[
            {
                "sparkSubmit": {
                    "entryPoint": "test.jar",
                    "entryPointArguments": ["--arg", "1"],
                    "sparkSubmitParameters": "--conf sample",
                }
            },
            {
                "sparkSubmit": {
                    "entryPoint": "test.jar",
                    "entryPointArguments": ["--arg", "2"],
                    "sparkSubmitParameters": "--conf sample",
                }
            },
        ]
    )
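Mapped tasks are only expanded into individual task instances at run time; while the DAG file is parsed and serialized, the mapped task exists as a single MappedOperator. The simplified model below sketches which kwargs each instance of the example above would eventually receive. The expand and expand_kwargs helpers are hypothetical and are not Airflow's implementation; they assume Airflow's documented behaviour that expand() with several arguments creates their cross product, while expand_kwargs() creates one instance per dict.

```python
"""Simplified, hypothetical model of how partial() + expand()/expand_kwargs() combine kwargs.

This is not Airflow's implementation; it only shows which kwargs each mapped task instance
would receive once the task is expanded at run time.
"""
import itertools
from typing import Any


def expand(partial_kwargs: dict[str, Any], **mapped: list[Any]) -> list[dict[str, Any]]:
    # One kwargs set per combination; several mapped arguments give their cross product.
    names = list(mapped)
    combos = itertools.product(*(mapped[name] for name in names))
    return [{**partial_kwargs, **dict(zip(names, combo))} for combo in combos]


def expand_kwargs(
    partial_kwargs: dict[str, Any], kwargs_list: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    # One kwargs set per dict in the list, with no cross product.
    return [{**partial_kwargs, **kwargs} for kwargs in kwargs_list]


if __name__ == "__main__":
    partial = {
        "task_id": "start_emr_serverless_job",
        "configuration_overrides": {
            "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": "s3://test/logs"}}
        },
    }
    drivers = [
        {"sparkSubmit": {"entryPoint": "test.jar", "entryPointArguments": ["--arg", str(i)]}}
        for i in (1, 2)
    ]
    for kwargs in expand(partial, job_driver=drivers):
        print(kwargs["job_driver"]["sparkSubmit"]["entryPointArguments"])
    # ['--arg', '1']
    # ['--arg', '2']
```

In the real DAG these merged kwargs do not exist until expansion, so code that runs during serialization, such as operator_extra_links, receives the MappedOperator instead of a fully constructed operator.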

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

jliu0812 added the area:providers, kind:bug, and needs-triage labels on Mar 9, 2024

boring-cyborg bot commented Mar 9, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.


jliu0812 commented Mar 9, 2024

I will be working on this.

Full stack trace for reference:

[2024-03-09T00:45:58.280+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.280+0000] {dagbag.py:540} INFO - Filling up the DagBag from /files/dags/example_mapped_emr_serverless.py
[2024-03-09T00:45:58.298+0000] {processor.py:840} INFO - DAG(s) 'example_emr_serverless' retrieved from /files/dags/example_mapped_emr_serverless.py
[2024-03-09T00:45:58.315+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.310+0000] {dagbag.py:649} ERROR - Failed to write serialized DAG: /files/dags/example_mapped_emr_serverless.py
Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1354, in serialize_dag
    serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()]
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1354, in <listcomp>
    serialized_dag["tasks"] = [cls.serialize(task) for _, task in dag.task_dict.items()]
                               ^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 462, in serialize
    return SerializedBaseOperator.serialize_mapped_operator(var)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 857, in serialize_mapped_operator
    serialized_op = cls._serialize_node(op, include_deps=op.deps != MappedOperator.deps_for(BaseOperator))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 899, in _serialize_node
    op.operator_extra_links.__get__(op)
  File "/opt/airflow/airflow/providers/amazon/aws/operators/emr.py", line 1273, in operator_extra_links
    if self.is_monitoring_in_job_override("s3MonitoringConfiguration", configuration_overrides):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/models/dagbag.py", line 637, in serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/serialized_dag.py", line 166, in write_dag
    new_serialized_dag = cls(dag, processor_subdir)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 4, in __init__
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", line 481, in _initialize_instance
    with util.safe_reraise():
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/serialized_dag.py", line 113, in __init__
    dag_data = SerializedDAG.to_dict(dag)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1463, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
                                                             ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1378, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'example_emr_serverless': 'MappedOperator' object has no attribute 'is_monitoring_in_job_override'
[2024-03-09T00:45:58.316+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.316+0000] {dag.py:3068} INFO - Sync 1 DAGs
[2024-03-09T00:45:58.326+0000] {logging_mixin.py:188} INFO - [2024-03-09T00:45:58.326+0000] {dag.py:3912} INFO - Setting next_dagrun for example_emr_serverless to 2021-01-01 00:00:00+00:00, run_after=2021-01-01 00:00:00+00:00
[2024-03-09T00:45:58.345+0000] {processor.py:183} INFO - Processing /files/dags/example_mapped_emr_serverless.py took 0.073 seconds
