Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2999][#2997] improvement(partition): add partition operation dispatcher #3221

Merged
merged 3 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import com.datastrato.gravitino.catalog.FilesetDispatcher;
import com.datastrato.gravitino.catalog.FilesetNormalizeDispatcher;
import com.datastrato.gravitino.catalog.FilesetOperationDispatcher;
import com.datastrato.gravitino.catalog.PartitionDispatcher;
import com.datastrato.gravitino.catalog.PartitionNormalizeDispatcher;
import com.datastrato.gravitino.catalog.PartitionOperationDispatcher;
import com.datastrato.gravitino.catalog.SchemaDispatcher;
import com.datastrato.gravitino.catalog.SchemaNormalizeDispatcher;
import com.datastrato.gravitino.catalog.SchemaOperationDispatcher;
Expand Down Expand Up @@ -59,6 +62,8 @@ public class GravitinoEnv {

private TableDispatcher tableDispatcher;

private PartitionDispatcher partitionDispatcher;

private FilesetDispatcher filesetDispatcher;

private TopicDispatcher topicDispatcher;
Expand Down Expand Up @@ -169,6 +174,11 @@ public void initialize(Config config) {
new TableNormalizeDispatcher(tableOperationDispatcher);
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);

PartitionOperationDispatcher partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);
// todo: support PartitionEventDispatcher
this.partitionDispatcher = new PartitionNormalizeDispatcher(partitionOperationDispatcher);

FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
Expand Down Expand Up @@ -244,6 +254,10 @@ public TableDispatcher tableDispatcher() {
return tableDispatcher;
}

public PartitionDispatcher partitionDispatcher() {
return partitionDispatcher;
}

/**
* Get the FilesetDispatcher associated with the Gravitino environment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private static String applyCapabilitiesOnName(
return standardizeName;
}

private static String applyCaseSensitiveOnName(
public static String applyCaseSensitiveOnName(
Capability.Scope scope, String name, Capability capabilities) {
return capabilities.caseSensitiveOnName(scope).supported() ? name : name.toLowerCase();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.rel.SupportsPartitions;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.storage.IdGenerator;
import com.datastrato.gravitino.utils.IsolatedClassLoader;
Expand Down Expand Up @@ -135,6 +137,19 @@ public <R> R doWithTopicOps(ThrowableFunction<TopicCatalog, R> fn) throws Except
});
}

public <R> R doWithPartitionOps(
NameIdentifier tableIdent, ThrowableFunction<SupportsPartitions, R> fn) throws Exception {
return classLoader.withClassLoader(
cl -> {
Preconditions.checkArgument(
asTables() != null, "Catalog does not support table operations");
Table table = asTables().loadTable(tableIdent);
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we load table first and then as a parameter to this method? So we would not load the table each time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but there is a trade-off:

  1. load table first and then as a parameter to this method: load table once, switch thread class loader 4 times(2 for load table, 2 for partition ops)
  2. current implementation: load table once (Since each request will only perform one partition operation currently), switch thread class loader 2 times

Therefore, I think it's fine for the current implementation, WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation is fine as is, and we can change it after the full test is completed.

Preconditions.checkArgument(
table.supportPartitions() != null, "Table does not support partition operations");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it throwing an exception (UnsupportedOperationException) or returning null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnsupportedOperationException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So should I remove this check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it can avoid the implementer returning null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it will never return null, you don't need to handle this, otherwise, you need to think about how to handle UnsupportedOperationException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it will never return null, you don't need to handle this,

Whether null will be returned depends on the developer of the catalog. The current implementation is more like defensive programming.

you need to think about how to handle UnsupportedOperationException

Since UnsupportedOperationException is a RuntimeException, I think we could just throws it out

return fn.apply(table.supportPartitions());
});
}

public <R> R doWithPropertiesMeta(ThrowableFunction<HasPropertyMetadata, R> fn)
throws Exception {
return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.datastrato.gravitino.file.FilesetChange;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.SupportsPartitions;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.storage.IdGenerator;
import com.datastrato.gravitino.utils.ThrowableFunction;
Expand Down Expand Up @@ -60,6 +61,24 @@ public OperationDispatcher(
this.idGenerator = idGenerator;
}

<R, E extends Throwable> R doWithTable(
NameIdentifier tableIdent, ThrowableFunction<SupportsPartitions, R> fn, Class<E> ex)
throws E {
try {
NameIdentifier catalogIdent = getCatalogIdentifier(tableIdent);
CatalogManager.CatalogWrapper c = catalogManager.loadCatalogAndWrap(catalogIdent);
return c.doWithPartitionOps(tableIdent, fn);
} catch (Throwable throwable) {
if (ex.isInstance(throwable)) {
throw ex.cast(throwable);
}
if (RuntimeException.class.isAssignableFrom(throwable.getClass())) {
throw (RuntimeException) throwable;
}
throw new RuntimeException(throwable);
}
}

<R, E extends Throwable> R doWithCatalog(
NameIdentifier ident, ThrowableFunction<CatalogManager.CatalogWrapper, R> fn, Class<E> ex)
throws E {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.exceptions.NoSuchPartitionException;
import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException;
import com.datastrato.gravitino.rel.SupportsPartitions;
import com.datastrato.gravitino.rel.partitions.Partition;

/**
* {@code PartitionDispatcher} interface is a wrapper around the {@link SupportsPartitions}
* interface, adding {@link NameIdentifier} of table to the method parameters for find out the
* catalog class loader.
*/
public interface PartitionDispatcher {

/**
* List the names of all partitions in the table.
*
* @param tableIdent The identifier of the table.
* @return The names of all partitions in the table.
*/
String[] listPartitionNames(NameIdentifier tableIdent);

/**
* List all partitions in the table.
*
* @param tableIdent The identifier of the table.
* @return The list of partitions.
*/
Partition[] listPartitions(NameIdentifier tableIdent);

/**
* Get a partition by name from the table.
*
* @param tableIdent The identifier of the table.
* @param partitionName The name of the partition.
* @return The partition.
* @throws NoSuchPartitionException
*/
Partition getPartition(NameIdentifier tableIdent, String partitionName)
throws NoSuchPartitionException;

/**
* Check if a partition exists in the table.
*
* @param tableIdent The identifier of the table.
* @param partitionName The name of the partition.
* @return True if the partition exists, false otherwise.
*/
default boolean partitionExists(NameIdentifier tableIdent, String partitionName) {
try {
getPartition(tableIdent, partitionName);
return true;
} catch (NoSuchPartitionException e) {
return false;
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Add a partition to the table.
*
* @param tableIdent The identifier of the table.
* @param partition The partition to add.
* @return The added partition.
* @throws PartitionAlreadyExistsException If the partition already exists.
*/
Partition addPartition(NameIdentifier tableIdent, Partition partition)
throws PartitionAlreadyExistsException;

/**
* Drop a partition from the table by name.
*
* @param tableIdent The identifier of the table.
* @param partitionName The name of the partition.
* @return True if the partition was dropped, false if the partition does not exist.
*/
boolean dropPartition(NameIdentifier tableIdent, String partitionName);

/**
* Purge a partition from the table by name.
*
* @param tableIdent The identifier of the table.
* @param partitionName The name of the partition.
* @return True if the partition was purged, false if the partition does not exist.
* @throws UnsupportedOperationException If partition purging is not supported.
*/
default boolean purgePartition(NameIdentifier tableIdent, String partitionName)
throws UnsupportedOperationException {
throw new UnsupportedOperationException("Partition purging is not supported");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog;

import static com.datastrato.gravitino.catalog.CapabilityHelpers.applyCaseSensitiveOnName;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.exceptions.NoSuchPartitionException;
import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException;
import com.datastrato.gravitino.rel.partitions.IdentityPartition;
import com.datastrato.gravitino.rel.partitions.ListPartition;
import com.datastrato.gravitino.rel.partitions.Partition;
import com.datastrato.gravitino.rel.partitions.Partitions;
import com.datastrato.gravitino.rel.partitions.RangePartition;
import java.util.Arrays;

public class PartitionNormalizeDispatcher implements PartitionDispatcher {

private final PartitionOperationDispatcher dispatcher;

public PartitionNormalizeDispatcher(PartitionOperationDispatcher dispatcher) {
this.dispatcher = dispatcher;
}

@Override
public String[] listPartitionNames(NameIdentifier tableIdent) {
String[] partitionNames =
dispatcher.listPartitionNames(
CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher));
return applyCaseSensitive(tableIdent, partitionNames);
}

@Override
public Partition[] listPartitions(NameIdentifier tableIdent) {
Partition[] partitions =
dispatcher.listPartitions(
CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher));
return applyCaseSensitive(tableIdent, partitions);
}

@Override
public Partition getPartition(NameIdentifier tableIdent, String partitionName)
throws NoSuchPartitionException {
return dispatcher.getPartition(
CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher),
applyCaseSensitiveOnName(
Capability.Scope.PARTITION,
partitionName,
dispatcher.getCatalogCapability(tableIdent)));
}

@Override
public Partition addPartition(NameIdentifier tableIdent, Partition partition)
throws PartitionAlreadyExistsException {
return dispatcher.addPartition(
CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher),
applyCaseSensitive(tableIdent, partition));
}

@Override
public boolean dropPartition(NameIdentifier tableIdent, String partitionName) {
return dispatcher.dropPartition(
CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher),
applyCaseSensitiveOnName(
Capability.Scope.PARTITION,
partitionName,
dispatcher.getCatalogCapability(tableIdent)));
}

@Override
public boolean purgePartition(NameIdentifier tableIdent, String partitionName)
throws UnsupportedOperationException {
return dispatcher.purgePartition(
CapabilityHelpers.applyCaseSensitive(tableIdent, Capability.Scope.TABLE, dispatcher),
applyCaseSensitiveOnName(
Capability.Scope.PARTITION,
partitionName,
dispatcher.getCatalogCapability(tableIdent)));
}

private String[] applyCaseSensitive(NameIdentifier tableIdent, String[] partitionNames) {
Capability capabilities = dispatcher.getCatalogCapability(tableIdent);
return Arrays.stream(partitionNames)
.map(
partitionName ->
applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName, capabilities))
.toArray(String[]::new);
}

