Skip to content

Commit 475d10a

Browse files
authored
Spark: Add REST table operations (drop, list, purge, rename, etc.) for the Spark client (apache#1368)
1 parent b3edd87 commit 475d10a

File tree

7 files changed

+432
-62
lines changed

7 files changed

+432
-62
lines changed

plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
package org.apache.polaris.spark;
2020

2121
import com.google.common.base.Preconditions;
22+
import com.google.common.collect.ImmutableList;
2223
import com.google.common.collect.ImmutableMap;
2324
import com.google.common.collect.ImmutableSet;
25+
import com.google.common.collect.Maps;
2426
import java.io.Closeable;
2527
import java.io.IOException;
2628
import java.io.UncheckedIOException;
@@ -31,6 +33,7 @@
3133
import org.apache.iceberg.CatalogProperties;
3234
import org.apache.iceberg.catalog.Namespace;
3335
import org.apache.iceberg.catalog.TableIdentifier;
36+
import org.apache.iceberg.exceptions.NoSuchTableException;
3437
import org.apache.iceberg.io.CloseableGroup;
3538
import org.apache.iceberg.rest.Endpoint;
3639
import org.apache.iceberg.rest.ErrorHandlers;
@@ -39,7 +42,9 @@
3942
import org.apache.iceberg.rest.ResourcePaths;
4043
import org.apache.iceberg.rest.auth.OAuth2Util;
4144
import org.apache.iceberg.rest.responses.ConfigResponse;
45+
import org.apache.iceberg.rest.responses.ListTablesResponse;
4246
import org.apache.iceberg.util.EnvironmentUtil;
47+
import org.apache.iceberg.util.PropertyUtil;
4348
import org.apache.polaris.core.rest.PolarisEndpoints;
4449
import org.apache.polaris.core.rest.PolarisResourcePaths;
4550
import org.apache.polaris.service.types.GenericTable;
@@ -52,13 +57,16 @@
5257
* objects.
5358
*/
5459
public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
60+
public static final String REST_PAGE_SIZE = "rest-page-size";
61+
5562
private final Function<Map<String, String>, RESTClient> clientBuilder;
5663

5764
private RESTClient restClient = null;
5865
private CloseableGroup closeables = null;
5966
private Set<Endpoint> endpoints;
6067
private OAuth2Util.AuthSession catalogAuth = null;
6168
private PolarisResourcePaths pathGenerator = null;
69+
private Integer pageSize = null;
6270

6371
// the default endpoints to config if server doesn't specify the 'endpoints' configuration.
6472
private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
@@ -101,6 +109,12 @@ public void initialize(Map<String, String> unresolved, OAuth2Util.AuthSession ca
101109
this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
102110
this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
103111

112+
this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
113+
if (pageSize != null) {
114+
Preconditions.checkArgument(
115+
pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
116+
}
117+
104118
this.closeables = new CloseableGroup();
105119
this.closeables.addCloseable(this.restClient);
106120
this.closeables.setSuppressCloseFailure(true);
@@ -138,12 +152,49 @@ public void close() throws IOException {
138152

139153
@Override
140154
public List<TableIdentifier> listGenericTables(Namespace ns) {
141-
throw new UnsupportedOperationException("listTables not supported");
155+
Endpoint.check(endpoints, PolarisEndpoints.V1_LIST_GENERIC_TABLES);
156+
157+
Map<String, String> queryParams = Maps.newHashMap();
158+
ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
159+
String pageToken = "";
160+
if (pageSize != null) {
161+
queryParams.put("pageSize", String.valueOf(pageSize));
162+
}
163+
164+
do {
165+
queryParams.put("pageToken", pageToken);
166+
ListTablesResponse response =
167+
restClient
168+
.withAuthSession(this.catalogAuth)
169+
.get(
170+
pathGenerator.genericTables(ns),
171+
queryParams,
172+
ListTablesResponse.class,
173+
Map.of(),
174+
ErrorHandlers.namespaceErrorHandler());
175+
pageToken = response.nextPageToken();
176+
tables.addAll(response.identifiers());
177+
} while (pageToken != null);
178+
179+
return tables.build();
142180
}
143181

144182
@Override
145183
public boolean dropGenericTable(TableIdentifier identifier) {
146-
throw new UnsupportedOperationException("dropTable not supported");
184+
Endpoint.check(endpoints, PolarisEndpoints.V1_DELETE_GENERIC_TABLE);
185+
186+
try {
187+
restClient
188+
.withAuthSession(this.catalogAuth)
189+
.delete(
190+
pathGenerator.genericTable(identifier),
191+
null,
192+
Map.of(),
193+
ErrorHandlers.tableErrorHandler());
194+
return true;
195+
} catch (NoSuchTableException e) {
196+
return false;
197+
}
147198
}
148199

149200
@Override

plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.polaris.spark;
2020

2121
import java.util.Map;
22+
import org.apache.iceberg.catalog.Namespace;
2223
import org.apache.iceberg.exceptions.AlreadyExistsException;
2324
import org.apache.iceberg.spark.Spark3Util;
2425
import org.apache.polaris.service.types.GenericTable;
@@ -90,12 +91,19 @@ public Table createTable(
9091
@Override
9192
public Table alterTable(Identifier identifier, TableChange... changes)
9293
throws NoSuchTableException {
93-
throw new NoSuchTableException(identifier);
94+
// alterTable currently is not supported for generic tables
95+
throw new UnsupportedOperationException("alterTable operation is not supported");
96+
}
97+
98+
@Override
99+
public boolean purgeTable(Identifier ident) {
100+
// purgeTable for generic table will only do a drop without purge
101+
return dropTable(ident);
94102
}
95103

96104
@Override
97105
public boolean dropTable(Identifier identifier) {
98-
return false;
106+
return this.polarisCatalog.dropGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
99107
}
100108

101109
@Override
@@ -106,6 +114,12 @@ public void renameTable(Identifier from, Identifier to)
106114

107115
@Override
108116
public Identifier[] listTables(String[] namespace) {
109-
throw new UnsupportedOperationException("listTables operation is not supported");
117+
try {
118+
return this.polarisCatalog.listGenericTables(Namespace.of(namespace)).stream()
119+
.map(ident -> Identifier.of(ident.namespace().levels(), ident.name()))
120+
.toArray(Identifier[]::new);
121+
} catch (UnsupportedOperationException ex) {
122+
return new Identifier[0];
123+
}
110124
}
111125
}

plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import com.google.common.base.Preconditions;
2222
import com.google.common.collect.Maps;
23+
import java.util.Arrays;
2324
import java.util.Map;
25+
import java.util.stream.Stream;
2426
import org.apache.arrow.util.VisibleForTesting;
2527
import org.apache.iceberg.CatalogProperties;
2628
import org.apache.iceberg.CatalogUtil;
@@ -161,33 +163,59 @@ public Table createTable(
161163

162164
@Override
163165
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
164-
throw new UnsupportedOperationException("alterTable");
166+
try {
167+
return this.icebergsSparkCatalog.alterTable(ident, changes);
168+
} catch (NoSuchTableException e) {
169+
Table table = this.polarisSparkCatalog.loadTable(ident);
170+
String provider = table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
171+
if (PolarisCatalogUtils.useDelta(provider)) {
172+
// For delta table, most of the alter operations is a delta log manipulation,
173+
// we load the delta catalog to help handling the alter table operation.
174+
// NOTE: This currently doesn't work for changing file location and file format
175+
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
176+
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
177+
return deltaCatalog.alterTable(ident, changes);
178+
}
179+
return this.polarisSparkCatalog.alterTable(ident);
180+
}
165181
}
166182

167183
@Override
168184
public boolean dropTable(Identifier ident) {
169-
throw new UnsupportedOperationException("dropTable");
185+
return this.icebergsSparkCatalog.dropTable(ident) || this.polarisSparkCatalog.dropTable(ident);
170186
}
171187

172188
@Override
173189
public void renameTable(Identifier from, Identifier to)
174190
throws NoSuchTableException, TableAlreadyExistsException {
175-
throw new UnsupportedOperationException("renameTable");
191+
try {
192+
this.icebergsSparkCatalog.renameTable(from, to);
193+
} catch (NoSuchTableException e) {
194+
this.polarisSparkCatalog.renameTable(from, to);
195+
}
176196
}
177197

178198
@Override
179199
public void invalidateTable(Identifier ident) {
180-
throw new UnsupportedOperationException("invalidateTable");
200+
this.icebergsSparkCatalog.invalidateTable(ident);
181201
}
182202

183203
@Override
184204
public boolean purgeTable(Identifier ident) {
185-
throw new UnsupportedOperationException("purgeTable");
205+
if (this.icebergsSparkCatalog.purgeTable(ident)) {
206+
return true;
207+
} else {
208+
return this.polarisSparkCatalog.purgeTable(ident);
209+
}
186210
}
187211

188212
@Override
189213
public Identifier[] listTables(String[] namespace) {
190-
throw new UnsupportedOperationException("listTables");
214+
Identifier[] icebergIdents = this.icebergsSparkCatalog.listTables(namespace);
215+
Identifier[] genericTableIdents = this.polarisSparkCatalog.listTables(namespace);
216+
217+
return Stream.concat(Arrays.stream(icebergIdents), Arrays.stream(genericTableIdents))
218+
.toArray(Identifier[]::new);
191219
}
192220

193221
@Override
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.spark.rest;
20+
21+
import com.fasterxml.jackson.annotation.JsonCreator;
22+
import com.fasterxml.jackson.annotation.JsonProperty;
23+
import java.util.Set;
24+
import org.apache.iceberg.catalog.TableIdentifier;
25+
import org.apache.iceberg.rest.RESTResponse;
26+
import org.apache.polaris.service.types.ListGenericTablesResponse;
27+
28+
/**
29+
* RESTResponse definition for ListGenericTable which extends the iceberg RESTResponse. This is
30+
* currently required because the Iceberg HTTPClient requires the request and response to be a class
31+
* of RESTRequest and RESTResponse.
32+
*/
33+
public class ListGenericTablesRESTResponse extends ListGenericTablesResponse
34+
implements RESTResponse {
35+
36+
@JsonCreator
37+
public ListGenericTablesRESTResponse(
38+
@JsonProperty(value = "next-page-token") String nextPageToken,
39+
@JsonProperty(value = "identifiers") Set<TableIdentifier> identifiers) {
40+
super(nextPageToken, identifiers);
41+
}
42+
43+
@Override
44+
public void validate() {}
45+
}

plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
*/
1919
package org.apache.polaris.spark;
2020

21+
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
2122
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
23+
import org.apache.spark.sql.connector.catalog.Identifier;
24+
import org.apache.spark.sql.connector.catalog.Table;
25+
import org.apache.spark.sql.connector.catalog.TableChange;
2226

2327
/**
2428
* This is a fake delta catalog class that is used for testing. This class is a noop class that
@@ -29,4 +33,9 @@ public class NoopDeltaCatalog extends DelegatingCatalogExtension {
2933
// This is a mock of isUnityCatalog scala val in
3034
// org.apache.spark.sql.delta.catalog.DeltaCatalog.
3135
private boolean isUnityCatalog = false;
36+
37+
@Override
38+
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
39+
return super.loadTable(ident);
40+
}
3241
}

0 commit comments

Comments
 (0)