Skip to content

Commit e68fcd5

Browse files
authored
Merge pull request #273 from nastra/server-side-planning-updates
Server-side scan planning adjustments
2 parents 76bd0ee + 651c8a8 commit e68fcd5

File tree

2 files changed

+69
-144
lines changed

2 files changed

+69
-144
lines changed

core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iceberg.catalog.TableIdentifier;
3737
import org.apache.iceberg.io.CloseableIterable;
3838
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
39+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3940
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
4041
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
4142
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
@@ -44,8 +45,12 @@
4445
import org.slf4j.LoggerFactory;
4546

4647
class RESTTableScan extends DataTableScan {
47-
4848
private static final Logger LOG = LoggerFactory.getLogger(RESTTableScan.class);
49+
private static final long MIN_SLEEP_MS = 1000; // Initial delay
50+
private static final long MAX_SLEEP_MS = 60 * 1000; // Max backoff delay (1 minute)
51+
private static final int MAX_ATTEMPTS = 10; // Max number of poll checks
52+
private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000; // Total maximum duration (5 minutes)
53+
private static final double SCALE_FACTOR = 2.0; // Exponential scale factor
4954

5055
private final RESTClient client;
5156
private final Map<String, String> headers;
@@ -55,15 +60,8 @@ class RESTTableScan extends DataTableScan {
5560
private final TableIdentifier tableIdentifier;
5661
private final Set<Endpoint> supportedEndpoints;
5762
private final ParserContext parserContext;
58-
5963
private String currentPlanId = null;
6064

61-
private static final long MIN_SLEEP_MS = 1000; // Initial delay
62-
private static final long MAX_SLEEP_MS = 60 * 1000; // Max backoff delay (1 minute)
63-
private static final int MAX_ATTEMPTS = 10; // Max number of poll checks
64-
private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000; // Total maximum duration (5 minutes)
65-
private static final double SCALE_FACTOR = 2.0; // Exponential scale factor
66-
6765
RESTTableScan(
6866
Table table,
6967
Schema schema,
@@ -120,26 +118,24 @@ public CloseableIterable<FileScanTask> planFiles() {
120118
.collect(Collectors.toList());
121119
}
122120

123-
PlanTableScanRequest.Builder planTableScanRequestBuilder =
121+
PlanTableScanRequest.Builder builder =
124122
PlanTableScanRequest.builder()
125123
.withSelect(selectedColumns)
126124
.withFilter(filter())
127125
.withCaseSensitive(isCaseSensitive())
128126
.withStatsFields(statsFields);
129127

130128
if (startSnapshotId != null && endSnapshotId != null) {
131-
planTableScanRequestBuilder
129+
builder
132130
.withStartSnapshotId(startSnapshotId)
133131
.withEndSnapshotId(endSnapshotId)
134132
.withUseSnapshotSchema(true);
135133
} else if (snapshotId != null) {
136134
boolean useSnapShotSchema = snapshotId != table.currentSnapshot().snapshotId();
137-
planTableScanRequestBuilder
138-
.withSnapshotId(snapshotId)
139-
.withUseSnapshotSchema(useSnapShotSchema);
135+
builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapShotSchema);
140136
}
141137

142-
return planTableScan(planTableScanRequestBuilder.build());
138+
return planTableScan(builder.build());
143139
}
144140

145141
private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planTableScanRequest) {
@@ -161,10 +157,17 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
161157
case SUBMITTED:
162158
Endpoint.check(supportedEndpoints, Endpoint.V1_FETCH_TABLE_SCAN_PLAN);
163159
return fetchPlanningResult(response.planId());
164-
160+
case FAILED:
161+
throw new IllegalStateException(
162+
String.format(
163+
"Received status: %s for planId: %s", PlanStatus.FAILED, response.planId()));
164+
case CANCELLED:
165+
throw new IllegalStateException(
166+
String.format(
167+
"Received status: %s for planId: %s", PlanStatus.CANCELLED, response.planId()));
165168
default:
166-
handleNonSuccessfulTerminalState(planStatus, response.planId());
167-
throw new IllegalStateException("Unexpected code path reached in planTableScan");
169+
throw new IllegalStateException(
170+
String.format("Invalid planStatus: %s for planId: %s", planStatus, response.planId()));
168171
}
169172
}
170173

@@ -200,7 +203,7 @@ private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
200203
.build();
201204

202205
try {
203-
FetchPlanningResultResponse finalResponse =
206+
FetchPlanningResultResponse response =
204207
Failsafe.with(retryPolicy)
205208
.get(
206209
() ->
@@ -211,16 +214,13 @@ private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
211214
headers,
212215
ErrorHandlers.defaultErrorHandler(),
213216
parserContext));
217+
Preconditions.checkState(
218+
response.planStatus() == PlanStatus.COMPLETED,
219+
"Plan finished with unexpected status %s for planId: %s",
220+
response.planStatus(),
221+
planId);
214222

215-
PlanStatus finalStatus = finalResponse.planStatus();
216-
217-
if (finalStatus == PlanStatus.COMPLETED) {
218-
return scanTasksIterable(finalResponse.planTasks(), finalResponse.fileScanTasks());
219-
}
220-
221-
throw new IllegalStateException(
222-
String.format(
223-
"Plan finished with unexpected status %s for planId: %s", finalStatus, planId));
223+
return scanTasksIterable(response.planTasks(), response.fileScanTasks());
224224
} catch (FailsafeException e) {
225225
// FailsafeException is thrown when retries are exhausted (Max Attempts/Duration)
226226
// Cleanup is handled by the .onFailure() hook, so we just wrap and rethrow.
@@ -279,18 +279,4 @@ public boolean cancelPlan() {
279279
return false;
280280
}
281281
}
282-
283-
private void handleNonSuccessfulTerminalState(PlanStatus status, String planId) {
284-
switch (status) {
285-
case FAILED:
286-
throw new IllegalStateException(
287-
String.format("Received status: %s for planId: %s", PlanStatus.FAILED, planId));
288-
case CANCELLED:
289-
throw new IllegalStateException(
290-
String.format("Received status: %s for planId: %s", PlanStatus.CANCELLED, planId));
291-
default:
292-
throw new IllegalStateException(
293-
String.format("Invalid planStatus: %s for planId: %s", status, planId));
294-
}
295-
}
296282
}

0 commit comments

Comments
 (0)