Skip to content

Commit e550e3b

Browse files
Cherry-pick "More sensible BasePolarisCatalog retry behavior (apache#1046)" (apache#32)
* More sensible BasePolarisCatalog retry behavior (apache#1046)
* More sensible BasePolarisCatalog retry behavior
* Reuse AWS/GCP retryable property
* Update default to 2
* Fix conflicts
---------
Co-authored-by: Andrew Guterman <andrew.guterman1@gmail.com>
1 parent c1119ab commit e550e3b

File tree

6 files changed

+198
-84
lines changed

6 files changed

+198
-84
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java

Lines changed: 51 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -27,8 +27,9 @@
2727
import static org.mockito.Mockito.spy;
2828
import static org.mockito.Mockito.when;
2929

30+
import com.azure.core.exception.HttpResponseException;
31+
import com.google.cloud.storage.StorageException;
3032
import com.google.common.collect.ImmutableMap;
31-
import com.google.common.collect.Iterators;
3233
import io.quarkus.test.junit.QuarkusMock;
3334
import io.quarkus.test.junit.QuarkusTest;
3435
import io.quarkus.test.junit.QuarkusTestProfile;
@@ -40,12 +41,14 @@
4041
import java.time.Clock;
4142
import java.util.Arrays;
4243
import java.util.EnumMap;
43-
import java.util.Iterator;
44+
import java.util.HashMap;
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Set;
4748
import java.util.UUID;
49+
import java.util.function.Function;
4850
import java.util.function.Supplier;
51+
import java.util.stream.Stream;
4952
import org.apache.commons.lang3.NotImplementedException;
5053
import org.apache.iceberg.BaseTable;
5154
import org.apache.iceberg.CatalogProperties;
@@ -108,6 +111,7 @@
108111
import org.apache.polaris.service.catalog.io.FileIOFactory;
109112
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
110113
import org.apache.polaris.service.config.RealmEntityManagerFactory;
114+
import org.apache.polaris.service.exception.FakeAzureHttpResponse;
111115
import org.apache.polaris.service.exception.IcebergExceptionMapper;
112116
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
113117
import org.apache.polaris.service.task.TableCleanupTaskHandler;
@@ -124,7 +128,12 @@
124128
import org.junit.jupiter.api.BeforeEach;
125129
import org.junit.jupiter.api.Test;
126130
import org.junit.jupiter.api.TestInfo;
131+
import org.junit.jupiter.params.ParameterizedTest;
132+
import org.junit.jupiter.params.provider.Arguments;
133+
import org.junit.jupiter.params.provider.MethodSource;
127134
import org.mockito.Mockito;
135+
import software.amazon.awssdk.core.exception.NonRetryableException;
136+
import software.amazon.awssdk.core.exception.RetryableException;
128137
import software.amazon.awssdk.services.sts.StsClient;
129138
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
130139
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
@@ -1521,23 +1530,46 @@ private void createNonExistingNamespaces(Namespace namespace) {
15211530
}
15221531
}
15231532

1524-
@Test
1525-
public void testRetriableException() {
1526-
Iterator<String> accessDeniedHint =
1527-
Iterators.cycle(IcebergExceptionMapper.getAccessDeniedHints());
1528-
RuntimeException s3Exception = new RuntimeException(accessDeniedHint.next());
1529-
RuntimeException azureBlobStorageException = new RuntimeException(accessDeniedHint.next());
1530-
RuntimeException gcsException = new RuntimeException(accessDeniedHint.next());
1531-
RuntimeException otherException = new RuntimeException(new IOException("Connection reset"));
1532-
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(s3Exception))
1533-
.isFalse();
1534-
Assertions.assertThat(
1535-
BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(azureBlobStorageException))
1536-
.isFalse();
1537-
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(gcsException))
1538-
.isFalse();
1539-
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(otherException))
1540-
.isTrue();
1533+
@ParameterizedTest
1534+
@MethodSource
1535+
public void testRetriableException(Exception exception, boolean shouldRetry) {
1536+
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(exception))
1537+
.isEqualTo(shouldRetry);
1538+
}
1539+
1540+
static Stream<Arguments> testRetriableException() {
1541+
Set<Integer> NON_RETRYABLE_CODES = Set.of(401, 403, 404);
1542+
Set<Integer> RETRYABLE_CODES = Set.of(408, 504);
1543+
1544+
// Create a map of HTTP code returned from a cloud provider to whether it should be retried
1545+
Map<Integer, Boolean> cloudCodeMappings = new HashMap<>();
1546+
NON_RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, false));
1547+
RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, true));
1548+
1549+
return Stream.of(
1550+
Stream.of(
1551+
Arguments.of(new RuntimeException(new IOException("Connection reset")), true),
1552+
Arguments.of(RetryableException.builder().build(), true),
1553+
Arguments.of(NonRetryableException.builder().build(), false)),
1554+
IcebergExceptionMapper.getAccessDeniedHints().stream()
1555+
.map(hint -> Arguments.of(new RuntimeException(hint), false)),
1556+
cloudCodeMappings.entrySet().stream()
1557+
.flatMap(
1558+
entry ->
1559+
Stream.of(
1560+
Arguments.of(
1561+
new HttpResponseException(
1562+
"", new FakeAzureHttpResponse(entry.getKey()), ""),
1563+
entry.getValue()),
1564+
Arguments.of(
1565+
new StorageException(entry.getKey(), ""), entry.getValue()))),
1566+
IcebergExceptionMapper.RETRYABLE_AZURE_HTTP_CODES.stream()
1567+
.map(
1568+
code ->
1569+
Arguments.of(
1570+
new HttpResponseException("", new FakeAzureHttpResponse(code), ""),
1571+
true)))
1572+
.flatMap(Function.identity());
15411573
}
15421574

