Skip to content

Commit

Permalink
Update Paimon core to 1.0-SNAPSHOT
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
JingsongLi committed Nov 26, 2024
1 parent 6925f02 commit 5144ad9
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.paimon.table.BucketMode.FIXED;
import static org.apache.paimon.table.BucketMode.UNAWARE;
import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -124,19 +122,18 @@ public Optional<ConnectorTableLayout> getInsertLayout(
FileStoreTable storeTable = (FileStoreTable) table;
BucketMode bucketMode = storeTable.bucketMode();
switch (bucketMode) {
case FIXED:
case HASH_FIXED:
try {
return Optional.of(
new ConnectorTableLayout(
new TrinoPartitioningHandle(
InstantiationUtil.serializeObject(storeTable.schema()),
FIXED),
InstantiationUtil.serializeObject(storeTable.schema())),
storeTable.schema().bucketKeys(),
false));
} catch (IOException e) {
throw new RuntimeException(e);
}
case UNAWARE:
case BUCKET_UNAWARE:
return Optional.empty();
default:
throw new IllegalArgumentException("Unknown table bucket mode: " + bucketMode);
Expand Down Expand Up @@ -230,7 +227,7 @@ public ColumnHandle getMergeRowIdColumnHandle(
}
FileStoreTable storeTable = (FileStoreTable) table;
BucketMode bucketMode = storeTable.bucketMode();
if (bucketMode != FIXED) {
if (bucketMode != BucketMode.HASH_FIXED) {
throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
}
Set<String> pkSet = new HashSet<>(table.primaryKeys());
Expand All @@ -252,13 +249,13 @@ public Optional<ConnectorPartitioningHandle> getUpdateLayout(
}
FileStoreTable storeTable = (FileStoreTable) table;
BucketMode bucketMode = storeTable.bucketMode();
if (bucketMode != FIXED) {
if (bucketMode != BucketMode.HASH_FIXED) {
throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
}
try {
return Optional.of(
new TrinoPartitioningHandle(
InstantiationUtil.serializeObject(storeTable.schema()), FIXED));
InstantiationUtil.serializeObject(storeTable.schema())));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -282,7 +279,12 @@ public void finishMerge(
@Override
public boolean schemaExists(ConnectorSession session, String schemaName) {
catalog.initSession(session);
return catalog.databaseExists(schemaName);
try {
catalog.getDatabase(schemaName);
return true;
} catch (Catalog.DatabaseNotExistException e) {
return false;
}
}

@Override
Expand Down Expand Up @@ -427,11 +429,14 @@ public TrinoTableHandle getTableHandle(
SchemaTableName tableName,
Map<String, String> dynamicOptions) {
catalog.initSession(session);
return catalog.tableExists(
Identifier.create(tableName.getSchemaName(), tableName.getTableName()))
? new TrinoTableHandle(
tableName.getSchemaName(), tableName.getTableName(), dynamicOptions)
: null;
try {
catalog.getTable(
Identifier.create(tableName.getSchemaName(), tableName.getTableName()));
return new TrinoTableHandle(
tableName.getSchemaName(), tableName.getTableName(), dynamicOptions);
} catch (Catalog.TableNotExistException e) {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.trino;

import org.apache.paimon.table.BucketMode;

import com.google.inject.Inject;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
Expand All @@ -46,11 +44,7 @@ public BucketFunction getBucketFunction(
// todo support dynamic bucket tables
TrinoPartitioningHandle trinoPartitioningHandle =
(TrinoPartitioningHandle) partitioningHandle;
if (trinoPartitioningHandle.getBucketMode() == BucketMode.FIXED) {
return new FixedBucketTableShuffleFunction(
partitionChannelTypes, trinoPartitioningHandle, workerCount);
}
throw new UnsupportedOperationException(
"Unsupported table bucket mode: " + trinoPartitioningHandle.getBucketMode());
return new FixedBucketTableShuffleFunction(
partitionChannelTypes, trinoPartitioningHandle, workerCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,11 @@ private static void validataBucketMode(Table table) {
BucketMode mode =
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
: BucketMode.FIXED;
: BucketMode.HASH_FIXED;
switch (mode) {
case FIXED:
case UNAWARE:
case HASH_FIXED:
case BUCKET_UNAWARE:
break;
case DYNAMIC:
case GLOBAL_DYNAMIC:
if (table.primaryKeys().isEmpty()) {
throw new IllegalArgumentException(
"Only primary-key table can support dynamic bucket.");
}
throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
default:
throw new IllegalArgumentException("Unknown bucket mode: " + mode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private ConnectorPageSource createPageSource(
new Path(indexFile.path()),
((FileStoreTable) table).fileIO(),
rowType)) {
if (!fileIndexPredicate.testPredicate(paimonFilter.get())) {
if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) {
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.trino;

import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.InstantiationUtil;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -33,32 +32,21 @@
public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {

private final byte[] schema;
private final BucketMode bucketMode;

@JsonCreator
public TrinoPartitioningHandle(
@JsonProperty("schema") byte[] schema,
@JsonProperty("bucketMode") BucketMode bucketMode) {
public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) {
this.schema = schema;
this.bucketMode = bucketMode;
}

@JsonProperty
public byte[] getSchema() {
return schema;
}

@JsonProperty
public BucketMode getBucketMode() {
return bucketMode;
}

public TableSchema getOriginalSchema() {
try {
return InstantiationUtil.deserializeObject(this.schema, getClass().getClassLoader());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected ConnectorSplitSource getSplits(
.convert(tableHandle.getFilter())
.ifPresent(readBuilder::withFilter);
tableHandle.getLimit().ifPresent(limit -> readBuilder.withLimit((int) limit));
List<Split> splits = readBuilder.newScan().plan().splits();
List<Split> splits = readBuilder.dropStats().newScan().plan().splits();

long maxRowCount = splits.stream().mapToLong(Split::rowCount).max().orElse(0L);
double minimumSplitWeight = TrinoSessionProperties.getMinimumSplitWeight(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ private static boolean shouldSkip(String fieldName) {

private static boolean isEnum(String className) {
switch (className) {
case "FileFormatType":
case "StartupMode":
case "MergeEngine":
case "ChangelogProducer":
Expand All @@ -108,8 +107,6 @@ private static boolean isEnum(String className) {

private static Class<?> buildClass(String className) {
switch (className) {
case "FileFormatType":
return CoreOptions.FileFormatType.class;
case "MergeEngine":
return CoreOptions.MergeEngine.class;
case "ChangelogProducer":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@

package org.apache.paimon.trino.catalog;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.*;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand All @@ -43,7 +39,6 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Trino catalog, use it after set session. */
public class TrinoCatalog implements Catalog {
Expand Down Expand Up @@ -120,30 +115,20 @@ public FileIO fileIO() {
return current.fileIO();
}

@Override
public Optional<CatalogLockFactory> lockFactory() {
return current.lockFactory();
}

@Override
public List<String> listDatabases() {
return current.listDatabases();
}

@Override
public boolean databaseExists(String s) {
return current.databaseExists(s);
}

@Override
public void createDatabase(String s, boolean b, Map<String, String> map)
throws DatabaseAlreadyExistException {
current.createDatabase(s, b, map);
}

@Override
public Map<String, String> loadDatabaseProperties(String s) throws DatabaseNotExistException {
return current.loadDatabaseProperties(s);
public Database getDatabase(String name) throws DatabaseNotExistException {
return current.getDatabase(name);
}

@Override
Expand All @@ -157,6 +142,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return current.getTable(identifier);
}

@Override
public Path getTableLocation(Identifier identifier) {
return current.getTableLocation(identifier);
}

@Override
public List<String> listTables(String s) throws DatabaseNotExistException {
return current.listTables(s);
Expand Down Expand Up @@ -185,48 +175,45 @@ public void alterTable(Identifier identifier, List<SchemaChange> list, boolean i
current.alterTable(identifier, list, ignoreIfExists);
}

@Override
public void createPartition(Identifier identifier, Map<String, String> map)
throws TableNotExistException {
current.createPartition(identifier, map);
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
current.dropPartition(identifier, partitions);
}

@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
return current.listPartitions(identifier);
}

@Override
public void close() throws Exception {
if (current != null) {
current.close();
}
}

@Override
public Optional<CatalogLockContext> lockContext() {
return current.lockContext();
}

@Override
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
return current.metastoreClientFactory(identifier);
}

@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
current.createDatabase(name, ignoreIfExists);
}

@Override
public boolean tableExists(Identifier identifier) {
return current.tableExists(identifier);
}

@Override
public void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
current.alterTable(identifier, change, ignoreIfNotExists);
}

@Override
public boolean caseSensitive() {
return current.caseSensitive();
public boolean allowUpperCase() {
return current.allowUpperCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.trino;

import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.InstantiationUtil;

import io.airlift.json.JsonCodec;
Expand All @@ -35,8 +34,7 @@ public class TestTrinoPartitioningHandle {
@Test
public void testTrinoPartitioningHandle() throws Exception {
byte[] schemaData = InstantiationUtil.serializeObject("test_schema");
TrinoPartitioningHandle expected =
new TrinoPartitioningHandle(schemaData, BucketMode.FIXED);
TrinoPartitioningHandle expected = new TrinoPartitioningHandle(schemaData);
testRoundTrip(expected);
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ under the License.
</scm>

<properties>
<paimon.version>0.8.0</paimon.version>
<paimon.version>1.0-SNAPSHOT</paimon.version>
<target.java.version>11</target.java.version>
<junit5.version>5.8.1</junit5.version>
<slf4j.version>2.0.7</slf4j.version>
Expand Down

0 comments on commit 5144ad9

Please sign in to comment.