
[Spark]: Adapting 'path' to Spark's 'location' in table props and supporting the customization of the table location when creating a table #3843

Open · wants to merge 8 commits into base: master
@@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
@@ -36,6 +37,7 @@
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {
@@ -117,6 +119,9 @@ protected void dropTableImpl(Identifier identifier) {

@Override
public void createTableImpl(Identifier identifier, Schema schema) {
checkArgument(
!schema.options().containsKey(CoreOptions.PATH.key()),
Contributor:

Can we just relax this check? For example, if it is the same path, we should allow it.

I am quite concerned that SHOW CREATE TABLE may not run properly.

Contributor (Author):

Thanks for reviewing. I'd like to share my thoughts on why setting the table location is prohibited in FileSystemCatalog.

SHOW CREATE TABLE is typically used to fetch a table's DDL and then create a test table with the same schema under a different name. In that scenario, the location in the DDL will inevitably mismatch the path the FileSystemCatalog assigns to the new table, so users have to edit the DDL before the statement can succeed.

So even if the restriction were relaxed, I believe it would still confuse users: success would only be guaranteed when the specified location matches the one assigned by the catalog, and in that case, why bother passing the location at all?

Therefore, instead of relaxing the check here, I suggest we clearly state this restriction in the documentation: describe the location-management constraints of FileSystemCatalog and the recommended Spark DDL usage. What do you think?

Contributor (Author):

This is also how we currently manage Iceberg tables on our platform. We host the location for all Iceberg tables on behalf of users and strongly advise against specifying a location when creating tables. So far, this has been working well.

Contributor:

I still think a relaxed check would be better. For the "create a test table with a different name for testing purposes" case, we can raise a very clear exception.
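For illustration, a minimal sketch of such a relaxed check, assuming it lives in FileSystemCatalog; the helper name checkLocationIfSpecified and the message wording are made up, and getTableLocation is assumed to be the existing catalog helper:

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;

import static org.apache.paimon.utils.Preconditions.checkArgument;

// Allow an explicit 'path' option only when it matches the location the
// catalog would assign anyway; otherwise fail with a message naming both.
private void checkLocationIfSpecified(Identifier identifier, Schema schema) {
    String specified = schema.options().get(CoreOptions.PATH.key());
    if (specified == null) {
        return; // no explicit location: the catalog assigns one
    }
    Path assigned = getTableLocation(identifier); // assumed existing helper
    checkArgument(
            new Path(specified).equals(assigned),
            "Specified location '%s' does not match the catalog-assigned location '%s'. "
                    + "Remove the 'path' option, e.g. when reusing DDL from SHOW CREATE TABLE.",
            specified,
            assigned);
}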

"The FileSystemCatalog does not support specifying location when creating a table.");
uncheck(() -> schemaManager(identifier).createTable(schema));
}

@@ -18,6 +18,7 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogLockContext;
@@ -57,6 +58,7 @@
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -235,6 +237,9 @@ protected void dropTableImpl(Identifier identifier) {

@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
checkArgument(
!schema.options().containsKey(CoreOptions.PATH.key()),
"The JdbcCatalog does not support specifying location when creating a table.");
try {
// create table file
getSchemaManager(identifier).createTable(schema);
@@ -337,13 +337,4 @@ public Schema build() {
return new Schema(columns, partitionKeys, primaryKeys, options, comment);
}
}

public static Schema fromTableSchema(TableSchema tableSchema) {
return new Schema(
tableSchema.fields(),
tableSchema.partitionKeys(),
tableSchema.primaryKeys(),
tableSchema.options(),
tableSchema.comment());
}
}
@@ -18,15 +18,21 @@

package org.apache.paimon.flink.clone;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -37,6 +43,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Pick the files to be cloned of a table based on the input record. The record type it produces is
@@ -77,7 +84,7 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
targetCatalog.createTable(
targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);

List<CloneFileInfo> result =
toCloneFileInfos(
@@ -95,6 +102,18 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
}
}

private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
return new Schema(
ImmutableList.copyOf(tableSchema.fields()),
ImmutableList.copyOf(tableSchema.partitionKeys()),
ImmutableList.copyOf(tableSchema.primaryKeys()),
ImmutableMap.copyOf(
Iterables.filter(
tableSchema.options().entrySet(),
entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
tableSchema.comment());
Contributor (Author):

Currently, when building the schema for the clone table, all table options from the source table are passed into the new schema unchanged, including the source table's path. That path was then simply ignored during table creation, which seems incorrect. Additionally, after prohibiting custom table paths in the FileSystemCatalog, the clone table's unit tests would fail, so I fixed it here.

Moreover, the original fromTableSchema method reused the partitionKeys and primaryKeys instances of the source schema when constructing the new one, which is not safe, so I make defensive copies here.

I have also moved this method from the Schema class into this operator, since this is the only place that needs it. If you feel it is sufficiently general, I can put it back.
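As a standalone illustration of the option filtering above (hypothetical option values, and plain java.util streams instead of the shaded Guava used in the actual change):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class CloneOptionsSketch {
    public static void main(String[] args) {
        // Options as they might appear on a source table, including the
        // catalog-assigned path that must not leak into the clone target.
        Map<String, String> sourceOptions = new HashMap<>();
        sourceOptions.put("bucket", "4");
        sourceOptions.put("path", "hdfs://source/warehouse/db.db/t"); // CoreOptions.PATH.key()

        // Same idea as newSchemaFromTableSchema: copy everything except 'path',
        // so the target catalog assigns the clone its own location.
        Map<String, String> targetOptions =
                sourceOptions.entrySet().stream()
                        .filter(e -> !Objects.equals(e.getKey(), "path"))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println(targetOptions); // {bucket=4}
    }
}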

Contributor:

Sounds good to me.

}

private List<CloneFileInfo> toCloneFileInfos(
List<Path> files,
Path sourceTableRoot,
@@ -78,6 +78,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -493,7 +494,18 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
// if changes on Hive fails there is no harm to perform the same changes to files again
TableSchema tableSchema;
try {
tableSchema = schemaManager(identifier).createTable(schema, usingExternalTable());
Path tableRoot;
if (schema.options().containsKey(CoreOptions.PATH.key())) {
checkArgument(
Objects.equals(createTableType(), TableType.EXTERNAL),
"The HiveCatalog only supports specifying location when creating an external table");
tableRoot = new Path(schema.options().get(CoreOptions.PATH.key()));
Contributor (Author):

Location specification is only allowed when creating an external table.

} else {
tableRoot = getTableLocation(identifier);
}

tableSchema =
schemaManager(identifier, tableRoot).createTable(schema, usingExternalTable());
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
@@ -707,10 +719,7 @@ public String warehouse() {

private Table newHmsTable(Identifier identifier, Map<String, String> tableParameters) {
long currentTimeMillis = System.currentTimeMillis();
TableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()),
TableType.class);
TableType tableType = createTableType();
Table table =
new Table(
identifier.getTableName(),
@@ -735,6 +744,11 @@ private Table newHmsTable(Identifier identifier, Map<String, String> tableParame
return table;
}

private TableType createTableType() {
return OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()), TableType.class);
}

private void updateHmsTable(Table table, Identifier identifier, TableSchema schema) {
StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor();

@@ -792,7 +806,11 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche
}

// update location
locationHelper.specifyTableLocation(table, getTableLocation(identifier).toString());
String location =
schema.options().containsKey(CoreOptions.PATH.key())
? schema.options().get(CoreOptions.PATH.key())
: getTableLocation(identifier).toString();
locationHelper.specifyTableLocation(table, location);
Contributor (Author):

If the user provides a table path, use it.

}

private void updateHmsTablePars(Table table, TableSchema schema) {
@@ -816,8 +834,11 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
}

private SchemaManager schemaManager(Identifier identifier) {
return new SchemaManager(
fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault())
return schemaManager(identifier, getTableLocation(identifier));
}

private SchemaManager schemaManager(Identifier identifier, Path path) {
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
.withLock(lock(identifier));
}

@@ -18,9 +18,11 @@

package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -29,6 +31,7 @@
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.HadoopUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.apache.hadoop.hive.conf.HiveConf;
@@ -37,6 +40,7 @@
import org.apache.thrift.TException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.lang.reflect.Field;
import java.util.Arrays;
@@ -268,4 +272,36 @@ public void testAlterHiveTableParameters() {
fail("Test failed due to exception: " + e.getMessage());
}
}

@Test
public void testCreateExternalTableWithLocation(@TempDir java.nio.file.Path tempDir)
throws Exception {
HiveConf hiveConf = new HiveConf();
String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID();
hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true");
hiveConf.set(CatalogOptions.TABLE_TYPE.key(), "external");
String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
HiveCatalog externalWarehouseCatalog =
new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse);

String externalTablePath = tempDir.toString();

Schema schema =
new Schema(
Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
Collections.emptyList(),
Collections.emptyList(),
ImmutableMap.of("path", externalTablePath),
"");

Identifier identifier = Identifier.create("default", "my_table");
externalWarehouseCatalog.createTable(identifier, schema, true);

org.apache.paimon.table.Table table = externalWarehouseCatalog.getTable(identifier);
assertThat(table.options())
.extracting(CoreOptions.PATH.key())
.isEqualTo("file:" + externalTablePath);

externalWarehouseCatalog.close();
}
}
@@ -392,6 +392,11 @@ private Schema toInitialSchema(
Map<String, String> normalizedProperties = mergeSQLConf(properties);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
normalizedProperties.put(CoreOptions.PATH.key(), path);
}

String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
List<String> primaryKeys =
pkAsString == null
@@ -60,6 +60,7 @@ case class SparkTable(table: Table)
properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", table.primaryKeys))
}
properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
if (table.comment.isPresent) {
properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
}