Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion providers/amazon/docs/operators/ssm.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,43 @@ or the :class:`~airflow.providers.amazon.aws.triggers.ssm.SsmRunCommandTrigger`
:dedent: 4
:start-after: [START howto_operator_run_command]
:end-before: [END howto_operator_run_command]
.. _howto/operator:SsmGetCommandInvocationOperator:

Retrieve output from an SSM command invocation
==============================================

To retrieve the output and execution details from an SSM command that has been executed, you can use
:class:`~airflow.providers.amazon.aws.operators.ssm.SsmGetCommandInvocationOperator`.

This operator is useful for:

* Retrieving output from commands executed by :class:`~airflow.providers.amazon.aws.operators.ssm.SsmRunCommandOperator` in previous tasks
* Getting output from SSM commands executed outside of Airflow
* Inspecting command results for debugging or data processing purposes

To retrieve output from all instances that executed a command:

.. code-block:: python

get_all_output = SsmGetCommandInvocationOperator(
task_id="get_command_output",
command_id='{{ ti.xcom_pull(task_ids="run_command") }}', # From previous task
)

To retrieve output from a specific instance:

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_ssm.py
:language: python
:dedent: 4
:start-after: [START howto_operator_get_command_invocation]
:end-before: [END howto_operator_get_command_invocation]

The operator returns structured data including:

* Standard output and error content
* Execution status and response codes
* Execution start and end times
* Document name and comments

Sensors
-------
Expand All @@ -79,7 +116,7 @@ To wait on the state of an Amazon SSM run command job until it reaches a termina
IAM Permissions
---------------

You need to ensure the following IAM permissions are granted to allow Airflow to run and monitor SSM Run Command executions:
You need to ensure the following IAM permissions are granted to allow Airflow to run, retrieve and monitor SSM Run Command executions:

.. code-block::

Expand Down
32 changes: 30 additions & 2 deletions providers/amazon/src/airflow/providers/amazon/aws/hooks/ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class SsmHook(AwsBaseHook):
"""
Interact with Amazon Systems Manager (SSM).

Provide thin wrapper around :external+boto3:py:class:`boto3.client("ssm") <SSM.Client>`.
Provide thin wrapper around
:external+boto3:py:class:`boto3.client("ssm") <SSM.Client>`.

Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
Expand All @@ -53,7 +54,9 @@ def __init__(self, *args, **kwargs) -> None:

def get_parameter_value(self, parameter: str, default: str | ArgNotSet = NOTSET) -> str:
"""
Return the provided Parameter or an optional default; if it is encrypted, then decrypt and mask.
Return the provided Parameter or an optional default.

If it is encrypted, then decrypt and mask.

.. seealso::
- :external+boto3:py:meth:`SSM.Client.get_parameter`
Expand All @@ -71,3 +74,28 @@ def get_parameter_value(self, parameter: str, default: str | ArgNotSet = NOTSET)
if isinstance(default, ArgNotSet):
raise
return default

def get_command_invocation(self, command_id: str, instance_id: str) -> dict:
"""
Get the output of a command invocation for a specific instance.

.. seealso::
- :external+boto3:py:meth:`SSM.Client.get_command_invocation`

:param command_id: The ID of the command.
:param instance_id: The ID of the instance.
:return: The command invocation details including output.
"""
return self.conn.get_command_invocation(CommandId=command_id, InstanceId=instance_id)

def list_command_invocations(self, command_id: str) -> dict:
"""
List all command invocations for a given command ID.

.. seealso::
- :external+boto3:py:meth:`SSM.Client.list_command_invocations`

:param command_id: The ID of the command.
:return: Response from SSM list_command_invocations API.
"""
return self.conn.list_command_invocations(CommandId=command_id)
139 changes: 122 additions & 17 deletions providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.ssm import SsmHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.ssm import SsmRunCommandTrigger
Expand All @@ -36,27 +35,35 @@ class SsmRunCommandOperator(AwsBaseOperator[SsmHook]):
Executes the SSM Run Command to perform actions on managed instances.

