
EmrServerlessStartJobOperator causes dag load failure when using XComArg #40103

Closed
fjmacagno opened this issue Jun 6, 2024 · 5 comments · Fixed by #40627
Labels
area:providers, good first issue, kind:bug, provider:amazon-aws

Comments

@fjmacagno (Contributor)

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.23.0

Apache Airflow version

2.9.1

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Astronomer

Deployment details

No response

What happened

We get the following DAG load error:

Broken DAG: [/usr/local/airflow/dags/governance/scrub/parquet/parquet_scrub.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/amazon/aws/operators/emr.py", line 1321, in operator_extra_links
    if operator_class.is_monitoring_in_job_override(
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/amazon/aws/operators/emr.py", line 1535, in is_monitoring_in_job_override
    monitoring_config = (job_override or {}).get("monitoringConfiguration")
AttributeError: 'PlainXComArg' object has no attribute 'get'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1540, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1453, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'parquet-scrub': 'PlainXComArg' object has no attribute 'get'

What you think should happen instead

It shouldn't error, and we should see the operator links in the UI.

How to reproduce

Pass an XCom result into the configuration_overrides parameter of the EmrServerlessStartJobOperator.
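
For example (a minimal sketch with placeholder IDs and ARNs, meant to sit inside a DAG definition; any upstream task's output triggers it the same way):

from airflow.decorators import task
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

@task
def build_overrides() -> dict:
    return {"monitoringConfiguration": {}}

start_job = EmrServerlessStartJobOperator(
    task_id="start_job",
    application_id="placeholder-app-id",
    execution_role_arn="arn:aws:iam::123456789012:role/placeholder",
    job_driver={"sparkSubmit": {"entryPoint": "s3://placeholder-bucket/job.jar"}},
    configuration_overrides=build_overrides(),  # passing an XComArg here breaks DAG parsing
)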

Anything else

This happens every time. The root cause appears to be that the operator-extra-links code runs at DAG-parse time, before templates are rendered, so configuration_overrides is still an unrendered XComArg when is_monitoring_in_job_override calls .get() on it.
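
For illustration, a guard along these lines would avoid the parse-time crash. This is only a sketch: the method signature is inferred from the traceback above, and the actual fix landed in #40627.

@classmethod
def is_monitoring_in_job_override(cls, config_key, job_override):
    # At DAG-parse time, job_override may still be an unrendered XComArg
    # rather than a dict, so bail out instead of calling .get() on it.
    if not isinstance(job_override, dict):
        return False
    monitoring_config = job_override.get("monitoringConfiguration") or {}
    return config_key in monitoring_config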

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


@fjmacagno added the area:providers, kind:bug, and needs-triage labels on Jun 6, 2024
@vatsrahul1001 added the provider:amazon-aws label on Jun 6, 2024
@vatsrahul1001 (Collaborator)

@fjmacagno could you share the example DAG you used here?

@fjmacagno (Contributor, Author)

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessDeleteApplicationOperator, EmrServerlessStartJobOperator,
)

from foursquare_plugin.policies import FsqDagTag
from fsq.airflow.helpers.emr.serverless.application_config_builder import ServerlessApplicationConfigBuilder
from fsq.airflow.helpers.emr.serverless.job_driver_builder import ServerlessJobDriverBuilder
from fsq.airflow.team.aws_account_configs import NotebookInternalProtocol
from fsq.airflow.team.team import DEVELOPER_EXPERIENCE_TEAM

with DAG(
    dag_id="spark-pipeline-example",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    default_args={
        "owner": DEVELOPER_EXPERIENCE_TEAM.name(),
    },
    tags=[FsqDagTag.TEST_AT_WILL.name],
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create-spark-app",
        job_type="SPARK",
        release_label="emr-6.10.0",
        config=ServerlessApplicationConfigBuilder(NotebookInternalProtocol())
        .with_name("spark-pipeline-example")
        .build(),
        aws_conn_id=NotebookInternalProtocol().aws_conn_id(),
    )

    application_id: str = str(create_app.output)

    jar_submit_job = EmrServerlessStartJobOperator(
        task_id="join-chains-categories",
        config={"name": "join-chains-categories-test"},
        application_id=application_id,
        execution_role_arn=NotebookInternalProtocol().emr_serverless_role_arn(),
        job_driver=(
            ServerlessJobDriverBuilder()
            .with_entry_point("s3://4sq-dev/artifacts/fsq-graph-spark-examples_deploy.jar")
            .with_class("com.foursquare.spark.examples.S3ReadWriteExample")
            .build()
        ),
        wait_for_completion=True,
        deferrable=True,
        aws_conn_id=NotebookInternalProtocol().aws_conn_id(),
        configuration_overrides=create_app.output,  # <----- important line
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
        aws_conn_id=NotebookInternalProtocol().aws_conn_id(),
    )

    (create_app >> jar_submit_job >> delete_app)

Works fine until I add the line labeled "important line".

@vatsrahul1001 (Collaborator) commented Jun 12, 2024

@fjmacagno EmrServerlessCreateApplicationOperator returns an application ID, which you are already using in your DAG via application_id: str = str(create_app.output). configuration_overrides is of type dict, so why are we passing the XCom result of EmrServerlessCreateApplicationOperator, which would not be a dict in this case?

@vatsrahul1001 (Collaborator)

That said, I updated my DAG to use an XCom value that should return a dict; however, I still get the same parsing error:

    SPARK_CONFIGURATION_OVERRIDES = {
        "monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{bucket_name}/logs"}}
    }

    def test_xcom():
        return SPARK_CONFIGURATION_OVERRIDES


    @task()
    def test_log():
        print(f"data is {type(emr_serverless_app_id)}")
        print(f"data is {str(emr_serverless_app_id)}")
        return SPARK_CONFIGURATION_OVERRIDES

    t1 = PythonOperator(
        task_id="t1",
        python_callable=test_xcom,
    )

    # [START howto_sensor_emr_serverless_application]
    wait_for_app_creation = EmrServerlessApplicationSensor(
        task_id="wait_for_app_creation",
        application_id=emr_serverless_app_id,
    )
    # [END howto_sensor_emr_serverless_application]
    wait_for_app_creation.poke_interval = 1

    # [START howto_operator_emr_serverless_start_job]
    start_job = EmrServerlessStartJobOperator(
        task_id="start_emr_serverless_job",
        application_id=emr_serverless_app_id,
        execution_role_arn=role_arn,
        job_driver=SPARK_JOB_DRIVER,
        configuration_overrides=t1.output,
    )

Verified that configuration_overrides is a templated field.
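
template_fields is a standard attribute on every Airflow operator, so this is easy to confirm locally:

from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

# template_fields lists the operator parameters that go through Jinja rendering;
# "configuration_overrides" should appear in the printed tuple.
print(EmrServerlessStartJobOperator.template_fields)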

@o-nikolas (Contributor)

@fjmacagno @vatsrahul1001 Can you have a look at #40627?
