Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f22dff8
Add fail_on_nonzero_exit parameter to SsmRunCommandOperator
Nov 3, 2025
583769e
Add fail_on_nonzero_exit parameter to SSM sensor and trigger
Nov 3, 2025
6839c8b
Document SsmGetCommandInvocationOperator for exit code routing
Nov 3, 2025
145aee0
Add documentation for SSM exit code handling
Nov 3, 2025
84ba52c
Add unit tests for SSM exit code handling enhancements
Nov 3, 2025
696008a
ruff fix
Nov 3, 2025
c9e1a8f
Add documentation to provider.yaml ; Fix spelling mistakes.
Nov 3, 2025
cc9d014
Consolidate SSM exit code documentation into main SSM doc
Nov 4, 2025
2dc8108
Consolidate SSM exit code tests into main system test
Nov 4, 2025
2e6b795
Remove ssm_exit_codes.rst reference from provider.yaml
Nov 4, 2025
ee7a4c8
reducing volume of docs and structure in docs
Nov 5, 2025
f492432
fix empty line with trailing whitespace.
Nov 5, 2025
6f082bb
Merge branch 'main' into ssm-exit-code-handling
ksharlandjiev Nov 6, 2025
f8450e1
Merge branch 'apache:main' into ssm-exit-code-handling
ksharlandjiev Dec 8, 2025
50f8420
refactor(aws/ssm): Extract AWS-level failure detection into reusable …
Dec 8, 2025
2ef5597
Merge branch 'main' into ssm-exit-code-handling
ksharlandjiev Dec 8, 2025
b018788
fix MyPy checks
Dec 8, 2025
43dbb7a
Merge remote-tracking branch 'refs/remotes/origin/ssm-exit-code-handl…
Dec 8, 2025
a6db3ac
Added "Cancelling" to the spelling wordlist. "Cancelling" is official…
Dec 8, 2025
4483861
Merge branch 'main' into ssm-exit-code-handling
ksharlandjiev Jan 7, 2026
939a289
Update providers/amazon/docs/operators/ssm.rst
ksharlandjiev Jan 7, 2026
38aa9cb
ruff fix
Jan 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ callsite
camelCase
Cancelled
cancelled
Cancelling
carbonite
cas
Cassanda
Expand Down
32 changes: 31 additions & 1 deletion providers/amazon/docs/operators/ssm.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,42 @@ Waiter. Additionally, you can use the following components to track the status o
:class:`~airflow.providers.amazon.aws.sensors.ssm.SsmRunCommandCompletedSensor` Sensor,
or the :class:`~airflow.providers.amazon.aws.triggers.ssm.SsmRunCommandTrigger` Trigger.


.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_ssm.py
:language: python
:dedent: 4
:start-after: [START howto_operator_run_command]
:end-before: [END howto_operator_run_command]

Exit code handling
^^^^^^^^^^^^^^^^^^

By default, both :class:`~airflow.providers.amazon.aws.operators.ssm.SsmRunCommandOperator` and
:class:`~airflow.providers.amazon.aws.sensors.ssm.SsmRunCommandCompletedSensor` will fail the task
if the command returns a non-zero exit code. You can change this behavior using the ``fail_on_nonzero_exit``
parameter:

.. code-block:: python

# Default behavior - task fails on non-zero exit codes
run_command = SsmRunCommandOperator(
task_id="run_command",
document_name="AWS-RunShellScript",
run_command_kwargs={...},
)

# Allow non-zero exit codes - task succeeds regardless of exit code
run_command = SsmRunCommandOperator(
task_id="run_command",
document_name="AWS-RunShellScript",
run_command_kwargs={...},
fail_on_nonzero_exit=False,
)

When ``fail_on_nonzero_exit=False``, you can retrieve the exit code using
:class:`~airflow.providers.amazon.aws.operators.ssm.SsmGetCommandInvocationOperator` and use it
for workflow routing decisions. Note that AWS-level failures (``TimedOut``, ``Cancelled``, ``Cancelling``)
will still raise exceptions regardless of this setting.

.. _howto/operator:SsmGetCommandInvocationOperator:

Retrieve output from an SSM command invocation
Expand Down
22 changes: 22 additions & 0 deletions providers/amazon/src/airflow/providers/amazon/aws/hooks/ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,25 @@ def list_command_invocations(self, command_id: str) -> dict:
:return: Response from SSM list_command_invocations API.
"""
return self.conn.list_command_invocations(CommandId=command_id)

@staticmethod
def is_aws_level_failure(status: str) -> bool:
"""
Check if a command status represents an AWS-level failure.

