Skip to content

Commit a4c62b3

Browse files
authored
More sensible BasePolarisCatalog retry behavior (#1046)
* More sensible BasePolarisCatalog retry behavior * Reuse AWS/GCP retryable property * Update default to 2
1 parent 4f7cf97 commit a4c62b3

File tree

7 files changed

+212
-82
lines changed

7 files changed

+212
-82
lines changed

polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,12 @@ public static <T> Builder<T> builder() {
264264
+ STORAGE_CREDENTIAL_DURATION_SECONDS.key)
265265
.defaultValue(30 * 60) // 30 minutes
266266
.build();
267+
268+
public static final PolarisConfiguration<Integer> MAX_METADATA_REFRESH_RETRIES =
269+
PolarisConfiguration.<Integer>builder()
270+
.key("MAX_METADATA_REFRESH_RETRIES")
271+
.description(
272+
"How many times to retry refreshing metadata when the previous error was retryable")
273+
.defaultValue(2)
274+
.build();
267275
}

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
import static org.mockito.Mockito.spy;
2727
import static org.mockito.Mockito.when;
2828

29+
import com.azure.core.exception.HttpResponseException;
30+
import com.google.cloud.storage.StorageException;
2931
import com.google.common.collect.ImmutableMap;
30-
import com.google.common.collect.Iterators;
3132
import io.quarkus.test.junit.QuarkusMock;
3233
import io.quarkus.test.junit.QuarkusTest;
3334
import io.quarkus.test.junit.QuarkusTestProfile;
@@ -39,12 +40,14 @@
3940
import java.time.Clock;
4041
import java.util.Arrays;
4142
import java.util.EnumMap;
42-
import java.util.Iterator;
43+
import java.util.HashMap;
4344
import java.util.List;
4445
import java.util.Map;
4546
import java.util.Set;
4647
import java.util.UUID;
48+
import java.util.function.Function;
4749
import java.util.function.Supplier;
50+
import java.util.stream.Stream;
4851
import org.apache.commons.lang3.NotImplementedException;
4952
import org.apache.iceberg.BaseTable;
5053
import org.apache.iceberg.CatalogProperties;
@@ -104,6 +107,7 @@
104107
import org.apache.polaris.service.catalog.io.FileIOFactory;
105108
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
106109
import org.apache.polaris.service.config.RealmEntityManagerFactory;
110+
import org.apache.polaris.service.exception.FakeAzureHttpResponse;
107111
import org.apache.polaris.service.exception.IcebergExceptionMapper;
108112
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
109113
import org.apache.polaris.service.task.TableCleanupTaskHandler;
@@ -120,7 +124,12 @@
120124
import org.junit.jupiter.api.BeforeEach;
121125
import org.junit.jupiter.api.Test;
122126
import org.junit.jupiter.api.TestInfo;
127+
import org.junit.jupiter.params.ParameterizedTest;
128+
import org.junit.jupiter.params.provider.Arguments;
129+
import org.junit.jupiter.params.provider.MethodSource;
123130
import org.mockito.Mockito;
131+
import software.amazon.awssdk.core.exception.NonRetryableException;
132+
import software.amazon.awssdk.core.exception.RetryableException;
124133
import software.amazon.awssdk.services.sts.StsClient;
125134
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
126135
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
@@ -1517,23 +1526,46 @@ private void createNonExistingNamespaces(Namespace namespace) {
15171526
}
15181527
}
15191528

