Skip to content

Commit

Permalink
Add browser IDC authentication method (#950)
Browse files Browse the repository at this point in the history
* Add authentication methods and unit tests.
* Document types and exclude IAM=true from identity center connections
* Update redshift connector version to accept new auth method.
* Pare this pull request down to Browser based authentication for now
* Handle optional timeout field correctly
* Remove the authenticator param
* Handle other params with default values better.
* Pin connector from above.
* Make the enum have class methods.
  • Loading branch information
VersusFacit authored Nov 25, 2024
1 parent cf5acf1 commit c457faf
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 106 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241122-143326.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add browser identity center authentication method.
time: 2024-11-22T14:33:26.549878-08:00
custom:
Author: versusfacit
Issue: "898"
266 changes: 165 additions & 101 deletions dbt/adapters/redshift/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,24 @@ def get_message(self) -> str:
logger = AdapterLogger("Redshift")


class IdentityCenterTokenType(StrEnum):
ACCESS_TOKEN = "ACCESS_TOKEN"
EXT_JWT = "EXT_JWT"


class RedshiftConnectionMethod(StrEnum):
DATABASE = "database"
IAM = "iam"
IAM_ROLE = "iam_role"
IAM_IDENTITY_CENTER_BROWSER = "browser_identity_center"

@classmethod
def uses_identity_center(cls, method: str) -> bool:
return method in (cls.IAM_IDENTITY_CENTER_BROWSER,)

@classmethod
def is_iam(cls, method: str) -> bool:
return not cls.uses_identity_center(method)


class UserSSLMode(StrEnum):
Expand Down Expand Up @@ -128,6 +142,17 @@ class RedshiftCredentials(Credentials):
access_key_id: Optional[str] = None
secret_access_key: Optional[str] = None

#
# IAM identity center methods
#

# browser
idc_region: Optional[str] = None
issuer_url: Optional[str] = None
idp_listen_port: Optional[int] = 7890
idc_client_display_name: Optional[str] = "Amazon Redshift driver"
idp_response_timeout: Optional[int] = None

_ALIASES = {"dbname": "database", "pass": "password"}

@property
Expand Down Expand Up @@ -163,131 +188,171 @@ def unique_field(self) -> str:
return self.host


class RedshiftConnectMethodFactory:
credentials: RedshiftCredentials

def __init__(self, credentials) -> None:
self.credentials = credentials

def get_connect_method(self) -> Callable[[], redshift_connector.Connection]:
def get_connection_method(
    credentials: RedshiftCredentials,
) -> Callable[[], redshift_connector.Connection]:
    """Build a zero-argument callable that opens a Redshift connection.

    The keyword arguments for ``redshift_connector.connect`` are computed
    once, when this function is called, by the helper selected from
    ``credentials.method``. A missing method (``None``) is handled like
    ``"database"``.

    Args:
        credentials: The profile credentials to connect with.

    Returns:
        A callable that connects with the prepared keyword arguments, then
        enables autocommit and issues ``set role`` when the corresponding
        credential fields are set.

    Raises:
        FailedToConnectError: If ``credentials.method`` is not a known
            method, or if fields required by the chosen method are missing.
    """

    #
    # Helper Methods
    #
    def __validate_required_fields(method_name: str, required_fields: Tuple[str, ...]):
        """Raise FailedToConnectError naming every required field that is None."""
        missing_fields: List[str] = [
            field for field in required_fields if getattr(credentials, field, None) is None
        ]
        if missing_fields:
            fields_str: str = "', '".join(missing_fields)
            raise FailedToConnectError(
                f"'{fields_str}' field(s) are required for '{method_name}' credentials method"
            )

    def __base_kwargs(credentials) -> Dict[str, Any]:
        """Return the connect kwargs shared by every method, SSL settings included."""
        redshift_ssl_config: Dict[str, Any] = RedshiftSSLConfig.parse(
            credentials.sslmode
        ).to_dict()
        return {
            "host": credentials.host,
            # 5439 is used whenever the profile does not set a port.
            "port": int(credentials.port) if credentials.port else 5439,
            "database": credentials.database,
            "region": credentials.region,
            "auto_create": credentials.autocreate,
            "db_groups": credentials.db_groups,
            "timeout": credentials.connect_timeout,
            **redshift_ssl_config,
        }

    def __iam_kwargs(credentials) -> Dict[str, Any]:
        """Return base kwargs plus the settings shared by IAM-style methods."""
        # iam True except for identity center methods
        iam: bool = RedshiftConnectionMethod.is_iam(credentials.method)

        # Serverless hosts and identity center methods connect without a
        # cluster identifier; every other case must supply 'cluster_id'.
        cluster_identifier: Optional[str]
        if "serverless" in credentials.host or RedshiftConnectionMethod.uses_identity_center(
            credentials.method
        ):
            cluster_identifier = None
        elif credentials.cluster_id:
            cluster_identifier = credentials.cluster_id
        else:
            raise FailedToConnectError(
                "Failed to use IAM method:"
                " 'cluster_id' must be provided for provisioned cluster"
                " 'host' must be provided for serverless endpoint"
            )

        iam_specific_kwargs: Dict[str, Any] = {
            "iam": iam,
            "user": "",
            "password": "",
            "cluster_identifier": cluster_identifier,
        }

        return __base_kwargs(credentials) | iam_specific_kwargs

    def __database_kwargs(credentials) -> Dict[str, Any]:
        """Return kwargs for the 'database' method (user and password required)."""
        logger.debug("Connecting to Redshift with 'database' credentials method")

        __validate_required_fields("database", ("user", "password"))

        db_credentials: Dict[str, Any] = {
            "user": credentials.user,
            "password": credentials.password,
        }

        return __base_kwargs(credentials) | db_credentials

    def __iam_user_kwargs(credentials) -> Dict[str, Any]:
        """Return kwargs for the 'iam' method.

        Uses the explicit access key pair when both halves are given,
        rejects a half-specified pair, and otherwise passes ``iam_profile``.
        """
        logger.debug("Connecting to Redshift with 'iam' credentials method")

        iam_credentials: Dict[str, Any]
        if credentials.access_key_id and credentials.secret_access_key:
            iam_credentials = {
                "access_key_id": credentials.access_key_id,
                "secret_access_key": credentials.secret_access_key,
            }
        elif credentials.access_key_id or credentials.secret_access_key:
            raise FailedToConnectError(
                "'access_key_id' and 'secret_access_key' are both needed if providing explicit credentials"
            )
        else:
            iam_credentials = {"profile": credentials.iam_profile}

        __validate_required_fields("iam", ("user",))
        iam_credentials["db_user"] = credentials.user

        return __iam_kwargs(credentials) | iam_credentials

    def __iam_role_kwargs(credentials) -> Dict[str, Any]:
        """Return kwargs for the 'iam_role' method."""
        logger.debug("Connecting to Redshift with 'iam_role' credentials method")
        role_kwargs: Dict[str, Any] = {
            # It's a role, so the user is ignored.
            "db_user": None,
            # Serverless shouldn't get group_federation, provisioned clusters should.
            "group_federation": "serverless" not in credentials.host,
        }

        if credentials.iam_profile:
            role_kwargs["profile"] = credentials.iam_profile

        return __iam_kwargs(credentials) | role_kwargs

    def __iam_idc_browser_kwargs(credentials) -> Dict[str, Any]:
        """Return kwargs for the browser-based identity center method."""
        # The f-prefix is required so the method name is interpolated rather
        # than the literal placeholder text being logged.
        logger.debug(f"Connecting to Redshift with '{credentials.method}' credentials method")

        __IDP_TIMEOUT: int = 60
        __LISTEN_PORT_DEFAULT: int = 7890

        __validate_required_fields(
            "browser_identity_center", ("method", "idc_region", "issuer_url")
        )

        # Only an unset timeout falls back to the default; an explicit 0 is honored.
        idp_timeout: int = (
            __IDP_TIMEOUT
            if credentials.idp_response_timeout is None
            else credentials.idp_response_timeout
        )

        # An unset (or 0) listen port falls back to the default port.
        idp_listen_port: int = credentials.idp_listen_port or __LISTEN_PORT_DEFAULT

        idc_kwargs: Dict[str, Any] = {
            "credentials_provider": "BrowserIdcAuthPlugin",
            "issuer_url": credentials.issuer_url,
            "listen_port": idp_listen_port,
            "idc_region": credentials.idc_region,
            "idc_client_display_name": credentials.idc_client_display_name,
            "idp_response_timeout": idp_timeout,
        }

        return __iam_kwargs(credentials) | idc_kwargs

    #
    # Head of function execution
    #

    method_to_kwargs_function = {
        # Support missing 'method' for backwards compatibility
        None: __database_kwargs,
        RedshiftConnectionMethod.DATABASE: __database_kwargs,
        RedshiftConnectionMethod.IAM: __iam_user_kwargs,
        RedshiftConnectionMethod.IAM_ROLE: __iam_role_kwargs,
        RedshiftConnectionMethod.IAM_IDENTITY_CENTER_BROWSER: __iam_idc_browser_kwargs,
    }

    try:
        kwargs_function: Callable[[RedshiftCredentials], Dict[str, Any]] = (
            method_to_kwargs_function[credentials.method]
        )
    except KeyError:
        # The KeyError is an implementation detail of the dispatch table;
        # suppress it so only the profile error is reported.
        raise FailedToConnectError(
            f"Invalid 'method' in profile: '{credentials.method}'"
        ) from None

    # Computed eagerly so configuration errors surface before any connection attempt.
    kwargs: Dict[str, Any] = kwargs_function(credentials)

    def connect() -> redshift_connector.Connection:
        """Open the connection and apply the autocommit / role settings."""
        c = redshift_connector.connect(**kwargs)
        if credentials.autocommit:
            c.autocommit = True
        if credentials.role:
            c.cursor().execute(f"set role {credentials.role}")
        return c

    return connect


class RedshiftConnectionManager(SQLConnectionManager):
Expand Down Expand Up @@ -373,7 +438,6 @@ def open(cls, connection):
return connection

credentials = connection.credentials
connect_method_factory = RedshiftConnectMethodFactory(credentials)

def exponential_backoff(attempt: int):
return attempt * attempt
Expand All @@ -387,7 +451,7 @@ def exponential_backoff(attempt: int):

open_connection = cls.retry_connection(
connection,
connect=connect_method_factory.get_connect_method(),
connect=get_connection_method(credentials),
logger=logger,
retry_limit=credentials.retries,
retry_timeout=exponential_backoff,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _plugin_version() -> str:
"dbt-postgres>=1.8,<1.10",
# dbt-redshift depends deeply on this package. It does not follow SemVer, so previous patch releases have contained breaking changes.
# Pin to the patch or minor version, and bump in each new minor version of dbt-redshift.
"redshift-connector<2.1.1,>=2.0.913,!=2.0.914",
"redshift-connector>=2.1.3,<2.2",
# add dbt-core to ensure backwards compatibility of installation, this is not a functional dependency
"dbt-core>=1.8.0b3",
# installed via dbt-core but referenced directly; don't pin to avoid version conflicts with dbt-core
Expand Down
Loading

0 comments on commit c457faf

Please sign in to comment.