Skip to content

Commit

Permalink
AWS Secrets Manager Backend - major update (#27920)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwreeves authored Dec 5, 2022
1 parent c8e348d commit 8f0265d
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 281 deletions.
244 changes: 48 additions & 196 deletions airflow/providers/amazon/aws/secrets/secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,16 @@
"""Objects relating to sourcing secrets from AWS Secrets Manager"""
from __future__ import annotations

import ast
import json
import warnings
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote, urlencode
from typing import Any
from urllib.parse import unquote

from airflow.compat.functools import cached_property
from airflow.providers.amazon.aws.utils import get_airflow_version, trim_none_values
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
# Avoid circular import problems when instantiating the backend during configuration.
from airflow.models.connection import Connection


class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
"""
Expand Down Expand Up @@ -74,7 +69,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
.. code-block:: python
possible_words_for_conn_fields = {
"login": ["user", "username", "login", "user_name"],
"login": ["login", "user", "username", "user_name"],
"password": ["password", "pass", "key"],
"host": ["host", "remote_host", "server"],
"port": ["port"],
Expand All @@ -96,12 +91,6 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
If set to None (null value in the configuration), requests for configurations will not be sent to
AWS Secrets Manager. If you don't want a config_prefix, set it as an empty string
:param sep: separator used to concatenate secret_prefix and secret_id. Default: "/"
:param full_url_mode: if True, the secrets must be stored as one conn URI in just one field per secret.
If False (set it as false in backend_kwargs), you can store the secret using different
fields (password, user...).
:param are_secret_values_urlencoded: If True, and full_url_mode is False, then the values are assumed to
be URL-encoded and will be decoded before being passed into a Connection object. This option is
ignored when full_url_mode is True.
:param extra_conn_words: for using just when you set full_url_mode as false and store
the secrets in different fields of secrets manager. You can add more words for each connection
part beyond the default ones. The extra words to be searched should be passed as a dict of lists,
Expand All @@ -115,8 +104,6 @@ def __init__(
variables_prefix: str = "airflow/variables",
config_prefix: str = "airflow/config",
sep: str = "/",
full_url_mode: bool = True,
are_secret_values_urlencoded: bool | None = None,
extra_conn_words: dict[str, list[str]] | None = None,
**kwargs,
):
Expand All @@ -134,27 +121,29 @@ def __init__(
else:
self.config_prefix = config_prefix
self.sep = sep
self.full_url_mode = full_url_mode

if are_secret_values_urlencoded is None:
self.are_secret_values_urlencoded = True
else:
if kwargs.pop("full_url_mode", None) is not None:
warnings.warn(
"The `secret_values_are_urlencoded` kwarg only exists to assist in migrating away from"
" URL-encoding secret values when `full_url_mode` is False. It will be considered deprecated"
" when values are not required to be URL-encoded by default.",
"The `full_url_mode` kwarg is deprecated. Going forward, the `SecretsManagerBackend`"
" will support both URL-encoded and JSON-encoded secrets at the same time. The encoding"
" of the secret will be determined automatically.",
DeprecationWarning,
stacklevel=2,
)
if full_url_mode and not are_secret_values_urlencoded:
warnings.warn(
"The `secret_values_are_urlencoded` kwarg for the SecretsManagerBackend is only used"
" when `full_url_mode` is False. When `full_url_mode` is True, the secret needs to be"
" URL-encoded.",
UserWarning,
stacklevel=2,
)
self.are_secret_values_urlencoded = are_secret_values_urlencoded

if kwargs.get("are_secret_values_urlencoded") is not None:
warnings.warn(
"The `secret_values_are_urlencoded` is deprecated. This kwarg only exists to assist in"
" migrating away from URL-encoding secret values for JSON secrets."
" To remove this warning, make sure your JSON secrets are *NOT* URL-encoded, and then"
" remove this kwarg from backend_kwargs.",
DeprecationWarning,
stacklevel=2,
)
self.are_secret_values_urlencoded = kwargs.pop("are_secret_values_urlencoded", None)
else:
self.are_secret_values_urlencoded = False

self.extra_conn_words = extra_conn_words or {}

self.profile_name = kwargs.get("profile_name", None)
Expand Down Expand Up @@ -185,59 +174,10 @@ def client(self):
session = SessionFactory(conn=conn_config).create_session()
return session.client(service_name="secretsmanager", **client_kwargs)

@staticmethod
def _format_uri_with_extra(secret, conn_string: str) -> str:
try:
extra_dict = secret["extra"]
except KeyError:
return conn_string

extra = json.loads(extra_dict) # this is needed because extra_dict is a string and we need a dict
conn_string = f"{conn_string}?{urlencode(extra)}"

return conn_string

def get_connection(self, conn_id: str) -> Connection | None:
if not self.full_url_mode:
# Avoid circular import problems when instantiating the backend during configuration.
from airflow.models.connection import Connection

secret_string = self._get_secret(self.connections_prefix, conn_id)
secret_dict = self._deserialize_json_string(secret_string)

if not secret_dict:
return None

if "extra" in secret_dict and isinstance(secret_dict["extra"], str):
secret_dict["extra"] = self._deserialize_json_string(secret_dict["extra"])

data = self._standardize_secret_keys(secret_dict)

if self.are_secret_values_urlencoded:
data = self._remove_escaping_in_secret_dict(secret=data, conn_id=conn_id)

port: int | None = None

if data["port"] is not None:
port = int(data["port"])

return Connection(
conn_id=conn_id,
login=data["user"],
password=data["password"],
host=data["host"],
port=port,
schema=data["schema"],
conn_type=data["conn_type"],
extra=data["extra"],
)

return super().get_connection(conn_id=conn_id)

def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]:
"""Standardize the names of the keys in the dict. These keys align with"""
possible_words_for_conn_fields = {
"user": ["user", "username", "login", "user_name"],
"login": ["login", "user", "username", "user_name"],
"password": ["password", "pass", "key"],
"host": ["host", "remote_host", "server"],
"port": ["port"],
Expand All @@ -247,6 +187,9 @@ def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]:
}

for conn_field, extra_words in self.extra_conn_words.items():
if conn_field == "user":
# Support `user` for backwards compatibility.
conn_field = "login"
possible_words_for_conn_fields[conn_field].extend(extra_words)

conn_d: dict[str, Any] = {}
Expand All @@ -258,96 +201,16 @@ def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]:

return conn_d

def get_uri_from_secret(self, secret: dict[str, str]) -> str:
conn_d: dict[str, str] = {k: v if v else "" for k, v in self._standardize_secret_keys(secret).items()}
conn_string = "{conn_type}://{user}:{password}@{host}:{port}/{schema}".format(**conn_d)
return self._format_uri_with_extra(secret, conn_string)

def _deserialize_json_string(self, value: str | None) -> dict[Any, Any] | None:
if not value:
return None
try:
# Use ast.literal_eval for backwards compatibility.
# Previous version of this code had a comment saying that using json.loads caused errors.
# This likely means people were using dict reprs instead of valid JSONs.
res: dict[str, Any] = json.loads(value)
except json.JSONDecodeError:
try:
res = ast.literal_eval(value) if value else None
warnings.warn(
f"In future versions, `{type(self).__name__}` will only support valid JSONs, not dict"
" reprs. Please make sure your secret is a valid JSON."
)
except ValueError: # 'malformed node or string: ' error, for empty conns
return None

return res

def _remove_escaping_in_secret_dict(self, secret: dict[str, Any], conn_id: str) -> dict[str, Any]:
# When ``unquote(v) == v``, then removing unquote won't affect the user, regardless of
# whether or not ``v`` is URL-encoded. For example, "foo bar" is not URL-encoded. But
# because decoding it doesn't affect the value, then it will migrate safely when
# ``unquote`` gets removed.
#
# When parameters are URL-encoded, but decoding is idempotent, we need to warn the user
# to un-escape their secrets. For example, if "foo%20bar" is a URL-encoded string, then
# decoding is idempotent because ``unquote(unquote("foo%20bar")) == unquote("foo%20bar")``.
#
# In the rare situation that value is URL-encoded but the decoding is _not_ idempotent,
# this causes a major issue. For example, if "foo%2520bar" is URL-encoded, then decoding is
# _not_ idempotent because ``unquote(unquote("foo%2520bar")) != unquote("foo%2520bar")``
#
# This causes a problem for migration because if the user decodes their value, we cannot
# infer that is the case by looking at the decoded value (from our vantage point, it will
# look to be URL-encoded.)
#
# So when this uncommon situation occurs, the user _must_ adjust the configuration and set
# ``parameters_are_urlencoded`` to False to migrate safely. In all other cases, we do not
# need the user to adjust this object to migrate; they can transition their secrets with
# the default configuration.

warn_user = False
idempotent = True

def _remove_escaping_in_secret_dict(self, secret: dict[str, Any]) -> dict[str, Any]:
"""Un-escape secret values that are URL-encoded"""
for k, v in secret.copy().items():

if k == "extra" and isinstance(v, dict):
# The old behavior was that extras were _not_ urlencoded inside the secret.
# If they were urlencoded (e.g. "foo%20bar"), then they would be re-urlencoded
# (e.g. "foo%20bar" becomes "foo%2520bar") and then unquoted once when parsed.
# So we should just allow the extra dict to remain as-is.
continue

elif v is not None:
v_unquoted = unquote(v)
if v != v_unquoted:
secret[k] = unquote(v)
warn_user = True

# Check to see if decoding is idempotent.
if v_unquoted == unquote(v_unquoted):
idempotent = False

if warn_user:
msg = (
"When full_url_mode=False, URL-encoding secret values is deprecated. In future versions, "
f"this value will not be un-escaped. For the conn_id {conn_id!r}, please remove the "
"URL-encoding.\n\n"
"This warning was raised because the SecretsManagerBackend detected that this "
"connection was URL-encoded."
)
if idempotent:
msg = f" Once the values for conn_id {conn_id!r} are decoded, this warning will go away."
if not idempotent:
msg += (
" In addition to decoding the values for your connection, you must also set"
" secret_values_are_urlencoded=False for your config variable"
" secrets.backend_kwargs because this connection's URL encoding is not idempotent."
" For more information, see:"
" https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/secrets-backends"
"/aws-secrets-manager.html#url-encoding-of-secrets-when-full-url-mode-is-false"
)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
secret[k] = unquote(v)

return secret

Expand All @@ -360,38 +223,27 @@ def get_conn_value(self, conn_id: str) -> str | None:
if self.connections_prefix is None:
return None

if self.full_url_mode:
return self._get_secret(self.connections_prefix, conn_id)
secret = self._get_secret(self.connections_prefix, conn_id)

if secret is not None and secret.strip().startswith("{"):
# Before Airflow 2.3, the AWS SecretsManagerBackend added support for JSON secrets.
#
# The way this was implemented differs a little from how Airflow's core API handle JSON secrets.
#
# The most notable difference is that SecretsManagerBackend supports extra aliases for the
# Connection parts, e.g. "users" is allowed instead of "login".
#
# This means we need to deserialize then re-serialize the secret if it's a JSON, potentially
# renaming some keys in the process.

secret_dict = json.loads(secret)
standardized_secret_dict = self._standardize_secret_keys(secret_dict)
if self.are_secret_values_urlencoded:
standardized_secret_dict = self._remove_escaping_in_secret_dict(standardized_secret_dict)
standardized_secret = json.dumps(standardized_secret_dict)
return standardized_secret
else:
warnings.warn(
f"In future versions, `{type(self).__name__}.get_conn_value` will return a JSON string when"
" full_url_mode is False, not a URI.",
DeprecationWarning,
)

# It is very rare for user code to get to this point, since:
#
# - When full_url_mode is True, the previous statement returns.
# - When full_url_mode is False, get_connection() does not call
# `get_conn_value`. Additionally, full_url_mode defaults to True.
#
# So the code would have to be calling `get_conn_value` directly, and
# the user would be using a non-default setting.
#
# As of Airflow 2.3.0, get_conn_value() is allowed to return a JSON
# string in the base implementation. This is a way to deprecate this
# behavior gracefully.

secret_string = self._get_secret(self.connections_prefix, conn_id)

secret = self._deserialize_json_string(secret_string)
connection = None

# These lines will check if we have with some denomination stored an username, password and host
if secret:
connection = self.get_uri_from_secret(secret)

return connection
return secret

def get_conn_uri(self, conn_id: str) -> str | None:
"""
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file not shown.
Loading

0 comments on commit 8f0265d

Please sign in to comment.