1520-
@Test
1521-
public void testRetriableException() {
1522-
Iterator<String> accessDeniedHint =
1523-
Iterators.cycle(IcebergExceptionMapper.getAccessDeniedHints());
1524-
RuntimeException s3Exception = new RuntimeException(accessDeniedHint.next());
1525-
RuntimeException azureBlobStorageException = new RuntimeException(accessDeniedHint.next());
1526-
RuntimeException gcsException = new RuntimeException(accessDeniedHint.next());
1527-
RuntimeException otherException = new RuntimeException(new IOException("Connection reset"));
1528-
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(s3Exception))
1529-
.isFalse();
1530-
Assertions.assertThat(
1531-
BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(azureBlobStorageException))
1532-
.isFalse();
1533-
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(gcsException))
1534-
.isFalse();
1535-
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(otherException))
1536-
.isTrue();
1529+
@ParameterizedTest
1530+
@MethodSource
1531+
public void testRetriableException(Exception exception, boolean shouldRetry) {
1532+
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(exception))
1533+
.isEqualTo(shouldRetry);
1534+
}
1535+
1536+
static Stream<Arguments> testRetriableException() {
1537+
Set<Integer> NON_RETRYABLE_CODES = Set.of(401, 403, 404);
1538+
Set<Integer> RETRYABLE_CODES = Set.of(408, 504);
1539+
1540+
// Create a map of HTTP code returned from a cloud provider to whether it should be retried
1541+
Map<Integer, Boolean> cloudCodeMappings = new HashMap<>();
1542+
NON_RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, false));
1543+
RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, true));
1544+
1545+
return Stream.of(
1546+
Stream.of(
1547+
Arguments.of(new RuntimeException(new IOException("Connection reset")), true),
1548+
Arguments.of(RetryableException.builder().build(), true),
1549+
Arguments.of(NonRetryableException.builder().build(), false)),
1550+
IcebergExceptionMapper.getAccessDeniedHints().stream()
1551+
.map(hint -> Arguments.of(new RuntimeException(hint), false)),
1552+
cloudCodeMappings.entrySet().stream()
1553+
.flatMap(
1554+
entry ->
1555+
Stream.of(
1556+
Arguments.of(
1557+
new HttpResponseException(
1558+
"", new FakeAzureHttpResponse(entry.getKey()), ""),
1559+
entry.getValue()),
1560+
Arguments.of(
1561+
new StorageException(entry.getKey(), ""), entry.getValue()))),
1562+
IcebergExceptionMapper.RETRYABLE_AZURE_HTTP_CODES.stream()
1563+
.map(
1564+
code ->
1565+
Arguments.of(
1566+
new HttpResponseException("", new FakeAzureHttpResponse(code), ""),
1567+
true)))
1568+
.flatMap(Function.identity());
15371569
}
15381570

15391571
@Test

service/common/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ dependencies {
121121
testFixturesImplementation("software.amazon.awssdk:sts")
122122
testFixturesImplementation("software.amazon.awssdk:iam-policy-builder")
123123
testFixturesImplementation("software.amazon.awssdk:s3")
124+
125+
testFixturesImplementation(platform(libs.azuresdk.bom))
126+
testFixturesImplementation("com.azure:azure-core")
127+
testFixturesImplementation("com.azure:azure-storage-blob")
128+
testFixturesImplementation("com.azure:azure-storage-file-datalake")
124129
}
125130

126131
tasks.named("javadoc") { dependsOn("jandex") }

service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.polaris.service.catalog;
2020

21+
import static org.apache.polaris.service.exception.IcebergExceptionMapper.isStorageProviderRetryableException;
22+
2123
import com.google.common.annotations.VisibleForTesting;
2224
import com.google.common.base.Joiner;
2325
import com.google.common.base.Objects;
@@ -104,13 +106,11 @@
104106
import org.apache.polaris.core.storage.StorageLocation;
105107
import org.apache.polaris.service.catalog.io.FileIOFactory;
106108
import org.apache.polaris.service.catalog.io.FileIOUtil;
107-
import org.apache.polaris.service.exception.IcebergExceptionMapper;
108109
import org.apache.polaris.service.task.TaskExecutor;
109110
import org.apache.polaris.service.types.NotificationRequest;
110111
import org.apache.polaris.service.types.NotificationType;
111112
import org.slf4j.Logger;
112113
import org.slf4j.LoggerFactory;
113-
import software.amazon.awssdk.core.exception.SdkException;
114114

115115
/** Defines the relationship between PolarisEntities and Iceberg's business logic. */
116116
public class BasePolarisCatalog extends BaseMetastoreViewCatalog
@@ -134,8 +134,6 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
134134
"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST";
135135
static final boolean INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST_DEFAULT = false;
136136

137-
private static final int MAX_RETRIES = 12;
138-
139137
public static final Predicate<Exception> SHOULD_RETRY_REFRESH_PREDICATE =
140138
ex -> {
141139
// Default arguments from BaseMetastoreTableOperation only stop retries on
@@ -146,7 +144,8 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
146144
&& !(ex instanceof AlreadyExistsException)
147145
&& !(ex instanceof ForbiddenException)
148146
&& !(ex instanceof UnprocessableEntityException)
149-
&& isStorageProviderRetryableException(ex);
147+
&& (isStorageProviderRetryableException(ex)
148+
|| isStorageProviderRetryableException(ExceptionUtils.getRootCause(ex)));
150149
};
151150