15431575
@Test

service/common/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,11 @@ dependencies {
121121
testFixturesImplementation("software.amazon.awssdk:sts")
122122
testFixturesImplementation("software.amazon.awssdk:iam-policy-builder")
123123
testFixturesImplementation("software.amazon.awssdk:s3")
124+
125+
testFixturesImplementation(platform(libs.azuresdk.bom))
126+
testFixturesImplementation("com.azure:azure-core")
127+
testFixturesImplementation("com.azure:azure-storage-blob")
128+
testFixturesImplementation("com.azure:azure-storage-file-datalake")
124129
}
125130

126131
tasks.named("javadoc") { dependsOn("jandex") }

service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java

Lines changed: 6 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.polaris.service.catalog;
2020

21+
import static org.apache.polaris.service.exception.IcebergExceptionMapper.isStorageProviderRetryableException;
22+
2123
import com.google.common.annotations.VisibleForTesting;
2224
import com.google.common.base.Joiner;
2325
import com.google.common.base.Objects;
@@ -107,13 +109,11 @@
107109
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
108110
import org.apache.polaris.service.catalog.io.FileIOFactory;
109111
import org.apache.polaris.service.catalog.io.FileIOUtil;
110-
import org.apache.polaris.service.exception.IcebergExceptionMapper;
111112
import org.apache.polaris.service.task.TaskExecutor;
112113
import org.apache.polaris.service.types.NotificationRequest;
113114
import org.apache.polaris.service.types.NotificationType;
114115
import org.slf4j.Logger;
115116
import org.slf4j.LoggerFactory;
116-
import software.amazon.awssdk.core.exception.SdkException;
117117

118118
/** Defines the relationship between PolarisEntities and Iceberg's business logic. */
119119
public class BasePolarisCatalog extends BaseMetastoreViewCatalog
@@ -137,8 +137,6 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
137137
"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST";
138138
static final boolean INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST_DEFAULT = false;
139139

140-
private static final int MAX_RETRIES = 12;
141-
142140
public static final Predicate<Exception> SHOULD_RETRY_REFRESH_PREDICATE =
143141
ex -> {
144142
// Default arguments from BaseMetastoreTableOperation only stop retries on
@@ -149,7 +147,8 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
149147
&& !(ex instanceof AlreadyExistsException)
150148
&& !(ex instanceof ForbiddenException)
151149
&& !(ex instanceof UnprocessableEntityException)
152-
&& isStorageProviderRetryableException(ex);
150+
&& (isStorageProviderRetryableException(ex)
151+
|| isStorageProviderRetryableException(ExceptionUtils.getRootCause(ex)));
153152
};
154153