AWS-level failures are service-level issues that should always raise exceptions,
as opposed to command-level failures (non-zero exit codes) which may be tolerated
depending on the fail_on_nonzero_exit parameter.

According to AWS SSM documentation, the possible statuses are:
Pending, InProgress, Delayed, Success, Cancelled, TimedOut, Failed, Cancelling

AWS-level failures are:
- Cancelled: Command was cancelled before completion
- TimedOut: Command exceeded the timeout period
- Cancelling: Command is in the process of being cancelled

:param status: The command invocation status from SSM.
:return: True if the status represents an AWS-level failure, False otherwise.
"""
return status in ("Cancelled", "TimedOut", "Cancelling")
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

from botocore.exceptions import WaiterError

from airflow.providers.amazon.aws.hooks.ssm import SsmHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.ssm import SsmRunCommandTrigger
Expand Down Expand Up @@ -50,6 +52,12 @@ class SsmRunCommandOperator(AwsBaseOperator[SsmHook]):
(default: 120)
:param waiter_max_attempts: Maximum number of attempts to check for job
completion. (default: 75)
:param fail_on_nonzero_exit: If True (default), the operator will fail when
the command returns a non-zero exit code. If False, the operator will
complete successfully regardless of the command exit code, allowing
downstream tasks to handle exit codes for workflow routing. Note that
AWS-level failures (Cancelled, TimedOut, Cancelling) will still raise
exceptions even when this is False. (default: True)
:param deferrable: If True, the operator will wait asynchronously for the
command to complete. This implies waiting for completion. This mode
requires aiobotocore module to be installed. (default: False)
Expand Down Expand Up @@ -81,13 +89,15 @@ def __init__(
wait_for_completion: bool = True,
waiter_delay: int = 120,
waiter_max_attempts: int = 75,
fail_on_nonzero_exit: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
self.wait_for_completion = wait_for_completion
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.fail_on_nonzero_exit = fail_on_nonzero_exit
self.deferrable = deferrable

self.document_name = document_name
Expand Down Expand Up @@ -118,6 +128,7 @@ def execute(self, context: Context):
command_id=command_id,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
fail_on_nonzero_exit=self.fail_on_nonzero_exit,
aws_conn_id=self.aws_conn_id,
region_name=self.region_name,
verify=self.verify,
Expand All @@ -132,14 +143,35 @@ def execute(self, context: Context):

instance_ids = response["Command"]["InstanceIds"]
for instance_id in instance_ids:
waiter.wait(
CommandId=command_id,
InstanceId=instance_id,
WaiterConfig={
"Delay": self.waiter_delay,
"MaxAttempts": self.waiter_max_attempts,
},
)
try:
waiter.wait(
CommandId=command_id,
InstanceId=instance_id,
WaiterConfig={
"Delay": self.waiter_delay,
"MaxAttempts": self.waiter_max_attempts,
},
)
except WaiterError:
if not self.fail_on_nonzero_exit:
# Enhanced mode: distinguish between AWS-level and command-level failures
invocation = self.hook.get_command_invocation(command_id, instance_id)
status = invocation.get("Status", "")

# AWS-level failures should always raise
if SsmHook.is_aws_level_failure(status):
raise

# Command-level failure - tolerate it in enhanced mode
self.log.info(
"Command completed with status %s (exit code: %s). "
"Continuing due to fail_on_nonzero_exit=False",
status,
invocation.get("ResponseCode", "unknown"),
)
else:
# Traditional mode: all failures raise
raise

return command_id

Expand All @@ -148,14 +180,6 @@ class SsmGetCommandInvocationOperator(AwsBaseOperator[SsmHook]):
"""
Retrieves the output and execution details of an SSM command invocation.

This operator allows you to fetch the standard output, standard error,
execution status, and other details from SSM commands. It can be used to
retrieve output from commands executed by SsmRunCommandOperator in previous
tasks, or from commands executed outside of Airflow entirely.

The operator returns structured data including stdout, stderr, execution
times, and status information for each instance that executed the command.

.. seealso::
For more information on how to use this operator, take a look at the
guide:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class SsmRunCommandCompletedSensor(AwsBaseSensor[SsmHook]):
:ref:`howto/sensor:SsmRunCommandCompletedSensor`