.. seealso::
For more information on how to use this operator, take a look at the guide:
For more information on how to use this operator, take a look at the
guide:
:ref:`howto/operator:SsmRunCommandOperator`

:param document_name: The name of the Amazon Web Services Systems Manager document (SSM document) to run.
:param run_command_kwargs: Optional parameters to pass to the send_command API.

:param wait_for_completion: Whether to wait for cluster to stop. (default: True)
:param waiter_delay: Time in seconds to wait between status checks. (default: 120)
:param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 75)
:param deferrable: If True, the operator will wait asynchronously for the cluster to stop.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)
:param document_name: The name of the Amazon Web Services Systems Manager
document (SSM document) to run.
:param run_command_kwargs: Optional parameters to pass to the send_command
API.

:param wait_for_completion: Whether to wait for cluster to stop.
(default: True)
:param waiter_delay: Time in seconds to wait between status checks.
(default: 120)
:param waiter_max_attempts: Maximum number of attempts to check for job
completion. (default: 75)
:param deferrable: If True, the operator will wait asynchronously for the
cluster to stop. This implies waiting for completion. This mode
requires aiobotocore module to be installed. (default: False)
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
If this is ``None`` or empty then the default boto3 behaviour is used.
If running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node).
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
:param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
:param verify: Whether or not to verify SSL certificates. See:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
:param botocore_config: Configuration dictionary (key-values) for botocore
client. See:
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
"""

Expand Down Expand Up @@ -90,7 +97,7 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
event = validate_execute_complete_event(event)

if event["status"] != "success":
raise AirflowException(f"Error while running run command: {event}")
raise RuntimeError(f"Error while running run command: {event}")

self.log.info("SSM run command `%s` completed.", event["command_id"])
return event["command_id"]
Expand All @@ -112,6 +119,9 @@ def execute(self, context: Context):
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
aws_conn_id=self.aws_conn_id,
region_name=self.region_name,
verify=self.verify,
botocore_config=self.botocore_config,
),
method_name="execute_complete",
)
Expand All @@ -125,7 +135,102 @@ def execute(self, context: Context):
waiter.wait(
CommandId=command_id,
InstanceId=instance_id,
WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
WaiterConfig={
"Delay": self.waiter_delay,
"MaxAttempts": self.waiter_max_attempts,
},
)

return command_id


class SsmGetCommandInvocationOperator(AwsBaseOperator[SsmHook]):
"""
Retrieves the output and execution details of an SSM command invocation.

This operator allows you to fetch the standard output, standard error,
execution status, and other details from SSM commands. It can be used to
retrieve output from commands executed by SsmRunCommandOperator in previous
tasks, or from commands executed outside of Airflow entirely.

The operator returns structured data including stdout, stderr, execution
times, and status information for each instance that executed the command.

.. seealso::
For more information on how to use this operator, take a look at the
guide:
:ref:`howto/operator:SsmGetCommandInvocationOperator`