155154
private final PolarisEntityManager entityManager;
@@ -1302,7 +1301,7 @@ public void doRefresh() {
13021301
refreshFromMetadataLocation(
13031302
latestLocation,
13041303
SHOULD_RETRY_REFRESH_PREDICATE,
1305-
MAX_RETRIES,
1304+
getMaxMetadataRefreshRetries(),
13061305
metadataLocation -> {
13071306
String latestLocationDir =
13081307
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -1531,7 +1530,7 @@ public void doRefresh() {
15311530
refreshFromMetadataLocation(
15321531
latestLocation,
15331532
SHOULD_RETRY_REFRESH_PREDICATE,
1534-
MAX_RETRIES,
1533+
getMaxMetadataRefreshRetries(),
15351534
metadataLocation -> {
15361535
String latestLocationDir =
15371536
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -2172,40 +2171,6 @@ private Boolean getBooleanContextConfiguration(String configKey, boolean default
21722171
.getConfiguration(callContext.getPolarisCallContext(), configKey, defaultValue);
21732172
}
21742173

2175-
/**
2176-
* Check if the exception is retryable for the storage provider
2177-
*
2178-
* @param ex exception
2179-
* @return true if the exception is retryable
2180-
*/
2181-
private static boolean isStorageProviderRetryableException(Exception ex) {
2182-
// For S3/Azure, the exception is not wrapped, while for GCP the exception is wrapped as a
2183-
// RuntimeException
2184-
Throwable rootCause = ExceptionUtils.getRootCause(ex);
2185-
if (rootCause == null) {
2186-
// no root cause, let it retry
2187-
return true;
2188-
}
2189-
// only S3 SdkException has this retryable property
2190-
if (rootCause instanceof SdkException && ((SdkException) rootCause).retryable()) {
2191-
return true;
2192-
}
2193-
// add more cases here if needed
2194-
// AccessDenied is not retryable
2195-
return !isAccessDenied(rootCause.getMessage());
2196-
}
2197-
2198-
private static boolean isAccessDenied(String errorMsg) {
2199-
// Corresponding error messages for storage providers Aws/Azure/Gcp
2200-
boolean isAccessDenied =
2201-
errorMsg != null && IcebergExceptionMapper.containsAnyAccessDeniedHint(errorMsg);
2202-
if (isAccessDenied) {
2203-
LOGGER.debug("Access Denied or Forbidden error: {}", errorMsg);
2204-
return true;
2205-
}
2206-
return false;
2207-
}
2208-
22092174
private int getMaxMetadataRefreshRetries() {
22102175
return callContext
22112176
.getPolarisCallContext()

service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java

Lines changed: 62 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020

2121
import com.azure.core.exception.AzureException;
2222
import com.azure.core.exception.HttpResponseException;
23+
import com.google.cloud.BaseServiceException;
2324
import com.google.cloud.storage.StorageException;
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.collect.ImmutableSet;
@@ -58,10 +59,23 @@
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
6061
import org.slf4j.event.Level;
62+
import software.amazon.awssdk.core.exception.SdkException;
6163
import software.amazon.awssdk.services.s3.model.S3Exception;
6264

6365
@Provider
6466
public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException> {
67+
/** Signifies that we could not extract an HTTP code from a given cloud exception */
68+
public static final int UNKNOWN_CLOUD_HTTP_CODE = -1;
69+
70+
public static final Set<Integer> RETRYABLE_AZURE_HTTP_CODES =
71+
Set.of(
72+
Response.Status.REQUEST_TIMEOUT.getStatusCode(),
73+
Response.Status.TOO_MANY_REQUESTS.getStatusCode(),
74+
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
75+
Response.Status.SERVICE_UNAVAILABLE.getStatusCode(),
76+
Response.Status.GATEWAY_TIMEOUT.getStatusCode(),
77+
IcebergExceptionMapper.UNKNOWN_CLOUD_HTTP_CODE);
78+
6579
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergExceptionMapper.class);
6680

6781
// Case-insensitive parts of exception messages that a request to a cloud provider was denied due
@@ -115,6 +129,36 @@ public static boolean containsAnyAccessDeniedHint(String message) {
115129
return ACCESS_DENIED_HINTS.stream().anyMatch(messageLower::contains);
116130
}
117131

132+
/**
133+
* Decides whether a storage-provider failure should be retried, based on this throwable's own message and type (its cause chain is not inspected here).
134+
*
135+
* @param ex the throwable to classify; {@code null} is treated as not retryable
136+
* @return {@code false} for {@code null} or when the message contains an access-denied hint; otherwise the exception's own retryable flag (GCS, S3), membership of its HTTP code in {@link #RETRYABLE_AZURE_HTTP_CODES} (Azure), or {@code true} for any other type
137+
*/
138+
public static boolean isStorageProviderRetryableException(Throwable ex) {
139+
if (ex == null) {
140+
return false;
141+
}
142+
143+
if (ex.getMessage() != null && containsAnyAccessDeniedHint(ex.getMessage())) {
144+
return false;
145+
}
146+
147+
return switch (ex) {
148+
// GCS: BaseServiceException reports its own retryable flag
149+
case BaseServiceException bse -> bse.isRetryable();
150+
151+
// S3: the AWS SdkException reports its own retryable flag
152+
case SdkException se -> se.retryable();
153+
154+
// Azure exceptions don't have a retryable property so we just check the HTTP code
155+
case HttpResponseException hre ->
156+
RETRYABLE_AZURE_HTTP_CODES.contains(
157+
IcebergExceptionMapper.extractHttpCodeFromCloudException(hre));
158+
default -> true; // any unrecognized exception type is treated as retryable
159+
};
160+
}
161+
118162
@VisibleForTesting
119163
public static Collection<String> getAccessDeniedHints() {
120164
return ImmutableSet.copyOf(ACCESS_DENIED_HINTS);
@@ -158,18 +202,29 @@ static int mapExceptionToResponseCode(RuntimeException rex) {
158202
};
159203
}
160204

205+
/**
206+
* Cloud providers are typically called over HTTP, so a provider exception usually carries an
207+
* HTTP status code. This extracts that code when the exception type is one we recognize.
208+
*
209+
* @param rex The cloud provider exception
210+
* @return the status code carried by an S3Exception, an Azure HttpResponseException or a GCS
211+
* StorageException; UNKNOWN_CLOUD_HTTP_CODE for any other exception type
212+
*/
213+
public static int extractHttpCodeFromCloudException(RuntimeException rex) {
214+
return switch (rex) {
215+
case S3Exception s3e -> s3e.statusCode();
216+
case HttpResponseException hre -> hre.getResponse().getStatusCode(); // NOTE(review): assumes getResponse() is non-null — confirm
217+
case StorageException se -> se.getCode();
218+
default -> UNKNOWN_CLOUD_HTTP_CODE;
219+
};
220+
}
221+
161222
static int mapCloudExceptionToResponseCode(RuntimeException rex) {
162223
if (doesAnyThrowableContainAccessDeniedHint(rex)) {
163224
return Status.FORBIDDEN.getStatusCode();
164225
}
165226

166-
int httpCode =
167-
switch (rex) {
168-
case S3Exception s3e -> s3e.statusCode();
169-
case HttpResponseException hre -> hre.getResponse().getStatusCode();
170-
case StorageException se -> se.getCode();
171-
default -> -1;
172-
};
227+
int httpCode = extractHttpCodeFromCloudException(rex);
173228
Status httpStatus = Status.fromStatusCode(httpCode);
174229
Status.Family httpFamily = Status.Family.familyOf(httpCode);
175230

service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java

Lines changed: 2 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -19,12 +19,9 @@
1919
package org.apache.polaris.service.exception;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22-
import static org.mockito.Mockito.mock;
23-
import static org.mockito.Mockito.when;
2422

2523
import com.azure.core.exception.AzureException;
2624
import com.azure.core.exception.HttpResponseException;
27-
import com.azure.core.http.HttpResponse;
2825
import com.google.cloud.storage.StorageException;
2926
import jakarta.ws.rs.core.Response;
3027
import java.util.Map;
@@ -65,7 +62,8 @@ static Stream<Arguments> fileIOExceptionMapping() {
6562
entry ->
6663
Stream.of(
6764
Arguments.of(
68-
new HttpResponseException("", mockAzureResponse(entry.getKey()), ""),
65+
new HttpResponseException(
66+
"", new FakeAzureHttpResponse(entry.getKey()), ""),
6967
entry.getValue()),
7068
Arguments.of(
7169
S3Exception.builder().message("").statusCode(entry.getKey()).build(),
@@ -82,17 +80,4 @@ void fileIOExceptionMapping(RuntimeException ex, int statusCode) {
8280
assertThat(response.getEntity()).extracting("message").isEqualTo(ex.getMessage());
8381
}
8482
}
85-
86-
/**
87-
* Creates a mock of the Azure-specific HttpResponse object, as it's quite difficult to construct
88-
* a "real" one.
89-
*
90-
* @param statusCode
91-
* @return
92-
*/
93-
private static HttpResponse mockAzureResponse(int statusCode) {
94-
HttpResponse res = mock(HttpResponse.class);
95-
when(res.getStatusCode()).thenReturn(statusCode);
96-
return res;
97-
}
9883
}

0 commit comments

Comments
 (0)