private Partition[] applyCaseSensitive(NameIdentifier tableIdent, Partition[] partitions) {
boolean caseSensitive =
dispatcher
.getCatalogCapability(tableIdent)
.caseSensitiveOnName(Capability.Scope.PARTITION)
.supported();
return Arrays.stream(partitions)
.map(partition -> applyCaseSensitive(partition, caseSensitive))
.toArray(Partition[]::new);
}

private Partition applyCaseSensitive(NameIdentifier tableIdent, Partition partition) {
boolean caseSensitive =
dispatcher
.getCatalogCapability(tableIdent)
.caseSensitiveOnName(Capability.Scope.PARTITION)
.supported();
return applyCaseSensitive(partition, caseSensitive);
}

private Partition applyCaseSensitive(Partition partition, boolean caseSensitive) {
String newName = caseSensitive ? partition.name() : partition.name().toLowerCase();
if (partition instanceof IdentityPartition) {
IdentityPartition identityPartition = (IdentityPartition) partition;
return Partitions.identity(
newName,
identityPartition.fieldNames(),
identityPartition.values(),
identityPartition.properties());

} else if (partition instanceof ListPartition) {
ListPartition listPartition = (ListPartition) partition;
return Partitions.list(newName, listPartition.lists(), listPartition.properties());

} else if (partition instanceof RangePartition) {
RangePartition rangePartition = (RangePartition) partition;
return Partitions.range(
newName, rangePartition.upper(), rangePartition.lower(), rangePartition.properties());

} else {
throw new IllegalArgumentException("Unknown partition type: " + partition.getClass());
}
}
}
Loading
Loading