Skip to content

Commit

Permalink
Support createTable as foreign Table (apache#5)
Browse files Browse the repository at this point in the history

Co-authored-by: Yun Zou <yun.zou@snowflake.com>
  • Loading branch information
gh-yzou and sfc-gh-yzou authored Dec 11, 2024
1 parent 94ea50c commit c77c7b9
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class Builder
extends PolarisEntity.BaseBuilder<ForeignTableEntity, ForeignTableEntity.Builder> {
public Builder(TableIdentifier identifier, String metadataLocation) {
super();
setType(PolarisEntityType.FOREIGN_TABLE);
setType(PolarisEntityType.TABLE_LIKE);
setTableIdentifier(identifier);
setMetadataLocation(metadataLocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public enum PolarisEntitySubType {
NULL_SUBTYPE(0, null),
TABLE(2, PolarisEntityType.TABLE_LIKE),
VIEW(3, PolarisEntityType.TABLE_LIKE),
FOREIGN_TABLE(4, PolarisEntityType.FOREIGN_TABLE);
FOREIGN_TABLE(4, PolarisEntityType.TABLE_LIKE);

// to efficiently map the code of a subtype to its corresponding subtype enum, use a reverse
// array which is initialized below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public enum PolarisEntityType {
// generic table is either a view or a real table
TABLE_LIKE(7, NAMESPACE, false, false),
TASK(8, ROOT, false, false),
FILE(9, TABLE_LIKE, false, false),
FOREIGN_TABLE(10, NAMESPACE, false, false);
FILE(9, TABLE_LIKE, false, false);
// FOREIGN_TABLE(10, NAMESPACE, false, false);

// to efficiently map a code to its corresponding entity type, use a reverse array which
// is initialized below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisTaskConstants;
import org.apache.polaris.core.entity.TableLikeEntity;
import org.apache.polaris.core.entity.ForeignTableEntity;
import org.apache.polaris.core.persistence.BaseResult;
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
Expand Down Expand Up @@ -1211,13 +1212,25 @@ public void doRefresh() {
LOGGER.debug("doRefresh for tableIdentifier {}", tableIdentifier);
// While doing refresh/commit protocols, we must fetch the fresh "passthrough" resolved
// table entity instead of the statically-resolved authz resolution set.
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(
tableIdentifier, PolarisEntitySubType.TABLE);
boolean isForeignTable = false;
PolarisResolvedPathWrapper tryResolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, PolarisEntitySubType.TABLE);
if (tryResolvedEntities == null) {
tryResolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, PolarisEntitySubType.FOREIGN_TABLE);
if (tryResolvedEntities != null) {
isForeignTable = true;
}
}
PolarisResolvedPathWrapper resolvedEntities = tryResolvedEntities;
TableLikeEntity entity = null;

if (resolvedEntities != null) {
entity = TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
if (isForeignTable) {
entity = ForeignTableEntity.of(resolvedEntities.getRawLeafEntity());
} else {
entity = TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
}
if (!tableIdentifier.equals(entity.getTableIdentifier())) {
LOGGER
.atError()
Expand Down Expand Up @@ -1265,9 +1278,11 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
tableIdentifier, tableIdentifier.namespace());
}

boolean isForeignTable = !metadata.property(ForeignTableEntity.FOREIGN_SOURCE_KEY, "").isEmpty();
PolarisEntitySubType subType = isForeignTable ? PolarisEntitySubType.FOREIGN_TABLE : PolarisEntitySubType.TABLE;

PolarisResolvedPathWrapper resolvedTableEntities =
resolvedEntityView.getPassthroughResolvedPath(
tableIdentifier, PolarisEntitySubType.TABLE);
resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, subType);

// Fetch credentials for the resolved entity. The entity could be the table itself (if it has
// already been stored and credentials have been configured directly) or it could be the
Expand All @@ -1292,11 +1307,11 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
: resolvedTableEntities.getRawParentPath();
CatalogEntity catalog = CatalogEntity.of(resolvedNamespace.getFirst());

if (base == null
if (!isForeignTable && (base == null
|| !metadata.location().equals(base.location())
|| !Objects.equal(
base.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY),
metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY))) {
metadata.properties().get(TableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)))) {
// If location is changing then we must validate that the requested location is valid
// for the storage configuration inherited under this entity's path.
Set<String> dataLocations = new HashSet<>();
Expand Down Expand Up @@ -1342,20 +1357,34 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
// persistence-layer commit).
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(
tableIdentifier, PolarisEntitySubType.TABLE);
tableIdentifier, subType);
TableLikeEntity entity =
TableLikeEntity.of(resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity());
String existingLocation;
if (null == entity) {
existingLocation = null;
entity =
new TableLikeEntity.Builder(tableIdentifier, newLocation)
.setCatalogId(getCatalogId())
.setSubType(PolarisEntitySubType.TABLE)
.setBaseLocation(metadata.location())
.setId(
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
if (isForeignTable) {
// create a foreign table entity
entity = new ForeignTableEntity.Builder(tableIdentifier, newLocation)
.setSource(metadata.properties().get(ForeignTableEntity.FOREIGN_SOURCE_KEY))
.setCatalogId(getCatalogId())
.setProperties(metadata.properties()) // table properties
.setSubType(subType)
.setBaseLocation(metadata.location()) // this is the delta table location
.setId(
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
} else{
entity =
new TableLikeEntity.Builder(tableIdentifier, newLocation)
.setCatalogId(getCatalogId())
.setSubType(subType)
.setBaseLocation(metadata.location())
.setId(
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
}

} else {
existingLocation = entity.getMetadataLocation();
entity =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ public ListTablesResponse listTables(Namespace namespace) {
return doCatalogOperation(() -> CatalogHandlers.listTables(baseCatalog, namespace));
}


public LoadTableResponse createTableDirect(Namespace namespace, CreateTableRequest request) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,53 @@ public void testListGrantsAfterRename() {
}
}

@Test
public void testCreateForeignTable(PolarisToken adminToken) {
try (Response response =
EXT.client()
.target(
String.format(
"http://localhost:%d/api/management/v1/catalogs/%s",
EXT.getLocalPort(), currentCatalogName))
.request("application/json")
.header("Authorization", "Bearer " + adminToken.token())
.header(REALM_PROPERTY_KEY, realm)
.get()) {
assertThat(response).returns(Response.Status.OK.getStatusCode(), Response::getStatus);
Catalog catalog = response.readEntity(Catalog.class);
Map<String, String> catalogProps = new HashMap<>(catalog.getProperties().toMap());
try (Response updateResponse =
EXT.client()
.target(
String.format(
"http://localhost:%d/api/management/v1/catalogs/%s",
EXT.getLocalPort(), catalog.getName()))
.request("application/json")
.header("Authorization", "Bearer " + adminToken.token())
.header(REALM_PROPERTY_KEY, realm)
.put(
Entity.json(
new UpdateCatalogRequest(
catalog.getEntityVersion(),
catalogProps,
catalog.getStorageConfigInfo())))) {
assertThat(updateResponse).returns(Response.Status.OK.getStatusCode(), Response::getStatus);
}
}

restCatalog.createNamespace(Namespace.of("ns1"),
ImmutableMap.of(
PolarisEntityConstants.ENTITY_BASE_LOCATION,
catalogBaseLocation + "/checkpoint_dir/delta"));

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("ns1"), "tbl1");
restCatalog
.buildTable(tableIdentifier, SCHEMA)
.withLocation(catalogBaseLocation + "/checkpoint_dir/delta/t1")
.withProperty("_source", "delta")
.create();
}

@Test
public void testCreateTableWithOverriddenBaseLocation(PolarisToken adminToken) {
try (Response response =
Expand Down

0 comments on commit c77c7b9

Please sign in to comment.