Skip to content

Commit 9f6eda6

Browse files
authored
Merge pull request #1 from snazy/opa-authorizer-file-refresh
Make token-refresh asynchronous ...
2 parents fe8e450 + 615cd9e commit 9f6eda6

File tree

6 files changed

+302
-130
lines changed

6 files changed

+302
-130
lines changed

extensions/auth/opa/impl/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dependencies {
3131
implementation(libs.guava)
3232
implementation(libs.slf4j.api)
3333
implementation(libs.auth0.jwt)
34+
implementation(project(":polaris-async-api"))
3435

3536
// Iceberg dependency for ForbiddenException
3637
implementation(platform(libs.iceberg.bom))
@@ -47,4 +48,7 @@ dependencies {
4748
testImplementation(libs.assertj.core)
4849
testImplementation(libs.mockito.core)
4950
testImplementation(libs.threeten.extra)
51+
testImplementation(testFixtures(project(":polaris-async-api")))
52+
testImplementation(project(":polaris-async-java"))
53+
testImplementation(project(":polaris-idgen-mocks"))
5054
}

extensions/auth/opa/impl/src/main/java/org/apache/polaris/extension/auth/opa/OpaPolarisAuthorizerFactory.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.polaris.extension.auth.opa.token.BearerTokenProvider;
3737
import org.apache.polaris.extension.auth.opa.token.FileBearerTokenProvider;
3838
import org.apache.polaris.extension.auth.opa.token.StaticBearerTokenProvider;
39+
import org.apache.polaris.nosql.async.AsyncExec;
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
4142

@@ -48,14 +49,17 @@ class OpaPolarisAuthorizerFactory implements PolarisAuthorizerFactory {
4849

4950
private final OpaAuthorizationConfig opaConfig;
5051
private final Clock clock;
52+
private final ObjectMapper objectMapper;
53+
private final AsyncExec asyncExec;
5154
private CloseableHttpClient httpClient;
5255
private BearerTokenProvider bearerTokenProvider;
53-
private ObjectMapper objectMapper;
5456

5557
@Inject
56-
public OpaPolarisAuthorizerFactory(OpaAuthorizationConfig opaConfig, Clock clock) {
58+
public OpaPolarisAuthorizerFactory(
59+
OpaAuthorizationConfig opaConfig, Clock clock, AsyncExec asyncExec) {
5760
this.opaConfig = opaConfig;
5861
this.clock = clock;
62+
this.asyncExec = asyncExec;
5963
this.objectMapper = new ObjectMapper();
6064
}
6165

@@ -167,7 +171,13 @@ private BearerTokenProvider createBearerTokenProvider(
167171
Duration jwtExpirationBuffer = fileConfig.jwtExpirationBuffer().orElse(Duration.ofMinutes(1));
168172

169173
return new FileBearerTokenProvider(
170-
fileConfig.path(), refreshInterval, jwtExpirationRefresh, jwtExpirationBuffer, clock);
174+
fileConfig.path(),
175+
refreshInterval,
176+
jwtExpirationRefresh,
177+
jwtExpirationBuffer,
178+
Duration.ofSeconds(5),
179+
asyncExec,
180+
clock::instant);
171181
} else {
172182
throw new IllegalStateException(
173183
"No bearer token configuration found. Must specify either 'static-token' or 'file-based'");

extensions/auth/opa/impl/src/main/java/org/apache/polaris/extension/auth/opa/token/FileBearerTokenProvider.java

Lines changed: 75 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,26 @@
1818
*/
1919
package org.apache.polaris.extension.auth.opa.token;
2020

21+
import static com.google.common.base.Preconditions.checkState;
22+
2123
import com.auth0.jwt.JWT;
2224
import com.auth0.jwt.exceptions.JWTDecodeException;
2325
import com.auth0.jwt.interfaces.DecodedJWT;
24-
import com.google.common.base.Strings;
2526
import jakarta.annotation.Nullable;
2627
import java.io.IOException;
2728
import java.nio.charset.StandardCharsets;
2829
import java.nio.file.Files;
2930
import java.nio.file.Path;
30-
import java.time.Clock;
3131
import java.time.Duration;
3232
import java.time.Instant;
3333
import java.util.Date;
3434
import java.util.Optional;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.TimeUnit;
3537
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.function.Supplier;
39+
import org.apache.polaris.nosql.async.AsyncExec;
40+
import org.apache.polaris.nosql.async.Cancelable;
3641
import org.slf4j.Logger;
3742
import org.slf4j.LoggerFactory;
3843

@@ -58,12 +63,16 @@ public class FileBearerTokenProvider implements BearerTokenProvider {
5863
private final Duration refreshInterval;
5964
private final boolean jwtExpirationRefresh;
6065
private final Duration jwtExpirationBuffer;
61-
private final Clock clock;
66+
private final Supplier<Instant> clock;
6267
private final AtomicBoolean refreshLock = new AtomicBoolean();
68+
private final AsyncExec asyncExec;
69+
private final CompletableFuture<String> initialTokenFuture = new CompletableFuture<>();
70+
private final long initialTokenWaitMillis;
6371

6472
private volatile String cachedToken;
6573
private volatile Instant lastRefresh;
6674
private volatile Instant nextRefresh;
75+
private volatile Cancelable<?> refreshTask;
6776

6877
/**
6978
* Create a new file-based token provider with JWT expiration support.
@@ -80,24 +89,23 @@ public FileBearerTokenProvider(
8089
Duration refreshInterval,
8190
boolean jwtExpirationRefresh,
8291
Duration jwtExpirationBuffer,
83-
Clock clock) {
92+
Duration initialTokenWait,
93+
AsyncExec asyncExec,
94+
Supplier<Instant> clock) {
8495
this.tokenFilePath = tokenFilePath;
8596
this.refreshInterval = refreshInterval;
8697
this.jwtExpirationRefresh = jwtExpirationRefresh;
8798
this.jwtExpirationBuffer = jwtExpirationBuffer;
99+
this.initialTokenWaitMillis = initialTokenWait.toMillis();
88100
this.clock = clock;
101+
this.asyncExec = asyncExec;
89102

90-
// Load initial token eagerly to avoid race conditions during first getToken() calls
91-
this.cachedToken = loadTokenFromFile();
92-
if (Strings.isNullOrEmpty(this.cachedToken)) {
93-
throw new IllegalStateException(
94-
"Failed to load initial bearer token from file: "
95-
+ tokenFilePath
96-
+ ". This is required for OPA authorization.");
97-
}
103+
this.nextRefresh = Instant.MIN;
104+
this.lastRefresh = Instant.MIN;
105+
// start refreshing the token (immediately)
106+
scheduleRefreshAttempt(Duration.ZERO);
98107

99-
this.lastRefresh = clock.instant();
100-
this.nextRefresh = calculateNextRefresh(this.cachedToken);
108+
checkState(Files.isReadable(tokenFilePath), "OPA token file does not exist or is not readable");
101109

102110
logger.debug(
103111
"Created file token provider for path: {} with refresh interval: {}, JWT expiration refresh: {}, JWT buffer: {}, next refresh: {}",
@@ -110,56 +118,74 @@ public FileBearerTokenProvider(
110118

111119
@Override
112120
public String getToken() {
113-
// Check if we need to refresh
114-
if (shouldRefresh()) {
115-
refreshToken();
121+
String token = cachedToken;
122+
if (token != null) {
123+
// Regular case, we have a cached token
124+
return cachedToken;
116125
}
117-
118-
// Token is guaranteed to be present after construction, but check anyway for safety
119-
if (Strings.isNullOrEmpty(cachedToken)) {
120-
throw new RuntimeException(
121-
"Bearer token is unexpectedly empty. This should not happen after successful construction.");
126+
// We get here if the cached token is null, which means that the initial token
127+
// has not been loaded yet.
128+
// In this case we wait for the configured amount of time
129+
// (5 seconds in production, much lower in tests).
130+
try {
131+
return initialTokenFuture.get(initialTokenWaitMillis, TimeUnit.MILLISECONDS);
132+
} catch (Exception e) {
133+
throw new IllegalStateException("Failed to read initial OPA bearer token", e);
122134
}
123-
return cachedToken;
124135
}
125136

126137
@Override
127138
public void close() {
128139
cachedToken = null;
140+
Cancelable<?> task = refreshTask;
141+
if (task != null) {
142+
refreshTask.cancel();
143+
}
144+
}
145+
146+
private void refreshTokenAttempt() {
147+
boolean isInitialRefresh = cachedToken == null;
148+
Duration delay;
149+
if (doRefreshToken()) {
150+
delay = Duration.between(clock.get(), nextRefresh);
151+
if (isInitialRefresh) {
152+
// If we have never cached a token, complete the initial token-future to "unblock"
153+
// getToken() call sites waiting for it.
154+
initialTokenFuture.complete(cachedToken);
155+
}
156+
} else {
157+
// Token refresh did not succeed, retry soon
158+
delay = Duration.ofSeconds(1); // TODO configurable ?
159+
}
160+
scheduleRefreshAttempt(delay);
129161
}
130162

131-
private boolean shouldRefresh() {
132-
return clock.instant().isAfter(nextRefresh);
163+
private void scheduleRefreshAttempt(Duration delay) {
164+
this.refreshTask = asyncExec.schedule(this::refreshTokenAttempt, delay);
133165
}
134166

135-
private void refreshToken() {
136-
// Only one thread should refresh at a time. Other threads will use the cached token.
137-
if (!refreshLock.compareAndSet(false, true)) {
138-
return;
167+
private boolean doRefreshToken() {
168+
String newToken = loadTokenFromFile();
169+
170+
// Only update cached token if we successfully loaded a new one
171+
if (newToken == null) {
172+
logger.debug("Couldn't load new bearer token from {}, will retry.", tokenFilePath);
173+
return false;
139174
}
140-
try {
141-
String newToken = loadTokenFromFile();
175+
cachedToken = newToken;
142176

143-
// Only update cached token if we successfully loaded a new one
144-
if (newToken == null) {
145-
logger.debug("Couldn't load new bearer token from {}, will retry.", tokenFilePath);
146-
return;
147-
}
148-
cachedToken = newToken;
177+
lastRefresh = clock.get();
149178

150-
lastRefresh = clock.instant();
179+
// Calculate next refresh time based on current token (may be cached)
180+
nextRefresh = calculateNextRefresh(cachedToken);
151181

152-
// Calculate next refresh time based on current token (may be cached)
153-
nextRefresh = calculateNextRefresh(cachedToken);
182+
logger.debug(
183+
"Token refreshed from file: {} (token present: {}), next refresh: {}",
184+
tokenFilePath,
185+
cachedToken != null && !cachedToken.isEmpty(),
186+
nextRefresh);
154187

155-
logger.debug(
156-
"Token refreshed from file: {} (token present: {}), next refresh: {}",
157-
tokenFilePath,
158-
cachedToken != null && !cachedToken.isEmpty(),
159-
nextRefresh);
160-
} finally {
161-
refreshLock.set(false);
162-
}
188+
return true;
163189
}
164190

165191
/** Calculate when the next refresh should occur based on JWT expiration or fixed interval. */
@@ -176,7 +202,7 @@ private Instant calculateNextRefresh(String token) {
176202
Instant refreshTime = expiration.get().minus(jwtExpirationBuffer);
177203

178204
// Ensure refresh time is in the future and not too soon (at least 1 second)
179-
Instant minRefreshTime = clock.instant().plus(Duration.ofSeconds(1));
205+
Instant minRefreshTime = clock.get().plus(Duration.ofSeconds(1));
180206
if (refreshTime.isBefore(minRefreshTime)) {
181207
logger.warn(
182208
"JWT expires too soon ({}), using minimum refresh interval instead", expiration.get());

extensions/auth/opa/impl/src/test/java/org/apache/polaris/extension/auth/opa/OpaPolarisAuthorizerFactoryTest.java

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Optional;
3232
import org.apache.polaris.core.config.RealmConfig;
3333
import org.apache.polaris.extension.auth.opa.token.FileBearerTokenProvider;
34+
import org.apache.polaris.nosql.async.java.JavaPoolAsyncExec;
3435
import org.junit.jupiter.api.Test;
3536
import org.junit.jupiter.api.io.TempDir;
3637

@@ -63,14 +64,16 @@ public void testFactoryWithStaticTokenConfiguration() {
6364
when(opaConfig.auth()).thenReturn(authConfig);
6465
when(opaConfig.http()).thenReturn(httpConfig);
6566

66-
OpaPolarisAuthorizerFactory factory =
67-
new OpaPolarisAuthorizerFactory(opaConfig, Clock.systemUTC());
67+
try (JavaPoolAsyncExec asyncExec = new JavaPoolAsyncExec()) {
68+
OpaPolarisAuthorizerFactory factory =
69+
new OpaPolarisAuthorizerFactory(opaConfig, Clock.systemUTC(), asyncExec);
6870

69-
// Create authorizer
70-
RealmConfig realmConfig = mock(RealmConfig.class);
71-
OpaPolarisAuthorizer authorizer = (OpaPolarisAuthorizer) factory.create(realmConfig);
71+
// Create authorizer
72+
RealmConfig realmConfig = mock(RealmConfig.class);
73+
OpaPolarisAuthorizer authorizer = (OpaPolarisAuthorizer) factory.create(realmConfig);
7274

73-
assertThat(authorizer).isNotNull();
75+
assertThat(authorizer).isNotNull();
76+
}
7477
}
7578

7679
@Test
@@ -106,22 +109,30 @@ public void testFactoryWithFileBasedTokenConfiguration() throws IOException {
106109
when(opaConfig.auth()).thenReturn(authConfig);
107110
when(opaConfig.http()).thenReturn(httpConfig);
108111

109-
OpaPolarisAuthorizerFactory factory =
110-
new OpaPolarisAuthorizerFactory(opaConfig, Clock.systemUTC());
111-
112-
// Create authorizer
113-
RealmConfig realmConfig = mock(RealmConfig.class);
114-
OpaPolarisAuthorizer authorizer = (OpaPolarisAuthorizer) factory.create(realmConfig);
115-
116-
assertThat(authorizer).isNotNull();
117-
118-
// Also verify that the token provider actually reads from the file
119-
try (FileBearerTokenProvider provider =
120-
new FileBearerTokenProvider(
121-
tokenFile, Duration.ofMinutes(5), true, Duration.ofMinutes(1), Clock.systemUTC())) {
122-
123-
String actualToken = provider.getToken();
124-
assertThat(actualToken).isEqualTo(tokenValue);
112+
try (JavaPoolAsyncExec asyncExec = new JavaPoolAsyncExec()) {
113+
OpaPolarisAuthorizerFactory factory =
114+
new OpaPolarisAuthorizerFactory(opaConfig, Clock.systemUTC(), asyncExec);
115+
116+
// Create authorizer
117+
RealmConfig realmConfig = mock(RealmConfig.class);
118+
OpaPolarisAuthorizer authorizer = (OpaPolarisAuthorizer) factory.create(realmConfig);
119+
120+
assertThat(authorizer).isNotNull();
121+
122+
// Also verify that the token provider actually reads from the file
123+
try (FileBearerTokenProvider provider =
124+
new FileBearerTokenProvider(
125+
tokenFile,
126+
Duration.ofMinutes(5),
127+
true,
128+
Duration.ofMinutes(1),
129+
Duration.ofSeconds(10),
130+
asyncExec,
131+
Clock.systemUTC()::instant)) {
132+
133+
String actualToken = provider.getToken();
134+
assertThat(actualToken).isEqualTo(tokenValue);
135+
}
125136
}
126137
}
127138

@@ -141,14 +152,16 @@ public void testFactoryWithNoTokenConfiguration() {
141152
when(opaConfig.auth()).thenReturn(authConfig);
142153
when(opaConfig.http()).thenReturn(httpConfig);
143154

144-
OpaPolarisAuthorizerFactory factory =
145-
new OpaPolarisAuthorizerFactory(opaConfig, Clock.systemUTC());
155+
try (JavaPoolAsyncExec asyncExec = new JavaPoolAsyncExec()) {
156+
OpaPolarisAuthorizerFactory factory =
157+
new OpaPolarisAuthorizerFactory(opaConfig, Clock.systemUTC(), asyncExec);
146158

147-
// Create authorizer
148-
RealmConfig realmConfig = mock(RealmConfig.class);
149-
OpaPolarisAuthorizer authorizer = (OpaPolarisAuthorizer) factory.create(realmConfig);
159+
// Create authorizer
160+
RealmConfig realmConfig = mock(RealmConfig.class);
161+
OpaPolarisAuthorizer authorizer = (OpaPolarisAuthorizer) factory.create(realmConfig);
150162

151-
assertThat(authorizer).isNotNull();
163+
assertThat(authorizer).isNotNull();
164+
}
152165
}
153166

154167
private OpaAuthorizationConfig.HttpConfig createMockHttpConfig() {

0 commit comments

Comments
 (0)