:param command_id: The ID of the SSM command to retrieve output for.
:param instance_id: The ID of the specific instance to retrieve output
for. If not provided, retrieves output from all instances that
executed the command.
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used.
If running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node).
:param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
:param verify: Whether or not to verify SSL certificates. See:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
:param botocore_config: Configuration dictionary (key-values) for botocore
client. See:
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
"""

aws_hook_class = SsmHook
template_fields: Sequence[str] = aws_template_fields(
"command_id",
"instance_id",
)

def __init__(
self,
*,
command_id: str,
instance_id: str | None = None,
**kwargs,
):
super().__init__(**kwargs)
self.command_id = command_id
self.instance_id = instance_id

def execute(self, context: Context) -> dict[str, Any]:
"""Execute the operator to retrieve command invocation output."""
if self.instance_id:
self.log.info(
"Retrieving output for command %s on instance %s",
self.command_id,
self.instance_id,
)
invocations = [{"InstanceId": self.instance_id}]
else:
self.log.info("Retrieving output for command %s from all instances", self.command_id)
response = self.hook.list_command_invocations(self.command_id)
invocations = response.get("CommandInvocations", [])

output_data: dict[str, Any] = {"command_id": self.command_id, "invocations": []}

for invocation in invocations:
instance_id = invocation["InstanceId"]
try:
invocation_details = self.hook.get_command_invocation(self.command_id, instance_id)
output_data["invocations"].append(
{
"instance_id": instance_id,
"status": invocation_details.get("Status", ""),
"response_code": invocation_details.get("ResponseCode", ""),
"standard_output": invocation_details.get("StandardOutputContent", ""),
"standard_error": invocation_details.get("StandardErrorContent", ""),
"execution_start_time": invocation_details.get("ExecutionStartDateTime", ""),
"execution_end_time": invocation_details.get("ExecutionEndDateTime", ""),
"document_name": invocation_details.get("DocumentName", ""),
"comment": invocation_details.get("Comment", ""),
}
)
except Exception as e:
self.log.warning("Failed to get output for instance %s: %s", instance_id, e)
output_data["invocations"].append({"instance_id": instance_id, "error": str(e)})

return output_data
50 changes: 33 additions & 17 deletions providers/amazon/src/airflow/providers/amazon/aws/sensors/ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.ssm import SsmHook
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.triggers.ssm import SsmRunCommandTrigger
Expand All @@ -34,32 +33,45 @@

class SsmRunCommandCompletedSensor(AwsBaseSensor[SsmHook]):
"""
Poll the state of an AWS SSM Run Command until all instance jobs reach a terminal state. Fails if any instance job ends in a failed state.
Poll the state of an AWS SSM Run Command until completion.

Waits until all instance jobs reach a terminal state. Fails if any
instance job ends in a failed state.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:SsmRunCommandCompletedSensor`

:param command_id: The ID of the AWS SSM Run Command.

:param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore
module to be installed.
(default: False, but can be overridden in config file by setting default_deferrable to True)
:param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
:param max_retries: Number of times before returning the current state. (default: 75)
:param deferrable: If True, the sensor will operate in deferrable mode.
This mode requires aiobotocore module to be installed.
(default: False, but can be overridden in config file by setting
default_deferrable to True)
:param poke_interval: Polling period in seconds to check for the status
of the job. (default: 120)
:param max_retries: Number of times before returning the current state.
(default: 75)
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
If this is ``None`` or empty then the default boto3 behaviour is used.
If running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node).
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
:param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
:param verify: Whether or not to verify SSL certificates. See:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
:param botocore_config: Configuration dictionary (key-values) for botocore
client. See:
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
"""

INTERMEDIATE_STATES: tuple[str, ...] = ("Pending", "Delayed", "InProgress", "Cancelling")
INTERMEDIATE_STATES: tuple[str, ...] = (
"Pending",
"Delayed",
"InProgress",
"Cancelling",
)
FAILURE_STATES: tuple[str, ...] = ("Cancelled", "TimedOut", "Failed")
SUCCESS_STATES: tuple[str, ...] = ("Success",)
FAILURE_MESSAGE = "SSM run command sensor failed."
Expand Down Expand Up @@ -89,14 +101,18 @@ def poke(self, context: Context):
command_invocations = response.get("CommandInvocations", [])

if not command_invocations:
self.log.info("No command invocations found for command_id=%s yet, waiting...", self.command_id)
self.log.info(
"No command invocations found",
"command_id=%s yet, waiting...",
self.command_id,
)
return False

for invocation in command_invocations:
state = invocation["Status"]

if state in self.FAILURE_STATES:
raise AirflowException(self.FAILURE_MESSAGE)
raise RuntimeError(self.FAILURE_MESSAGE)

if state in self.INTERMEDIATE_STATES:
return False
Expand All @@ -122,6 +138,6 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
event = validate_execute_complete_event(event)

if event["status"] != "success":
raise AirflowException(f"Error while running run command: {event}")
raise RuntimeError(f"Error while running run command: {event}")

self.log.info("SSM run command `%s` completed.", event["command_id"])
Loading