@@ -170,9 +170,28 @@ private [redshift] object Parameters extends Logging {
170170 * use of standard system properties, environment variables, or IAM role configuration if available.
171171 */
172172 def credentialsString (configuration : Configuration ) = {
173+ val ((_, accessKeyId), (_, secretAccessKey)) = credentialsTuple(configuration)
173174
175+ s " aws_access_key_id= $accessKeyId;aws_secret_access_key= $secretAccessKey"
176+ }
177+
178+ /**
179+ * Looks up "aws_access_key_id" and "aws_secret_access_key" in the parameter map
180+ * and ensures they are set on the Configuration. If no credentials have been provided,
181+ * this function will instead try using the Hadoop Configuration fs.* settings for the provided tempDir
182+ * scheme, and if that also fails, it finally tries AWS DefaultCredentialsProviderChain, which makes
183+ * use of standard system properties, environment variables, or IAM role configuration if available.
184+ */
185+ def setCredentials (configuration : Configuration ): Unit = {
186+ val ((accessKeyIdProp, accessKeyId), (secretAccessKeyProp, secretAccessKey)) = credentialsTuple(configuration)
187+
188+ configuration.setIfUnset(accessKeyIdProp, accessKeyId)
189+ configuration.setIfUnset(secretAccessKeyProp, secretAccessKey)
190+ }
191+
192+ private def credentialsTuple (configuration : Configuration ) = {
174193 val scheme = new URI (tempDir).getScheme
175- val hadoopConfPrefix = s " fs. $scheme} "
194+ val hadoopConfPrefix = s " fs. $scheme"
176195
177196 val (accessKeyId, secretAccessKey) =
178197 if (parameters.contains(" aws_access_key_id" )) {
@@ -192,7 +211,7 @@ private [redshift] object Parameters extends Logging {
192211 }
193212 }
194213
195- s " aws_access_key_id= $ accessKeyId;aws_secret_access_key= $secretAccessKey "
214+ (( s " $hadoopConfPrefix .awsAccessKeyId " , accessKeyId), ( s " $hadoopConfPrefix .awsSecretAccessKey " , secretAccessKey))
196215 }
197216 }
198217}
0 commit comments