152151
private final PolarisEntityManager entityManager;
@@ -1258,7 +1257,7 @@ public void doRefresh() {
12581257
refreshFromMetadataLocation(
12591258
latestLocation,
12601259
SHOULD_RETRY_REFRESH_PREDICATE,
1261-
MAX_RETRIES,
1260+
getMaxMetadataRefreshRetries(),
12621261
metadataLocation -> {
12631262
String latestLocationDir =
12641263
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -1483,7 +1482,7 @@ public void doRefresh() {
14831482
refreshFromMetadataLocation(
14841483
latestLocation,
14851484
SHOULD_RETRY_REFRESH_PREDICATE,
1486-
MAX_RETRIES,
1485+
getMaxMetadataRefreshRetries(),
14871486
metadataLocation -> {
14881487
String latestLocationDir =
14891488
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -2097,37 +2096,11 @@ private Boolean getBooleanContextConfiguration(String configKey, boolean default
20972096
.getConfiguration(callContext.getPolarisCallContext(), configKey, defaultValue);
20982097
}
20992098

2100-
/**
2101-
* Check if the exception is retryable for the storage provider
2102-
*
2103-
* @param ex exception
2104-
* @return true if the exception is retryable
2105-
*/
2106-
private static boolean isStorageProviderRetryableException(Exception ex) {
2107-
// For S3/Azure, the exception is not wrapped, while for GCP the exception is wrapped as a
2108-
// RuntimeException
2109-
Throwable rootCause = ExceptionUtils.getRootCause(ex);
2110-
if (rootCause == null) {
2111-
// no root cause, let it retry
2112-
return true;
2113-
}
2114-
// only S3 SdkException has this retryable property
2115-
if (rootCause instanceof SdkException && ((SdkException) rootCause).retryable()) {
2116-
return true;
2117-
}
2118-
// add more cases here if needed
2119-
// AccessDenied is not retryable
2120-
return !isAccessDenied(rootCause.getMessage());
2121-
}
2122-
2123-
private static boolean isAccessDenied(String errorMsg) {
2124-
// Corresponding error messages for storage providers Aws/Azure/Gcp
2125-
boolean isAccessDenied =
2126-
errorMsg != null && IcebergExceptionMapper.containsAnyAccessDeniedHint(errorMsg);
2127-
if (isAccessDenied) {
2128-
LOGGER.debug("Access Denied or Forbidden error: {}", errorMsg);
2129-
return true;
2130-
}
2131-
return false;
2099+
private int getMaxMetadataRefreshRetries() {
2100+
return callContext
2101+
.getPolarisCallContext()
2102+
.getConfigurationStore()
2103+
.getConfiguration(
2104+
callContext.getPolarisCallContext(), PolarisConfiguration.MAX_METADATA_REFRESH_RETRIES);
21322105
}
21332106
}

service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.azure.core.exception.AzureException;
2222
import com.azure.core.exception.HttpResponseException;
23+
import com.google.cloud.BaseServiceException;
2324
import com.google.cloud.storage.StorageException;
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.collect.ImmutableSet;
@@ -58,10 +59,23 @@
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
6061
import org.slf4j.event.Level;
62+
import software.amazon.awssdk.core.exception.SdkException;
6163
import software.amazon.awssdk.services.s3.model.S3Exception;
6264

6365
@Provider
6466
public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException> {
67+
/** Signifies that we could not extract an HTTP code from a given cloud exception */
68+
public static final int UNKNOWN_CLOUD_HTTP_CODE = -1;
69+
70+
public static final Set<Integer> RETRYABLE_AZURE_HTTP_CODES =
71+
Set.of(
72+
Response.Status.REQUEST_TIMEOUT.getStatusCode(),
73+
Response.Status.TOO_MANY_REQUESTS.getStatusCode(),
74+
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
75+
Response.Status.SERVICE_UNAVAILABLE.getStatusCode(),
76+
Response.Status.GATEWAY_TIMEOUT.getStatusCode(),
77+
IcebergExceptionMapper.UNKNOWN_CLOUD_HTTP_CODE);
78+
6579
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergExceptionMapper.class);
6680

6781
// Case-insensitive parts of exception messages that a request to a cloud provider was denied due
@@ -115,6 +129,36 @@ public static boolean containsAnyAccessDeniedHint(String message) {
115129
return ACCESS_DENIED_HINTS.stream().anyMatch(messageLower::contains);
116130
}
117131

132+
/**
133+
* Check if the exception is retryable for the storage provider
134+
*
135+
* @param ex exception
136+
* @return true if the exception is retryable
137+
*/
138+
public static boolean isStorageProviderRetryableException(Throwable ex) {
139+
if (ex == null) {
140+
return false;
141+
}
142+
143+
if (ex.getMessage() != null && containsAnyAccessDeniedHint(ex.getMessage())) {
144+
return false;
145+
}
146+
147+
return switch (ex) {
148+
// GCS
149+
case BaseServiceException bse -> bse.isRetryable();
150+
151+
// S3
152+
case SdkException se -> se.retryable();
153+
154+
// Azure exceptions don't have a retryable property so we just check the HTTP code
155+
case HttpResponseException hre ->
156+
RETRYABLE_AZURE_HTTP_CODES.contains(
157+
IcebergExceptionMapper.extractHttpCodeFromCloudException(hre));
158+
default -> true;
159+
};
160+
}
161+
118162
@VisibleForTesting
119163
public static Collection<String> getAccessDeniedHints() {
120164
return ImmutableSet.copyOf(ACCESS_DENIED_HINTS);
@@ -158,18 +202,29 @@ static int mapExceptionToResponseCode(RuntimeException rex) {
158202
};
159203
}
160204

205+
/**
206+
* We typically call cloud providers over HTTP, so when there's an exception there's typically an
207+
* associated HTTP code. This extracts the HTTP code if possible.
208+
*
209+
* @param rex The cloud provider exception
210+
* @return UNKNOWN_CLOUD_HTTP_CODE if the exception is not a cloud exception that we know how to
211+
* extract the code from
212+
*/
213+
public static int extractHttpCodeFromCloudException(RuntimeException rex) {
214+
return switch (rex) {
215+
case S3Exception s3e -> s3e.statusCode();
216+
case HttpResponseException hre -> hre.getResponse().getStatusCode();
217+
case StorageException se -> se.getCode();
218+
default -> UNKNOWN_CLOUD_HTTP_CODE;
219+
};
220+
}
221+
161222
static int mapCloudExceptionToResponseCode(RuntimeException rex) {
162223
if (doesAnyThrowableContainAccessDeniedHint(rex)) {
163224
return Status.FORBIDDEN.getStatusCode();
164225
}
165226

166-
int httpCode =
167-
switch (rex) {
168-
case S3Exception s3e -> s3e.statusCode();
169-
case HttpResponseException hre -> hre.getResponse().getStatusCode();
170-
case StorageException se -> se.getCode();
171-
default -> -1;
172-
};
227+
int httpCode = extractHttpCodeFromCloudException(rex);
173228
Status httpStatus = Status.fromStatusCode(httpCode);
174229
Status.Family httpFamily = Status.Family.familyOf(httpCode);
175230

service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919
package org.apache.polaris.service.exception;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22-
import static org.mockito.Mockito.mock;
23-
import static org.mockito.Mockito.when;
2422

2523
import com.azure.core.exception.AzureException;
2624
import com.azure.core.exception.HttpResponseException;
27-
import com.azure.core.http.HttpResponse;
2825
import com.google.cloud.storage.StorageException;
2926
import jakarta.ws.rs.core.Response;
3027
import java.util.Map;
@@ -65,7 +62,8 @@ static Stream<Arguments> fileIOExceptionMapping() {
6562
entry ->
6663
Stream.of(
6764
Arguments.of(
68-
new HttpResponseException("", mockAzureResponse(entry.getKey()), ""),
65+
new HttpResponseException(
66+
"", new FakeAzureHttpResponse(entry.getKey()), ""),
6967
entry.getValue()),
7068
Arguments.of(
7169
S3Exception.builder().message("").statusCode(entry.getKey()).build(),
@@ -82,17 +80,4 @@ void fileIOExceptionMapping(RuntimeException ex, int statusCode) {
8280
assertThat(response.getEntity()).extracting("message").isEqualTo(ex.getMessage());
8381
}
8482
}
85-
86-
/**
87-
* Creates a mock of the Azure-specific HttpResponse object, as it's quite difficult to construct
88-
* a "real" one.
89-
*
90-
* @param statusCode
91-
* @return
92-
*/
93-
private static HttpResponse mockAzureResponse(int statusCode) {
94-
HttpResponse res = mock(HttpResponse.class);
95-
when(res.getStatusCode()).thenReturn(statusCode);
96-
return res;
97-
}
9883
}

0 commit comments

Comments
 (0)