Skip to content

Commit

Permalink
lakeFSFS: IAM Role Support Login call (#7659)
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin authored Apr 14, 2024
1 parent 822b847 commit 8760e0b
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 46 deletions.
2 changes: 2 additions & 0 deletions clients/hadoopfs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## _Upcoming_

lakeFSFS: new Token Provider feature with IAM Role Support for lakeFS authentication (#7659 + #7604)

## 0.2.3

* Fix createDirectoryMarkerIfNotExists (#7510)
Expand Down
2 changes: 1 addition & 1 deletion clients/hadoopfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ To export to S3:
<dependency>
<groupId>io.lakefs</groupId>
<artifactId>sdk</artifactId>
<version>1.17.0</version>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
2 changes: 2 additions & 0 deletions clients/hadoopfs/src/main/java/io/lakefs/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public class Constants {
public static final String ACCESS_MODE_KEY_SUFFIX = "access.mode";
// io.lakefs.auth.TemporaryAWSCredentialsLakeFSTokenProvider, io.lakefs.auth.InstanceProfileAWSCredentialsLakeFSTokenProvider
public static final String LAKEFS_AUTH_PROVIDER_KEY_SUFFIX = "auth.provider";

// TODO(isan) document all configuration fields before merge.
public static final String LAKEFS_AUTH_TOKEN_TTL_KEY_SUFFIX = "token.duration_seconds";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_ACCESS_KEY_SUFFIX = "token.aws.access.key";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_SECRET_KEY_SUFFIX = "token.aws.secret.key";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX = "token.aws.session.token";
Expand Down
22 changes: 10 additions & 12 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
import io.lakefs.clients.sdk.auth.HttpBasicAuth;
import io.lakefs.clients.sdk.auth.HttpBearerAuth;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static io.lakefs.auth.LakeFSTokenProviderFactory.newLakeFSTokenProvider;

/**
* Provides access to lakeFS API using client library.
* This class uses the configuration to initialize API client and instance per API interface we expose.
*/
public class LakeFSClient {
private static final Logger LOG = LoggerFactory.getLogger(LakeFSClient.class);
private static final String BASIC_AUTH = "basic_auth";
private static final String JWT_TOKEN_AUTH = "jwt_token";

LakeFSTokenProvider provider;
private final ObjectsApi objectsApi;
private final StagingApi stagingApi;
private final RepositoriesApi repositoriesApi;
Expand All @@ -29,7 +30,7 @@ public class LakeFSClient {
public LakeFSClient(String scheme, Configuration conf) throws IOException {
String authProvider = FSConfiguration.get(conf, scheme, Constants.LAKEFS_AUTH_PROVIDER_KEY_SUFFIX, LakeFSClient.BASIC_AUTH);
ApiClient apiClient;

LOG.info("Initiating lakeFS auth provider: {}", authProvider);
if (authProvider == BASIC_AUTH) {
String accessKey = FSConfiguration.get(conf, scheme, Constants.ACCESS_KEY_KEY_SUFFIX);
if (accessKey == null) {
Expand All @@ -45,14 +46,11 @@ public LakeFSClient(String scheme, Configuration conf) throws IOException {
basicAuth.setUsername(accessKey);
basicAuth.setPassword(secretKey);
} else {
// TODO(isan) depends on missing functionality PR https://github.com/treeverse/lakeFS/pull/7578 being merged.
// once merged, we can use the following code to get the token
throw new IOException("Unsupported auth provider: " + authProvider + ". Only basic_auth is supported at the moment.");
// LakeFSTokenProvider tokenProvider = newLakeFSTokenProvider(scheme, conf);
// String jwt = tokenProvider.getToken();
// apiClient = newApiClientNoAuth(scheme, conf);
// HttpBearerAuth tokenAuth = (HttpBearerAuth)apiClient.getAuthentication(JWT_TOKEN_AUTH);
// tokenAuth.setBearerToken(jwt);
this.provider = LakeFSTokenProviderFactory.newLakeFSTokenProvider(Constants.DEFAULT_SCHEME, conf);
String lakeFSToken = provider.getToken();
apiClient = newApiClientNoAuth(scheme, conf);
HttpBearerAuth tokenAuth = (HttpBearerAuth) apiClient.getAuthentication(JWT_TOKEN_AUTH);
tokenAuth.setBearerToken(lakeFSToken);
}

this.objectsApi = new ObjectsApi(apiClient);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package io.lakefs.auth;

import com.amazonaws.auth.AWSCredentialsProvider;
import io.lakefs.Constants;
import io.lakefs.FSConfiguration;
import io.lakefs.clients.sdk.ApiClient;
import io.lakefs.clients.sdk.AuthApi;
import io.lakefs.clients.sdk.model.ExternalLoginInformation;
import io.lakefs.clients.sdk.model.AuthenticationToken;
import org.apache.commons.codec.binary.Base64;

import java.io.IOException;

import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;

Expand All @@ -23,6 +28,7 @@ public class AWSLakeFSTokenProvider implements LakeFSTokenProvider {
String stsEndpoint;
Map<String, String> stsAdditionalHeaders;
int stsExpirationInSeconds;
Optional<Integer> lakeFSTokenTTLSeconds = Optional.empty();
ApiClient lakeFSApi;

AWSLakeFSTokenProvider() {
Expand Down Expand Up @@ -68,6 +74,12 @@ protected void initialize(AWSCredentialsProvider awsProvider, String scheme, Con
}
this.lakeFSApi.setBasePath(endpoint);

// optional timeout for lakeFS token
int tokenTTL = FSConfiguration.getInt(conf, scheme, Constants.LAKEFS_AUTH_TOKEN_TTL_KEY_SUFFIX, -1);
if (tokenTTL != -1) {
this.lakeFSTokenTTLSeconds = Optional.of(tokenTTL);
}

// set additional headers (non-canonical) to sign with each request to STS
// non-canonical headers are signed by the presigner and sent to STS for verification in the requests by lakeFS to exchange the token
Map<String, String> additionalHeaders = FSConfiguration.getMap(conf, scheme, Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_ADDITIONAL_HEADERS);
Expand All @@ -94,53 +106,37 @@ private boolean needsNewToken() {
}

public GeneratePresignGetCallerIdentityResponse newPresignedRequest() throws Exception {
GeneratePresignGetCallerIdentityRequest stsReq = new GeneratePresignGetCallerIdentityRequest(
new URI(this.stsEndpoint),
this.awsProvider.getCredentials(),
this.stsAdditionalHeaders,
this.stsExpirationInSeconds
);
GeneratePresignGetCallerIdentityRequest stsReq = new GeneratePresignGetCallerIdentityRequest(new URI(this.stsEndpoint), this.awsProvider.getCredentials(), this.stsAdditionalHeaders, this.stsExpirationInSeconds);
return this.stsPresigner.presignRequest(stsReq);
}

public String newPresignedGetCallerIdentityToken() throws Exception {
GeneratePresignGetCallerIdentityResponse signedRequest = this.newPresignedRequest();

// generate token parameters object
LakeFSExternalPrincipalIdentityRequest identityTokenParams = new LakeFSExternalPrincipalIdentityRequest(
signedRequest.getHTTPMethod(),
signedRequest.getHost(),
signedRequest.getRegion(),
signedRequest.getAction(),
signedRequest.getDate(),
signedRequest.getExpires(),
signedRequest.getAccessKeyId(),
signedRequest.getSignature(),
Arrays.asList(signedRequest.getSignedHeadersParam().split(";")),
signedRequest.getVersion(),
signedRequest.getAlgorithm(),
signedRequest.getSecurityToken()
);
LakeFSExternalPrincipalIdentityRequest identityTokenParams = new LakeFSExternalPrincipalIdentityRequest(signedRequest.getHTTPMethod(), signedRequest.getHost(), signedRequest.getRegion(), signedRequest.getAction(), signedRequest.getDate(), signedRequest.getExpires(), signedRequest.getAccessKeyId(), signedRequest.getSignature(), Arrays.asList(signedRequest.getSignedHeadersParam().split(";")), signedRequest.getVersion(), signedRequest.getAlgorithm(), signedRequest.getSecurityToken());

// base64 encode
return Base64.encodeBase64String(identityTokenParams.toJSON().getBytes());
}

private void newToken() throws Exception {
// created identity token to exchange for lakeFS token
String identityToken = this.newPresignedGetCallerIdentityToken();
/*
TODO(isan)
depends on missing functionality PR https://github.com/treeverse/lakeFS/pull/7578 being merged.
before merging this code - implement the call to lakeFS.
it will introduce the functionality in the generated client of actually doing the login.
call lakeFS to exchange the token for a lakeFS token
The flow will be:
1. use this.lakeFSApi Client with ExternalPrincipal API class (no auth required)
2. this.lakeFSAuthToken = call api.ExternalPrincipalLogin(identityToken, <lakeFS Token optional TTL>)
*/
// dummy initiation
this.lakeFSAuthToken = new AuthenticationToken();
this.lakeFSAuthToken.setTokenExpiration(System.currentTimeMillis() + 60);

// build lakeFS login request
ExternalLoginInformation req = new ExternalLoginInformation();

// set lakeFS token expiration if provided by the configuration
this.lakeFSTokenTTLSeconds.ifPresent(req::setTokenExpirationDuration);

// set identity request
IdentityRequestRequestWrapper t = new IdentityRequestRequestWrapper(identityToken);
req.setIdentityRequest(t);

// call lakeFS to exchange the identity token for a lakeFS token
AuthApi auth = new AuthApi(this.lakeFSApi);
this.lakeFSAuthToken = auth.externalPrincipalLogin().externalLoginInformation(req).execute();
}

// refresh can be called to create a new token regardless if the current token is expired or not or does not exist.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.lakefs.auth;

import com.google.gson.annotations.SerializedName;

public class IdentityRequestRequestWrapper {
@SerializedName("identity_token")
private String identityToken;
public IdentityRequestRequestWrapper(String identityToken) {
this.identityToken = identityToken;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
package io.lakefs.auth;

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.lakefs.Constants;
import io.lakefs.FSConfiguration;
import io.lakefs.clients.sdk.model.AuthenticationToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;

import org.junit.Rule;
import org.junit.Test;
import org.mockserver.client.MockServerClient;
import org.mockserver.junit.MockServerRule;
import org.mockserver.matchers.Times;
import org.mockserver.model.Cookie;
import org.mockserver.model.HttpRequest;

import static org.mockserver.model.HttpResponse.response;


public class AWSLakeFSTokenProviderTest {
@Rule
public MockServerRule mockServerRule = new MockServerRule(this);
protected MockServerClient mockServerClient;
protected final Gson gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();

@Test
public void testProviderIdentityTokenSerde() throws Exception {
Expand Down Expand Up @@ -36,4 +54,38 @@ public void testProviderIdentityTokenSerde() throws Exception {
Assert.assertEquals("AWS4-HMAC-SHA256", request.getAlgorithm());
Assert.assertEquals(FSConfiguration.get(conf, "lakefs", Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX), request.getSecurityToken());
}

protected void mockExternalPrincipalLogin(Long tokenExpiration, String token, String sessionID) {
// lakeFSFS initialization requires a blockstore.
HttpRequest request = HttpRequest.request().withCookie(new Cookie("sessionId", sessionID));

mockServerClient
.when(
request.withMethod("POST").withPath("/auth/external/principal/login"),
Times.once())
.respond(
response().withStatusCode(200).withBody(new AuthenticationToken().token(token).tokenExpiration(tokenExpiration).toJson())
);
}

@Test
public void testProviderToken() throws Exception {
String sessionID = "testProviderToken";
String expectedToken = "lakefs-jwt-token";
Configuration conf = new Configuration(false);
conf.set("fs.lakefs." + Constants.LAKEFS_AUTH_PROVIDER_KEY_SUFFIX, TemporaryAWSCredentialsLakeFSTokenProvider.NAME);
conf.set("fs.lakefs." + Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_ACCESS_KEY_SUFFIX, "accessKeyId");
conf.set("fs.lakefs." + Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_SECRET_KEY_SUFFIX, "secretAccessKey");
conf.set("fs.lakefs." + Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX, "sessionToken");
conf.setInt("fs.lakefs." + Constants.LAKEFS_AUTH_TOKEN_TTL_KEY_SUFFIX, 120);
conf.set("fs.lakefs." + Constants.TOKEN_AWS_STS_ENDPOINT, "https://sts.amazonaws.com");
conf.set("fs.lakefs.endpoint", String.format("http://localhost:%d/", mockServerClient.getPort()));
conf.set("fs.lakefs.session_id", sessionID);

LakeFSTokenProvider provider = LakeFSTokenProviderFactory.newLakeFSTokenProvider(Constants.DEFAULT_SCHEME, conf);
mockExternalPrincipalLogin(1000L, expectedToken, sessionID);

String lakeFSJWT = provider.getToken();
Assert.assertEquals(expectedToken, lakeFSJWT);
}
}

0 comments on commit 8760e0b

Please sign in to comment.