diff --git a/clients/hadoopfs/CHANGELOG.md b/clients/hadoopfs/CHANGELOG.md
index f518d6f37e1..6685bdbeccb 100644
--- a/clients/hadoopfs/CHANGELOG.md
+++ b/clients/hadoopfs/CHANGELOG.md
@@ -2,6 +2,8 @@
## _Upcoming_
+lakeFSFS: new Token Provider feature with IAM Role Support for lakeFS authentication (#7659 + #7604)
+
## 0.2.3
* Fix createDirectoryMarkerIfNotExists (#7510)
diff --git a/clients/hadoopfs/pom.xml b/clients/hadoopfs/pom.xml
index 23a9a6551b2..ff31651e65a 100644
--- a/clients/hadoopfs/pom.xml
+++ b/clients/hadoopfs/pom.xml
@@ -287,7 +287,7 @@ To export to S3:
io.lakefs
sdk
- 1.17.0
+ 1.18.0
org.apache.commons
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java
index 8d2fb35121b..750d0c22144 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java
@@ -13,7 +13,9 @@ public class Constants {
public static final String ACCESS_MODE_KEY_SUFFIX = "access.mode";
// io.lakefs.auth.TemporaryAWSCredentialsLakeFSTokenProvider, io.lakefs.auth.InstanceProfileAWSCredentialsLakeFSTokenProvider
public static final String LAKEFS_AUTH_PROVIDER_KEY_SUFFIX = "auth.provider";
+
// TODO(isan) document all configuration fields before merge.
+ public static final String LAKEFS_AUTH_TOKEN_TTL_KEY_SUFFIX = "token.duration_seconds";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_ACCESS_KEY_SUFFIX = "token.aws.access.key";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_SECRET_KEY_SUFFIX = "token.aws.secret.key";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX = "token.aws.session.token";
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java
index 591f43eafa7..4925bbac051 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java
@@ -6,19 +6,20 @@
import io.lakefs.clients.sdk.auth.HttpBasicAuth;
import io.lakefs.clients.sdk.auth.HttpBearerAuth;
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import static io.lakefs.auth.LakeFSTokenProviderFactory.newLakeFSTokenProvider;
-
/**
* Provides access to lakeFS API using client library.
* This class uses the configuration to initialize API client and instance per API interface we expose.
*/
public class LakeFSClient {
+ private static final Logger LOG = LoggerFactory.getLogger(LakeFSClient.class);
private static final String BASIC_AUTH = "basic_auth";
private static final String JWT_TOKEN_AUTH = "jwt_token";
-
+ LakeFSTokenProvider provider;
private final ObjectsApi objectsApi;
private final StagingApi stagingApi;
private final RepositoriesApi repositoriesApi;
@@ -29,7 +30,7 @@ public class LakeFSClient {
public LakeFSClient(String scheme, Configuration conf) throws IOException {
String authProvider = FSConfiguration.get(conf, scheme, Constants.LAKEFS_AUTH_PROVIDER_KEY_SUFFIX, LakeFSClient.BASIC_AUTH);
ApiClient apiClient;
-
+ LOG.info("Initiating lakeFS auth provider: {}", authProvider);
if (authProvider == BASIC_AUTH) {
String accessKey = FSConfiguration.get(conf, scheme, Constants.ACCESS_KEY_KEY_SUFFIX);
if (accessKey == null) {
@@ -45,14 +46,11 @@ public LakeFSClient(String scheme, Configuration conf) throws IOException {
basicAuth.setUsername(accessKey);
basicAuth.setPassword(secretKey);
} else {
- // TODO(isan) depends on missing functionality PR https://github.com/treeverse/lakeFS/pull/7578 being merged.
- // once merged, we can use the following code to get the token
- throw new IOException("Unsupported auth provider: " + authProvider + ". Only basic_auth is supported at the moment.");
-// LakeFSTokenProvider tokenProvider = newLakeFSTokenProvider(scheme, conf);
-// String jwt = tokenProvider.getToken();
-// apiClient = newApiClientNoAuth(scheme, conf);
-// HttpBearerAuth tokenAuth = (HttpBearerAuth)apiClient.getAuthentication(JWT_TOKEN_AUTH);
-// tokenAuth.setBearerToken(jwt);
+ this.provider = LakeFSTokenProviderFactory.newLakeFSTokenProvider(Constants.DEFAULT_SCHEME, conf);
+ String lakeFSToken = provider.getToken();
+ apiClient = newApiClientNoAuth(scheme, conf);
+ HttpBearerAuth tokenAuth = (HttpBearerAuth) apiClient.getAuthentication(JWT_TOKEN_AUTH);
+ tokenAuth.setBearerToken(lakeFSToken);
}
this.objectsApi = new ObjectsApi(apiClient);
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/auth/AWSLakeFSTokenProvider.java b/clients/hadoopfs/src/main/java/io/lakefs/auth/AWSLakeFSTokenProvider.java
index 03e449a0e13..3fb09ee09e1 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/auth/AWSLakeFSTokenProvider.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/auth/AWSLakeFSTokenProvider.java
@@ -1,17 +1,22 @@
package io.lakefs.auth;
+
import com.amazonaws.auth.AWSCredentialsProvider;
import io.lakefs.Constants;
import io.lakefs.FSConfiguration;
import io.lakefs.clients.sdk.ApiClient;
+import io.lakefs.clients.sdk.AuthApi;
+import io.lakefs.clients.sdk.model.ExternalLoginInformation;
import io.lakefs.clients.sdk.model.AuthenticationToken;
import org.apache.commons.codec.binary.Base64;
import java.io.IOException;
+
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@@ -23,6 +28,7 @@ public class AWSLakeFSTokenProvider implements LakeFSTokenProvider {
String stsEndpoint;
Map stsAdditionalHeaders;
int stsExpirationInSeconds;
+ Optional lakeFSTokenTTLSeconds = Optional.empty();
ApiClient lakeFSApi;
AWSLakeFSTokenProvider() {
@@ -68,6 +74,12 @@ protected void initialize(AWSCredentialsProvider awsProvider, String scheme, Con
}
this.lakeFSApi.setBasePath(endpoint);
+ // optional timeout for lakeFS token
+ int tokenTTL = FSConfiguration.getInt(conf, scheme, Constants.LAKEFS_AUTH_TOKEN_TTL_KEY_SUFFIX, -1);
+ if (tokenTTL != -1) {
+ this.lakeFSTokenTTLSeconds = Optional.of(tokenTTL);
+ }
+
// set additional headers (non-canonical) to sign with each request to STS
// non-canonical headers are signed by the presigner and sent to STS for verification in the requests by lakeFS to exchange the token
Map additionalHeaders = FSConfiguration.getMap(conf, scheme, Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_ADDITIONAL_HEADERS);
@@ -94,12 +106,7 @@ private boolean needsNewToken() {
}
public GeneratePresignGetCallerIdentityResponse newPresignedRequest() throws Exception {
- GeneratePresignGetCallerIdentityRequest stsReq = new GeneratePresignGetCallerIdentityRequest(
- new URI(this.stsEndpoint),
- this.awsProvider.getCredentials(),
- this.stsAdditionalHeaders,
- this.stsExpirationInSeconds
- );
+ GeneratePresignGetCallerIdentityRequest stsReq = new GeneratePresignGetCallerIdentityRequest(new URI(this.stsEndpoint), this.awsProvider.getCredentials(), this.stsAdditionalHeaders, this.stsExpirationInSeconds);
return this.stsPresigner.presignRequest(stsReq);
}
@@ -107,40 +114,29 @@ public String newPresignedGetCallerIdentityToken() throws Exception {
GeneratePresignGetCallerIdentityResponse signedRequest = this.newPresignedRequest();
// generate token parameters object
- LakeFSExternalPrincipalIdentityRequest identityTokenParams = new LakeFSExternalPrincipalIdentityRequest(
- signedRequest.getHTTPMethod(),
- signedRequest.getHost(),
- signedRequest.getRegion(),
- signedRequest.getAction(),
- signedRequest.getDate(),
- signedRequest.getExpires(),
- signedRequest.getAccessKeyId(),
- signedRequest.getSignature(),
- Arrays.asList(signedRequest.getSignedHeadersParam().split(";")),
- signedRequest.getVersion(),
- signedRequest.getAlgorithm(),
- signedRequest.getSecurityToken()
- );
+ LakeFSExternalPrincipalIdentityRequest identityTokenParams = new LakeFSExternalPrincipalIdentityRequest(signedRequest.getHTTPMethod(), signedRequest.getHost(), signedRequest.getRegion(), signedRequest.getAction(), signedRequest.getDate(), signedRequest.getExpires(), signedRequest.getAccessKeyId(), signedRequest.getSignature(), Arrays.asList(signedRequest.getSignedHeadersParam().split(";")), signedRequest.getVersion(), signedRequest.getAlgorithm(), signedRequest.getSecurityToken());
// base64 encode
return Base64.encodeBase64String(identityTokenParams.toJSON().getBytes());
}
private void newToken() throws Exception {
+ // created identity token to exchange for lakeFS token
String identityToken = this.newPresignedGetCallerIdentityToken();
- /*
- TODO(isan)
- depends on missing functionality PR https://github.com/treeverse/lakeFS/pull/7578 being merged.
- before merging this code - implement the call to lakeFS.
- it will introduce the functionality in the generated client of actually doing the login.
- call lakeFS to exchange the token for a lakeFS token
- The flow will be:
- 1. use this.lakeFSApi Client with ExternalPrincipal API class (no auth required)
- 2. this.lakeFSAuthToken = call api.ExternalPrincipalLogin(identityToken, )
- */
- // dummy initiation
- this.lakeFSAuthToken = new AuthenticationToken();
- this.lakeFSAuthToken.setTokenExpiration(System.currentTimeMillis() + 60);
+
+ // build lakeFS login request
+ ExternalLoginInformation req = new ExternalLoginInformation();
+
+ // set lakeFS token expiration if provided by the configuration
+ this.lakeFSTokenTTLSeconds.ifPresent(req::setTokenExpirationDuration);
+
+ // set identity request
+ IdentityRequestRequestWrapper t = new IdentityRequestRequestWrapper(identityToken);
+ req.setIdentityRequest(t);
+
+ // call lakeFS to exchange the identity token for a lakeFS token
+ AuthApi auth = new AuthApi(this.lakeFSApi);
+ this.lakeFSAuthToken = auth.externalPrincipalLogin().externalLoginInformation(req).execute();
}
// refresh can be called to create a new token regardless if the current token is expired or not or does not exist.
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/auth/IdentityRequestRequestWrapper.java b/clients/hadoopfs/src/main/java/io/lakefs/auth/IdentityRequestRequestWrapper.java
new file mode 100644
index 00000000000..4ac5b70a56d
--- /dev/null
+++ b/clients/hadoopfs/src/main/java/io/lakefs/auth/IdentityRequestRequestWrapper.java
@@ -0,0 +1,11 @@
+package io.lakefs.auth;
+
+import com.google.gson.annotations.SerializedName;
+
+public class IdentityRequestRequestWrapper {
+ @SerializedName("identity_token")
+ private String identityToken;
+ public IdentityRequestRequestWrapper(String identityToken) {
+ this.identityToken = identityToken;
+ }
+}
diff --git a/clients/hadoopfs/src/test/java/io/lakefs/auth/AWSLakeFSTokenProviderTest.java b/clients/hadoopfs/src/test/java/io/lakefs/auth/AWSLakeFSTokenProviderTest.java
index 9106c979c73..304cbca6430 100644
--- a/clients/hadoopfs/src/test/java/io/lakefs/auth/AWSLakeFSTokenProviderTest.java
+++ b/clients/hadoopfs/src/test/java/io/lakefs/auth/AWSLakeFSTokenProviderTest.java
@@ -1,13 +1,31 @@
package io.lakefs.auth;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import io.lakefs.Constants;
import io.lakefs.FSConfiguration;
+import io.lakefs.clients.sdk.model.AuthenticationToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
+
+import org.junit.Rule;
import org.junit.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.junit.MockServerRule;
+import org.mockserver.matchers.Times;
+import org.mockserver.model.Cookie;
+import org.mockserver.model.HttpRequest;
+
+import static org.mockserver.model.HttpResponse.response;
+
public class AWSLakeFSTokenProviderTest {
+ @Rule
+ public MockServerRule mockServerRule = new MockServerRule(this);
+ protected MockServerClient mockServerClient;
+ protected final Gson gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();
@Test
public void testProviderIdentityTokenSerde() throws Exception {
@@ -36,4 +54,38 @@ public void testProviderIdentityTokenSerde() throws Exception {
Assert.assertEquals("AWS4-HMAC-SHA256", request.getAlgorithm());
Assert.assertEquals(FSConfiguration.get(conf, "lakefs", Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX), request.getSecurityToken());
}
+
+ protected void mockExternalPrincipalLogin(Long tokenExpiration, String token, String sessionID) {
+ // lakeFSFS initialization requires a blockstore.
+ HttpRequest request = HttpRequest.request().withCookie(new Cookie("sessionId", sessionID));
+
+ mockServerClient
+ .when(
+ request.withMethod("POST").withPath("/auth/external/principal/login"),
+ Times.once())
+ .respond(
+ response().withStatusCode(200).withBody(new AuthenticationToken().token(token).tokenExpiration(tokenExpiration).toJson())
+ );
+ }
+
+ @Test
+ public void testProviderToken() throws Exception {
+ String sessionID = "testProviderToken";
+ String expectedToken = "lakefs-jwt-token";
+ Configuration conf = new Configuration(false);
+ conf.set("fs.lakefs." + Constants.LAKEFS_AUTH_PROVIDER_KEY_SUFFIX, TemporaryAWSCredentialsLakeFSTokenProvider.NAME);
+ conf.set("fs.lakefs." + Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_ACCESS_KEY_SUFFIX, "accessKeyId");
+ conf.set("fs.lakefs." + Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_SECRET_KEY_SUFFIX, "secretAccessKey");
+ conf.set("fs.lakefs." + Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX, "sessionToken");
+ conf.setInt("fs.lakefs." + Constants.LAKEFS_AUTH_TOKEN_TTL_KEY_SUFFIX, 120);
+ conf.set("fs.lakefs." + Constants.TOKEN_AWS_STS_ENDPOINT, "https://sts.amazonaws.com");
+ conf.set("fs.lakefs.endpoint", String.format("http://localhost:%d/", mockServerClient.getPort()));
+ conf.set("fs.lakefs.session_id", sessionID);
+
+ LakeFSTokenProvider provider = LakeFSTokenProviderFactory.newLakeFSTokenProvider(Constants.DEFAULT_SCHEME, conf);
+ mockExternalPrincipalLogin(1000L, expectedToken, sessionID);
+
+ String lakeFSJWT = provider.getToken();
+ Assert.assertEquals(expectedToken, lakeFSJWT);
+ }
}