diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index 080ea969b4cd6..90efd95309d92 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -77,10 +77,13 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None): try: access_key = Config.get(cred_section, key_id_option) secret_key = Config.get(cred_section, secret_key_option) + calling_format = None + if Config.has_option(cred_section, 'calling_format'): + calling_format = Config.get(cred_section, 'calling_format') except: logging.warning("Option Error in parsing s3 config file") raise - return (access_key, secret_key) + return (access_key, secret_key, calling_format) class S3Hook(BaseHook): @@ -94,12 +97,15 @@ def __init__( self.s3_conn = self.get_connection(s3_conn_id) self.extra_params = self.s3_conn.extra_dejson self.profile = self.extra_params.get('profile') + self.calling_format = None self._creds_in_conn = 'aws_secret_access_key' in self.extra_params self._creds_in_config_file = 's3_config_file' in self.extra_params self._default_to_boto = False if self._creds_in_conn: self._a_key = self.extra_params['aws_access_key_id'] self._s_key = self.extra_params['aws_secret_access_key'] + if 'calling_format' in self.extra_params: + self.calling_format = self.extra_params['calling_format'] elif self._creds_in_config_file: self.s3_config_file = self.extra_params['s3_config_file'] # The format can be None and will default to boto in the parser @@ -151,12 +157,17 @@ def get_conn(self): return S3Connection(profile_name=self.profile) a_key = s_key = None if self._creds_in_config_file: - a_key, s_key = _parse_s3_config(self.s3_config_file, - self.s3_config_format, - self.profile) + a_key, s_key, calling_format = _parse_s3_config(self.s3_config_file, + self.s3_config_format, + self.profile) elif self._creds_in_conn: a_key = self._a_key s_key = self._s_key + calling_format = self.calling_format + + if calling_format is None: + calling_format = 'boto.s3.connection.SubdomainCallingFormat' + if self._sts_conn_required: sts_connection = STSConnection(aws_access_key_id=a_key, aws_secret_access_key=s_key, @@ -169,11 +180,13 @@ def get_conn(self): connection = S3Connection( aws_access_key_id=creds.access_key, aws_secret_access_key=creds.secret_key, + calling_format=calling_format, security_token=creds.session_token ) else: connection = S3Connection(aws_access_key_id=a_key, aws_secret_access_key=s_key, + calling_format=calling_format, profile_name=self.profile) return connection