:param command_id: The ID of the AWS SSM Run Command.
:param fail_on_nonzero_exit: If True (default), the sensor will fail when the command
returns a non-zero exit code. If False, the sensor will complete successfully
for both Success and Failed command statuses, allowing downstream tasks to handle
exit codes. AWS-level failures (Cancelled, TimedOut, Cancelling) will still raise exceptions.
(default: True)
:param deferrable: If True, the sensor will operate in deferrable mode.
This mode requires aiobotocore module to be installed.
(default: False, but can be overridden in config file by setting
Expand Down Expand Up @@ -85,13 +90,15 @@ def __init__(
self,
*,
command_id,
fail_on_nonzero_exit: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poke_interval: int = 120,
max_retries: int = 75,
**kwargs,
):
super().__init__(**kwargs)
self.command_id = command_id
self.fail_on_nonzero_exit = fail_on_nonzero_exit
self.deferrable = deferrable
self.poke_interval = poke_interval
self.max_retries = max_retries
Expand All @@ -112,7 +119,19 @@ def poke(self, context: Context):
state = invocation["Status"]

if state in self.FAILURE_STATES:
raise RuntimeError(self.FAILURE_MESSAGE)
# Check if we should tolerate this failure
if self.fail_on_nonzero_exit:
raise RuntimeError(self.FAILURE_MESSAGE) # Traditional behavior

# Only fail on AWS-level issues, tolerate command failures
if SsmHook.is_aws_level_failure(state):
raise RuntimeError(f"SSM command {self.command_id} {state}")

# Command failed but we're tolerating it
self.log.info(
"Command invocation has status %s. Continuing due to fail_on_nonzero_exit=False",
state,
)

if state in self.INTERMEDIATE_STATES:
return False
Expand All @@ -127,6 +146,7 @@ def execute(self, context: Context):
waiter_delay=int(self.poke_interval),
waiter_max_attempts=self.max_retries,
aws_conn_id=self.aws_conn_id,
fail_on_nonzero_exit=self.fail_on_nonzero_exit,
),
method_name="execute_complete",
)
Expand Down
52 changes: 42 additions & 10 deletions providers/amazon/src/airflow/providers/amazon/aws/triggers/ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class SsmRunCommandTrigger(AwsBaseWaiterTrigger):
:param command_id: The ID of the AWS SSM Run Command.
:param waiter_delay: The amount of time in seconds to wait between attempts. (default: 120)
:param waiter_max_attempts: The maximum number of attempts to be made. (default: 75)
:param fail_on_nonzero_exit: If True (default), the trigger will fail when the command returns
a non-zero exit code. If False, the trigger will complete successfully regardless of the
command exit code. (default: True)
:param aws_conn_id: The Airflow connection used for AWS credentials.
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
:param verify: Whether or not to verify SSL certificates. See:
Expand All @@ -49,13 +52,14 @@ def __init__(
command_id: str,
waiter_delay: int = 120,
waiter_max_attempts: int = 75,
fail_on_nonzero_exit: bool = True,
aws_conn_id: str | None = None,
region_name: str | None = None,
verify: bool | str | None = None,
botocore_config: dict | None = None,
) -> None:
super().__init__(
serialized_fields={"command_id": command_id},
serialized_fields={"command_id": command_id, "fail_on_nonzero_exit": fail_on_nonzero_exit},
waiter_name="command_executed",
waiter_args={"CommandId": command_id},
failure_message="SSM run command failed.",
Expand All @@ -71,6 +75,7 @@ def __init__(
botocore_config=botocore_config,
)
self.command_id = command_id
self.fail_on_nonzero_exit = fail_on_nonzero_exit

def hook(self) -> AwsGenericHook:
return SsmHook(
Expand All @@ -89,14 +94,41 @@ async def run(self) -> AsyncIterator[TriggerEvent]:

for instance_id in instance_ids:
self.waiter_args["InstanceId"] = instance_id
await async_wait(
waiter,
self.waiter_delay,
self.attempts,
self.waiter_args,
self.failure_message,
self.status_message,
self.status_queries,
)
try:
await async_wait(
waiter,
self.waiter_delay,
self.attempts,
self.waiter_args,
self.failure_message,
self.status_message,
self.status_queries,
)
except Exception:
if not self.fail_on_nonzero_exit:
# Enhanced mode: check if it's an AWS-level failure
invocation = await client.get_command_invocation(
CommandId=self.command_id, InstanceId=instance_id
)
status = invocation.get("Status", "")

# AWS-level failures should always raise
if SsmHook.is_aws_level_failure(status):
raise

# Command-level failure - tolerate it in enhanced mode
response_code = invocation.get("ResponseCode", "unknown")
self.log.info(
"Command %s completed with status %s (exit code: %s) for instance %s. "
"Continuing due to fail_on_nonzero_exit=False",
self.command_id,
status,
response_code,
instance_id,
)
continue
else:
# Traditional mode: all failures raise
raise

yield TriggerEvent({"status": "success", self.return_key: self.return_value})
Loading
Loading