
Unable to specify Python version for AwsGlueJobOperator #20832

Closed
1 of 2 tasks
jfamestad opened this issue Jan 12, 2022 · 4 comments · Fixed by #24215
Assignees
Labels
area:providers kind:bug This is clearly a bug provider:amazon-aws AWS/Amazon - related issues

Comments

@jfamestad

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

No response

Apache Airflow version

2.0.2

Operating System

Amazon Linux

Deployment

MWAA

Deployment details

No response

What happened

When a new Glue job is created using the AwsGlueJobOperator, the job defaults to Python 2. Setting the version in create_job_kwargs fails with a KeyError.

What you expected to happen

Expected the Glue job to be created with a Python 3 runtime. create_job_kwargs is passed through to the boto3 Glue client's create_job method, which accepts a "Command" parameter: a dictionary that includes the Python version.

How to reproduce

Create a dag with an AwsGlueJobOperator and pass a "Command" parameter in the create_job_kwargs argument.

    create_glue_job_args = {
        "Command": {
            "Name": "abalone-preprocess",
            "ScriptLocation": f"s3://{output_bucket}/code/preprocess.py",
            "PythonVersion": "3"
        }
    }
    glue_etl = AwsGlueJobOperator(
        task_id="glue_etl",
        s3_bucket=output_bucket,
        script_args={
            '--S3_INPUT_BUCKET': data_bucket,
            '--S3_INPUT_KEY_PREFIX': 'input/raw',
            '--S3_UPLOADS_KEY_PREFIX': 'input/uploads',
            '--S3_OUTPUT_BUCKET': output_bucket,
            '--S3_OUTPUT_KEY_PREFIX': str(determine_dataset_id.output) + '/input/data'
        },
        iam_role_name="MLOps",
        retry_limit=2,
        concurrent_run_limit=3,
        create_job_kwargs=create_glue_job_args,
        dag=dag)
[2022-01-04 16:43:42,053] {{logging_mixin.py:104}} INFO - [2022-01-04 16:43:42,053] {{glue.py:190}} ERROR - Failed to create aws glue job, error: 'Command'
[2022-01-04 16:43:42,081] {{logging_mixin.py:104}} INFO - [2022-01-04 16:43:42,081] {{glue.py:112}} ERROR - Failed to run aws glue job, error: 'Command'
[2022-01-04 16:43:42,101] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 166, in get_or_create_glue_job
    get_job_response = glue_client.get_job(JobName=self.job_name)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 676, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.EntityNotFoundException: An error occurred (EntityNotFoundException) when calling the GetJob operation: Job with name: abalone-preprocess not found.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/glue.py", line 121, in execute
    glue_job_run = glue_job.initialize_job(self.script_args)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 108, in initialize_job
    job_name = self.get_or_create_glue_job()
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 186, in get_or_create_glue_job
    **self.create_job_kwargs,
KeyError: 'Command'

Anything else

This happens only when a new job is being created (i.e. the job does not already exist in Glue).

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
@jfamestad added the area:providers and kind:bug labels Jan 12, 2022
@boring-cyborg

boring-cyborg bot commented Jan 12, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

@mik-laj added the provider:amazon-aws label Jan 12, 2022
@SamWheating
Contributor

SamWheating commented Jan 12, 2022

I think this is because the GlueHook is pretty opinionated and hardcodes the value of Command when calling glue_client.create_job:

    def get_or_create_glue_job(self) -> str:
        """
        Creates(or just returns) and returns the Job name
        :return:Name of the Job
        """
        glue_client = self.get_conn()
        try:
            get_job_response = glue_client.get_job(JobName=self.job_name)
            self.log.info("Job Already exist. Returning Name of the job")
            return get_job_response['Job']['Name']
        except glue_client.exceptions.EntityNotFoundException:
            self.log.info("Job doesn't exist. Now creating and running AWS Glue Job")
            if self.s3_bucket is None:
                raise AirflowException('Could not initialize glue job, error: Specify Parameter `s3_bucket`')
            s3_log_path = f's3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}'
            execution_role = self.get_iam_execution_role()
            try:
                if "WorkerType" in self.create_job_kwargs and "NumberOfWorkers" in self.create_job_kwargs:
                    create_job_response = glue_client.create_job(
                        Name=self.job_name,
                        Description=self.desc,
                        LogUri=s3_log_path,
                        Role=execution_role['Role']['Arn'],
                        ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit},
                        Command={"Name": "glueetl", "ScriptLocation": self.script_location},
                        MaxRetries=self.retry_limit,
                        **self.create_job_kwargs,
                    )
                else:
                    create_job_response = glue_client.create_job(
                        Name=self.job_name,
                        Description=self.desc,
                        LogUri=s3_log_path,
                        Role=execution_role['Role']['Arn'],
                        ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit},
                        Command={"Name": "glueetl", "ScriptLocation": self.script_location},
                        MaxRetries=self.retry_limit,
                        MaxCapacity=self.num_of_dpus,
                        **self.create_job_kwargs,
                    )
                return create_job_response['Name']
            except Exception as general_error:
                self.log.error("Failed to create aws glue job, error: %s", general_error)
                raise

So when you provide Command in create_job_kwargs, it ends up being supplied twice to that function (although I suspect that this would be a TypeError, not a KeyError 🤔)
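
To illustrate the duplicate-keyword behaviour in plain Python (the create_job stub below is just a stand-in for the boto3 method, not the real client):

    # Stand-in for glue_client.create_job -- illustration only.
    def create_job(**kwargs):
        return kwargs

    # Passing Command both explicitly and via ** duplicates the keyword:
    create_job(
        Command={"Name": "glueetl"},
        **{"Command": {"Name": "glueetl", "PythonVersion": "3"}},
    )
    # TypeError: create_job() got multiple values for keyword argument 'Command'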

Anyways, thoughts on just making the Command.Name and Command.PythonVersion arguments configurable in the GlueJobOperator?
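
For illustration, the operator call could then look something like this (command_name and python_version are hypothetical parameter names for this sketch, not an existing API):

    # Hypothetical sketch -- `command_name` and `python_version` do not
    # exist on the operator today; they only show the proposed shape.
    glue_etl = AwsGlueJobOperator(
        task_id="glue_etl",
        s3_bucket=output_bucket,
        iam_role_name="MLOps",
        command_name="pythonshell",  # would map to Command.Name
        python_version="3",          # would map to Command.PythonVersion
        dag=dag,
    )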

If y'all think that this is a satisfactory fix, feel free to assign this issue to me and I can put up a quick PR.

@SamWheating
Contributor

SamWheating commented Jan 12, 2022

Also, for what it's worth, I think that the Command block in your DAG is invalid, as the Command.Name you're using (abalone-preprocess) must be one of glueetl, pythonshell, or gluestreaming:

https://docs.aws.amazon.com/glue/latest/webapi/API_JobCommand.html
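
So, assuming the intent is a Python-based Glue ETL job, the repro's Command block would need to look something like:

    create_glue_job_args = {
        "Command": {
            "Name": "glueetl",  # must be glueetl, pythonshell, or gluestreaming
            "ScriptLocation": f"s3://{output_bucket}/code/preprocess.py",
            "PythonVersion": "3"
        }
    }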

@eladkal
Contributor

eladkal commented Jan 15, 2022

@SamWheating We can make them configurable

I'm just not 100% sure why we must build the Command ourselves. Can't we just leave create_job_kwargs as the user passes them?

    Command={"Name": "glueetl", "ScriptLocation": self.script_location},

There are other parameters that don't get this special treatment:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job
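
For example (a sketch of the pass-through idea, not an actual patch), the hook could default Command only when the user didn't supply one:

    # Sketch only: prefer a user-supplied Command from create_job_kwargs,
    # falling back to the current hardcoded default otherwise.
    command = self.create_job_kwargs.pop(
        "Command", {"Name": "glueetl", "ScriptLocation": self.script_location}
    )
    create_job_response = glue_client.create_job(
        Name=self.job_name,
        Command=command,
        **self.create_job_kwargs,
    )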
