Fix incorrect $bucket for mismatch bucket queries
oraclechang authored and arhimondr committed May 7, 2022
1 parent 546cbaa commit 4dd11db
Showing 17 changed files with 214 additions and 48 deletions.
@@ -95,6 +95,12 @@ public void testBucketedTableValidation()
// Alluxio metastore does not support create operations
}

+@Override
+public void testBucketedTableEvolutionWithDifferentReadBucketCount()
+{
+// Alluxio metastore does not support create operations
+}
+
@Override
public void testEmptyOrcFile()
{
@@ -438,14 +438,14 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
if (partition.getPartition().isPresent()) {
Optional<HiveBucketProperty> partitionBucketProperty = partition.getPartition().get().getStorage().getBucketProperty();
if (tableBucketInfo.isPresent() && partitionBucketProperty.isPresent()) {
-int readBucketCount = tableBucketInfo.get().getReadBucketCount();
+int tableBucketCount = tableBucketInfo.get().getTableBucketCount();
BucketingVersion bucketingVersion = partitionBucketProperty.get().getBucketingVersion(); // TODO can partition's bucketing_version be different from table's?
int partitionBucketCount = partitionBucketProperty.get().getBucketCount();
// Validation was done in HiveSplitManager#getPartitionMetadata.
// Here, it's just trying to see if its needs the BucketConversion.
-if (readBucketCount != partitionBucketCount) {
-bucketConversion = Optional.of(new BucketConversion(bucketingVersion, readBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns()));
-if (readBucketCount > partitionBucketCount) {
+if (tableBucketCount != partitionBucketCount) {
+bucketConversion = Optional.of(new BucketConversion(bucketingVersion, tableBucketCount, partitionBucketCount, tableBucketInfo.get().getBucketColumns()));
+if (tableBucketCount > partitionBucketCount) {
bucketConversionRequiresWorkerParticipation = true;
}
}
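
To make the change above concrete, the following standalone Java sketch (illustrative only; the class name, method, and bucket counts are hypothetical, not connector code) evaluates the new condition (tableBucketCount != partitionBucketCount) for two sets of counts, noting in comments where the old check (readBucketCount != partitionBucketCount) would have behaved differently:

    // Illustrative only: hypothetical counts and names, not connector code.
    public final class ConversionDecisionSketch
    {
        private ConversionDecisionSketch() {}

        public static void main(String[] args)
        {
            // Scenario A: the partition was written with fewer buckets than the table
            // declares, and the engine reads 8 buckets. The old check compared
            // readBucketCount (8) to partitionBucketCount (8) and created no conversion;
            // the new check compares the partition layout against the table layout.
            describe("A", 16, 8, 8);

            // Scenario B: the partition matches the table layout and the engine reads
            // fewer buckets. No conversion is needed, and none is created.
            describe("B", 16, 16, 8);
        }

        private static void describe(String label, int tableBucketCount, int partitionBucketCount, int readBucketCount)
        {
            boolean conversion = tableBucketCount != partitionBucketCount;                       // new condition
            boolean workerParticipation = conversion && tableBucketCount > partitionBucketCount; // mirrors bucketConversionRequiresWorkerParticipation
            System.out.printf("%s: conversion=%s, workerParticipation=%s (readBucketCount=%s)%n",
                    label, conversion, workerParticipation, readBucketCount);
        }
    }
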
@@ -721,7 +721,7 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
transaction,
maxSplitFileSize);
return Optional.of(locatedFileStatuses.stream()
-.map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), splittable, Optional.empty()))
+.map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), OptionalInt.empty(), splittable, Optional.empty()))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator());
@@ -743,6 +743,7 @@ private Iterator<InternalHiveSplit> generateOriginalFilesSplits(
return splitFactory.createInternalHiveSplit(
(LocatedFileStatus) fileStatus,
OptionalInt.empty(),
+OptionalInt.empty(),
splittable,
acidInfo);
})
@@ -778,7 +779,7 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu
private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
{
return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions))
-.map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), splittable, acidInfo))
+.map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), OptionalInt.empty(), splittable, acidInfo))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
@@ -797,6 +798,8 @@ private List<InternalHiveSplit> getBucketedSplits(
int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
int bucketCount = max(readBucketCount, partitionBucketCount);

+checkState(readBucketCount <= tableBucketCount, "readBucketCount(%s) should be less than or equal to tableBucketCount(%s)", readBucketCount, tableBucketCount);
+
// build mapping of file name to bucket
ListMultimap<Integer, LocatedFileStatus> bucketFiles = ArrayListMultimap.create();
for (LocatedFileStatus file : files) {
@@ -840,19 +843,19 @@ private List<InternalHiveSplit> getBucketedSplits(
// Logical bucket #. Each logical bucket corresponds to a "bucket" from engine's perspective.
int readBucketNumber = bucketNumber % readBucketCount;

-boolean containsEligibleTableBucket = false;
boolean containsIneligibleTableBucket = false;
+List<Integer> eligibleTableBucketNumbers = new ArrayList<>();
for (int tableBucketNumber = bucketNumber % tableBucketCount; tableBucketNumber < tableBucketCount; tableBucketNumber += bucketCount) {
// table bucket number: this is used for evaluating "$bucket" filters.
if (bucketSplitInfo.isTableBucketEnabled(tableBucketNumber)) {
-containsEligibleTableBucket = true;
+eligibleTableBucketNumbers.add(tableBucketNumber);
}
else {
containsIneligibleTableBucket = true;
}
}

-if (containsEligibleTableBucket && containsIneligibleTableBucket) {
+if (!eligibleTableBucketNumbers.isEmpty() && containsIneligibleTableBucket) {
throw new TrinoException(
NOT_SUPPORTED,
"The bucket filter cannot be satisfied. There are restrictions on the bucket filter when all the following is true: " +
@@ -862,12 +865,13 @@ private List<InternalHiveSplit> getBucketedSplits(
"(table name: " + table.getTableName() + ", table bucket count: " + tableBucketCount + ", " +
"partition bucket count: " + partitionBucketCount + ", effective reading bucket count: " + readBucketCount + ")");
}
-if (containsEligibleTableBucket) {
+if (!eligibleTableBucketNumbers.isEmpty()) {
for (LocatedFileStatus file : bucketFiles.get(partitionBucketNumber)) {
// OrcDeletedRows will load only delete delta files matching current bucket id,
// so we can pass all delete delta locations here, without filtering.
-splitFactory.createInternalHiveSplit(file, OptionalInt.of(readBucketNumber), splittable, acidInfo)
-.ifPresent(splitList::add);
+eligibleTableBucketNumbers.stream()
+.map(tableBucketNumber -> splitFactory.createInternalHiveSplit(file, OptionalInt.of(readBucketNumber), OptionalInt.of(tableBucketNumber), splittable, acidInfo))
+.forEach(optionalSplit -> optionalSplit.ifPresent(splitList::add));
}
}
}
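
For reference, the bucket-number arithmetic the loop above relies on can be sketched in isolation. This standalone example (hypothetical counts and class name, not connector code) prints, for each physical partition bucket, the read bucket the resulting splits are scheduled under and the table bucket numbers whose rows the file may contain — the numbers a "$bucket" filter is evaluated against, with one split now created per eligible table bucket:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: hypothetical counts and names, not connector code.
    public final class BucketNumberSketch
    {
        private BucketNumberSketch() {}

        public static void main(String[] args)
        {
            int tableBucketCount = 8;      // buckets declared on the table
            int partitionBucketCount = 4;  // buckets this partition was written with
            int readBucketCount = 4;       // effective bucket count the engine reads
            int bucketCount = Math.max(readBucketCount, partitionBucketCount);

            for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) {
                int partitionBucketNumber = bucketNumber % partitionBucketCount; // selects the file
                int readBucketNumber = bucketNumber % readBucketCount;           // engine-facing bucket

                // Table buckets whose rows may appear in this file.
                List<Integer> tableBucketNumbers = new ArrayList<>();
                for (int tableBucketNumber = bucketNumber % tableBucketCount;
                        tableBucketNumber < tableBucketCount;
                        tableBucketNumber += bucketCount) {
                    tableBucketNumbers.add(tableBucketNumber);
                }

                System.out.printf("partition bucket %d -> read bucket %d, table buckets %s%n",
                        partitionBucketNumber, readBucketNumber, tableBucketNumbers);
            }
        }
    }

With a partition bucket count of 4 against a table bucket count of 8, each file covers two table buckets (for example 0 and 4), which is why the loop now emits one split per eligible table bucket number rather than a single split per file.
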
@@ -991,11 +995,6 @@ public static Optional<BucketSplitInfo> createBucketSplitInfo(Optional<HiveBucke
int tableBucketCount = bucketHandle.get().getTableBucketCount();
int readBucketCount = bucketHandle.get().getReadBucketCount();

-if (tableBucketCount != readBucketCount && bucketFilter.isPresent()) {
-// TODO: remove when supported
-throw new TrinoException(NOT_SUPPORTED, "Filter on \"$bucket\" is not supported when the table has partitions with different bucket counts");
-}
-
List<HiveColumnHandle> bucketColumns = bucketHandle.get().getColumns();
IntPredicate predicate = bucketFilter
.<IntPredicate>map(filter -> filter.getBucketsToKeep()::contains)
@@ -122,7 +122,7 @@ public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
ConnectorSession session,
ConnectorPartitioningHandle partitioningHandle)
{
-return value -> ((HiveSplit) value).getBucketNumber()
+return value -> ((HiveSplit) value).getReadBucketNumber()
.orElseThrow(() -> new IllegalArgumentException("Bucket number not set in split"));
}

@@ -177,7 +177,7 @@ public ConnectorPageSource createPageSource(
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
path,
-hiveSplit.getBucketNumber(),
+hiveSplit.getTableBucketNumber(),
hiveSplit.getEstimatedFileSize(),
hiveSplit.getFileModifiedTime());

@@ -198,7 +198,7 @@ public ConnectorPageSource createPageSource(
configuration,
session,
path,
-hiveSplit.getBucketNumber(),
+hiveSplit.getTableBucketNumber(),
hiveSplit.getStart(),
hiveSplit.getLength(),
hiveSplit.getEstimatedFileSize(),
@@ -235,7 +235,7 @@ public ConnectorPageSource createPageSource(
hiveSplit.getStatementId(),
source,
typeManager,
-hiveSplit.getBucketNumber(),
+hiveSplit.getTableBucketNumber(),
path,
originalFile,
orcFileWriterFactory.get(),
@@ -259,7 +259,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Configuration configuration,
ConnectorSession session,
Path path,
-OptionalInt bucketNumber,
+OptionalInt tableBucketNumber,
long start,
long length,
long estimatedFileSize,
@@ -281,8 +281,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(

List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);

-Optional<BucketAdaptation> bucketAdaptation = createBucketAdaptation(bucketConversion, bucketNumber, regularAndInterimColumnMappings);
-Optional<BucketValidator> bucketValidator = createBucketValidator(path, bucketValidation, bucketNumber, regularAndInterimColumnMappings);
+Optional<BucketAdaptation> bucketAdaptation = createBucketAdaptation(bucketConversion, tableBucketNumber, regularAndInterimColumnMappings);
+Optional<BucketValidator> bucketValidator = createBucketValidator(path, bucketValidation, tableBucketNumber, regularAndInterimColumnMappings);

for (HivePageSourceFactory pageSourceFactory : pageSourceFactories) {
List<HiveColumnHandle> desiredColumns = toColumnHandles(regularAndInterimColumnMappings, true, typeManager);
@@ -298,7 +298,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
desiredColumns,
effectivePredicate,
acidInfo,
-bucketNumber,
+tableBucketNumber,
originalFile,
transaction);

@@ -386,11 +386,11 @@ public static Optional<ConnectorPageSource> createHivePageSource(

private static boolean shouldSkipBucket(HiveTableHandle hiveTable, HiveSplit hiveSplit, DynamicFilter dynamicFilter)
{
-if (hiveSplit.getBucketNumber().isEmpty()) {
+if (hiveSplit.getTableBucketNumber().isEmpty()) {
return false;
}
Optional<HiveBucketFilter> hiveBucketFilter = getHiveBucketFilter(hiveTable, dynamicFilter.getCurrentPredicate());
-return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getBucketNumber().getAsInt())).orElse(false);
+return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getTableBucketNumber().getAsInt())).orElse(false);
}

private static boolean shouldSkipSplit(List<ColumnMapping> columnMappings, DynamicFilter dynamicFilter)
@@ -52,7 +52,8 @@ public class HiveSplit
private final String database;
private final String table;
private final String partitionName;
-private final OptionalInt bucketNumber;
+private final OptionalInt readBucketNumber;
+private final OptionalInt tableBucketNumber;
private final int statementId;
private final boolean forceLocalScheduling;
private final TableToPartitionMapping tableToPartitionMapping;
@@ -76,7 +77,8 @@ public HiveSplit(
@JsonProperty("schema") Properties schema,
@JsonProperty("partitionKeys") List<HivePartitionKey> partitionKeys,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("readBucketNumber") OptionalInt readBucketNumber,
@JsonProperty("tableBucketNumber") OptionalInt tableBucketNumber,
@JsonProperty("statementId") int statementId,
@JsonProperty("forceLocalScheduling") boolean forceLocalScheduling,
@JsonProperty("tableToPartitionMapping") TableToPartitionMapping tableToPartitionMapping,
@@ -97,7 +99,8 @@ public HiveSplit(
requireNonNull(schema, "schema is null");
requireNonNull(partitionKeys, "partitionKeys is null");
requireNonNull(addresses, "addresses is null");
-requireNonNull(bucketNumber, "bucketNumber is null");
+requireNonNull(readBucketNumber, "readBucketNumber is null");
+requireNonNull(tableBucketNumber, "tableBucketNumber is null");
requireNonNull(tableToPartitionMapping, "tableToPartitionMapping is null");
requireNonNull(bucketConversion, "bucketConversion is null");
requireNonNull(bucketValidation, "bucketValidation is null");
@@ -114,7 +117,8 @@ public HiveSplit(
this.schema = schema;
this.partitionKeys = ImmutableList.copyOf(partitionKeys);
this.addresses = ImmutableList.copyOf(addresses);
-this.bucketNumber = bucketNumber;
+this.readBucketNumber = readBucketNumber;
+this.tableBucketNumber = tableBucketNumber;
this.statementId = statementId;
this.forceLocalScheduling = forceLocalScheduling;
this.tableToPartitionMapping = tableToPartitionMapping;
@@ -194,9 +198,15 @@ public List<HostAddress> getAddresses()
}

@JsonProperty
-public OptionalInt getBucketNumber()
+public OptionalInt getReadBucketNumber()
{
-return bucketNumber;
+return readBucketNumber;
}
+
+@JsonProperty
+public OptionalInt getTableBucketNumber()
+{
+return tableBucketNumber;
+}

@JsonProperty
@@ -271,7 +281,8 @@ public long getRetainedSizeInBytes()
+ estimatedSizeOf(database)
+ estimatedSizeOf(table)
+ estimatedSizeOf(partitionName)
-+ sizeOf(bucketNumber)
++ sizeOf(readBucketNumber)
++ sizeOf(tableBucketNumber)
+ tableToPartitionMapping.getEstimatedSizeInBytes()
+ sizeOf(bucketConversion, BucketConversion::getRetainedSizeInBytes)
+ sizeOf(bucketValidation, BucketValidation::getRetainedSizeInBytes)
@@ -317,7 +328,7 @@ public static class BucketConversion
private final int tableBucketCount;
private final int partitionBucketCount;
private final List<HiveColumnHandle> bucketColumnNames;
-// bucketNumber is needed, but can be found in bucketNumber field of HiveSplit.
+// tableBucketNumber is needed, but can be found in tableBucketNumber field of HiveSplit.

@JsonCreator
public BucketConversion(
@@ -219,6 +219,14 @@ public ConnectorSplitSource getSplits(
// sort partitions
partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions);

+if (bucketHandle.isPresent()) {
+if (bucketHandle.get().getReadBucketCount() > bucketHandle.get().getTableBucketCount()) {
+throw new TrinoException(
+GENERIC_INTERNAL_ERROR,
+"readBucketCount (%s) is greater than the tableBucketCount (%s) which generally points to an issue in plan generation");
+}
+}
+
Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(session, metastore, table, tableName, partitions, bucketHandle.map(HiveBucketHandle::toTableBucketProperty));

// Only one thread per partition is usable when a table is not transactional
@@ -299,7 +299,7 @@ ListenableFuture<Void> addToQueue(InternalHiveSplit split)
databaseName, tableName, succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount()));
}
bufferedInternalSplitCount.incrementAndGet();
-OptionalInt bucketNumber = split.getBucketNumber();
+OptionalInt bucketNumber = split.getReadBucketNumber();
return queues.offer(bucketNumber, split);
}

@@ -400,7 +400,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getSchema(),
internalSplit.getPartitionKeys(),
block.getAddresses(),
-internalSplit.getBucketNumber(),
+internalSplit.getReadBucketNumber(),
+internalSplit.getTableBucketNumber(),
internalSplit.getStatementId(),
internalSplit.isForceLocalScheduling(),
internalSplit.getTableToPartitionMapping(),