Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions providers/amazon/docs/executors/batch-executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Options <https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.
:end-before: .. END CONFIG_OPTIONS_PRECEDENCE

.. note::
``exec_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the Batch Executor it represents a ``submit_job_kwargs`` configuration which is then updated over-top of the ``submit_job_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``submit_job_kwargs.update(exec_config)``
``executor_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the Batch Executor it represents a ``submit_job_kwargs`` configuration which is then updated over-top of the ``submit_job_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``submit_job_kwargs.update(executor_config)``

Required config options:
~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -98,7 +98,7 @@ hints and examples, see the ``config_templates`` folder in the Amazon
provider package.

.. note::
``exec_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the Batch Executor it represents a ``submit_job_kwargs`` configuration which is then updated over-top of the ``submit_job_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``submit_job_kwargs.update(exec_config)``
``executor_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the Batch Executor it represents a ``submit_job_kwargs`` configuration which is then updated over-top of the ``submit_job_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``submit_job_kwargs.update(executor_config)``

.. _dockerfile_for_batch_executor:

Expand Down
2 changes: 1 addition & 1 deletion providers/amazon/docs/executors/ecs-executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ In the case of conflicts, the order of precedence from lowest to highest is:
provided.

.. note::
``exec_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the ECS Executor it represents a ``run_task_kwargs`` configuration which is then updated over-top of the ``run_task_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``run_task_kwargs.update(exec_config)``
``executor_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the ECS Executor it represents a ``run_task_kwargs`` configuration which is then updated over-top of the ``run_task_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``run_task_kwargs.update(executor_config)``

Required config options:
~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
3 changes: 3 additions & 0 deletions providers/amazon/docs/executors/lambda-executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ In the case of conflicts, the order of precedence from lowest to highest is:
environment variables. These are checked with Airflow's config
precedence.

.. note::
``executor_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the Lambda Executor it is passed to each Lambda invocation as part of the payload. This allows you to pass additional context to the Lambda function for any particular task execution. The Lambda function can then access this configuration via the ``executor_config`` key in the payload within the Lambda handler code.

Required config options:
~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
# Input and output keys
TASK_KEY_KEY = "task_key"
COMMAND_KEY = "command"
EXECUTOR_CONFIG_KEY = "executor_config"
RETURN_CODE_KEY = "return_code"


Expand All @@ -47,8 +48,9 @@ def lambda_handler(event, context):

command = event.get(COMMAND_KEY)
task_key = event.get(TASK_KEY_KEY)
executor_config = event.get(EXECUTOR_CONFIG_KEY, {}) # noqa: F841

# Any pre-processing or validation of the command or use of the context can be done here or above.
# Any pre-processing or validation of the command or use of the executor_config can be done here or above.

# Sync dags from s3 to the local dags directory
if S3_URI:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def attempt_task_runs(self):
payload = {
"task_key": ser_task_key,
"command": cmd,
"executor_config": task_to_run.executor_config,
}
if timezone.utcnow() < task_to_run.next_attempt_time:
self.pending_tasks.append(task_to_run)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ def test_execute(self, change_state_mock, mock_airflow_key, mock_executor, mock_

mock_executor.attempt_task_runs()
mock_executor.lambda_client.invoke.assert_called_once()
payload = json.loads(mock_executor.lambda_client.invoke.call_args.kwargs["Payload"])
assert payload["executor_config"] == {}

# Task is stored in active worker.
assert len(mock_executor.running_tasks) == 1
Expand All @@ -137,10 +139,12 @@ def test_task_sdk(self, change_state_mock, mock_airflow_key, mock_executor, mock

airflow_key = mock_airflow_key()
ser_airflow_key = json.dumps(airflow_key._asdict())
executor_config = {"config_key": "config_value"}

workload = mock.Mock(spec=ExecuteTask)
workload.ti = mock.Mock(spec=TaskInstance)
workload.ti.key = airflow_key
workload.ti.executor_config = executor_config
ser_workload = json.dumps({"test_key": "test_value"})
workload.model_dump_json.return_value = ser_workload

Expand All @@ -164,6 +168,8 @@ def test_task_sdk(self, change_state_mock, mock_airflow_key, mock_executor, mock

mock_executor.attempt_task_runs()
mock_executor.lambda_client.invoke.assert_called_once()
payload = json.loads(mock_executor.lambda_client.invoke.call_args.kwargs["Payload"])
assert payload["executor_config"] == executor_config
assert len(mock_executor.pending_tasks) == 0

# Task is stored in active worker.
Expand Down