Support arbitrary types in partitioning columns (deephaven#885)
rcaudy authored and jmao-denver committed Nov 23, 2021
1 parent 764c41c commit e0f2166
Showing 53 changed files with 1,069 additions and 387 deletions.
@@ -32,10 +32,12 @@
 import io.deephaven.db.v2.sources.regioned.RegionedTableComponentFactoryImpl;
 import io.deephaven.internal.log.LoggerFactory;
 import io.deephaven.util.annotations.VisibleForTesting;
+import io.deephaven.util.type.TypeUtils;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.jetbrains.annotations.NotNull;
 
 import static io.deephaven.db.v2.parquet.ParquetTableWriter.PARQUET_FILE_EXTENSION;
+import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;
 
 import java.io.File;
 import java.io.IOException;
@@ -433,7 +435,8 @@ public static Table readMultiFileTable(
             if (partitionValue == null) {
                 throw new IllegalArgumentException("First location key " + firstKey + " has null partition value at partition key " + partitionKey);
             }
-            allColumns.add(ColumnDefinition.fromGenericType(partitionKey, partitionValue.getClass(), ColumnDefinition.COLUMNTYPE_PARTITIONING, null));
+            //noinspection unchecked
+            allColumns.add(ColumnDefinition.fromGenericType(partitionKey, getUnboxedTypeIfBoxed(partitionValue.getClass()), ColumnDefinition.COLUMNTYPE_PARTITIONING, null));
         }
         allColumns.addAll(schemaInfo.getFirst());
         return readMultiFileTable(recordingLocationKeyFinder, schemaInfo.getSecond(), new TableDefinition(allColumns));
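
The change above swaps the raw partitionValue.getClass() for its unboxed equivalent: a partition value recovered from a location key is always a boxed object (an Integer, say), while column definitions conventionally carry primitive types. A minimal sketch of the idea behind the getUnboxedTypeIfBoxed call; this is illustrative only, and the real mapping lives in io.deephaven.util.type.TypeUtils:

    import java.util.Map;

    public class UnboxDemo {
        // Illustrative stand-in for TypeUtils.getUnboxedTypeIfBoxed; an assumption
        // about its contract, not a copy of its implementation.
        private static final Map<Class<?>, Class<?>> BOXED_TO_PRIMITIVE = Map.of(
                Integer.class, int.class,
                Long.class, long.class,
                Double.class, double.class,
                Float.class, float.class,
                Short.class, short.class,
                Byte.class, byte.class,
                Character.class, char.class,
                Boolean.class, boolean.class);

        static Class<?> getUnboxedTypeIfBoxed(final Class<?> type) {
            return BOXED_TO_PRIMITIVE.getOrDefault(type, type);
        }

        public static void main(String[] args) {
            final Object partitionValue = 2021; // autoboxed to Integer
            System.out.println(getUnboxedTypeIfBoxed(partitionValue.getClass())); // int
            System.out.println(getUnboxedTypeIfBoxed(String.class)); // class java.lang.String
        }
    }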
16 changes: 10 additions & 6 deletions DB/src/main/java/io/deephaven/db/v2/PartitionAwareSourceTable.java
@@ -18,10 +18,11 @@
 import io.deephaven.db.v2.select.*;
 import io.deephaven.db.v2.sources.ArrayBackedColumnSource;
 import io.deephaven.db.v2.sources.ColumnSource;
+import io.deephaven.db.v2.sources.WritableSource;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import java.lang.reflect.Array;
 import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -194,10 +195,13 @@ protected final Table redefine(TableDefinition newDefinitionExternal, TableDefin
@SuppressWarnings("unchecked")
private static <T> ColumnSource makePartitionSource(@NotNull final ColumnDefinition<T> columnDefinition, @NotNull final Collection<ImmutableTableLocationKey> locationKeys) {
final Class<T> dataType = columnDefinition.getDataType();
final T[] partitionValues = locationKeys.stream()
.map(lk -> (T) lk.getPartitionValue(columnDefinition.getName()))
.toArray(sz -> (T[]) Array.newInstance(dataType, sz));
return ArrayBackedColumnSource.getMemoryColumnSource(partitionValues, dataType, columnDefinition.getComponentType());
final String partitionKey = columnDefinition.getName();
final WritableSource<T> result = ArrayBackedColumnSource.getMemoryColumnSource(locationKeys.size(), dataType, null);
final MutableLong nextIndex = new MutableLong(0L);
locationKeys.stream()
.map(lk -> (T) lk.getPartitionValue(partitionKey))
.forEach((final T partitionValue) -> result.set(nextIndex.getAndIncrement(), partitionValue));
return result;
}
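
This rewrite is the heart of the arbitrary-type support in this file. The old code collected partition values into a generic T[] created via Array.newInstance(dataType, sz); once dataType can be a primitive class such as int.class, that reflective call returns an int[], which is not an Object[] and cannot satisfy the T[] cast. Filling a WritableSource element by element sidesteps generic arrays entirely. A self-contained sketch of the failure mode:

    import java.lang.reflect.Array;

    public class PrimitiveArrayDemo {
        public static void main(String[] args) {
            // Array.newInstance with a primitive class yields a primitive array...
            final Object ints = Array.newInstance(int.class, 3);
            System.out.println(ints.getClass().getSimpleName()); // int[]
            System.out.println(ints instanceof Object[]); // false

            // ...so the old-style generic cast would fail at runtime:
            // final Integer[] boxed = (Integer[]) ints; // ClassCastException
        }
    }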

@Override
@@ -261,7 +265,7 @@ public final Table where(SelectFilter... filters) {
             return deferredViewTable;
         }
 
-        SelectFilter[] partitionFilterArray = partitionFilters.toArray(new SelectFilter[partitionFilters.size()]);
+        SelectFilter[] partitionFilterArray = partitionFilters.toArray(SelectFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY);
         final String filteredTableDescription = "getFilteredTable(" + Arrays.toString(partitionFilterArray) + ")";
         SourceTable filteredTable = QueryPerformanceRecorder.withNugget(filteredTableDescription, () -> getFilteredTable(partitionFilterArray));

@@ -23,5 +23,5 @@ public interface TableLocationFactory<TK extends TableKey, TLK extends TableLoca
      * {@link TableLocationProvider#supportsSubscriptions() supports subscriptions}
      * @return A new or cached {@link TableLocation} identified by the supplied {@link TableKey} and {@link TableLocationKey}
      */
-    TableLocation makeLocation(@NotNull final TK tableKey, @NotNull TLK locationKey, @Nullable TableDataRefreshService refreshService);
+    TableLocation makeLocation(@NotNull TK tableKey, @NotNull TLK locationKey, @Nullable TableDataRefreshService refreshService);
 }
@@ -78,7 +78,7 @@ public final void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
         boolean needToUpdateInternalPartitionValue = true;
         try (final DirectoryStream<Path> columnPartitionStream = Files.newDirectoryStream(internalPartition, Files::isDirectory)) {
             for (final Path columnPartition : columnPartitionStream) {
-                partitions.put(columnPartitionKey, columnPartition.getFileName().toFile());
+                partitions.put(columnPartitionKey, columnPartition.getFileName().toString());
                 if (needToUpdateInternalPartitionValue) {
                     // Partition order dictates comparison priority, so we need to insert the internal partition after the column partition.
                     partitions.put(INTERNAL_PARTITION_KEY, internalPartitionValue);
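
The toFile()-to-toString() switch above means a column partition value read from disk is now the bare directory name, a String, rather than a java.io.File handle, which is the natural starting point once partition values can be coerced to richer types. A small sketch with a hypothetical directory layout (the path below is invented for illustration):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class PartitionNameDemo {
        public static void main(String[] args) {
            // Hypothetical layout: <table root>/<internal partition>/<column partition>
            final Path columnPartition = Paths.get("/data/myTable/IP0/2021-11-23");
            // Before this commit: getFileName().toFile() stored a File as the value.
            // After: just the directory's name, as a String.
            final String partitionValue = columnPartition.getFileName().toString();
            System.out.println(partitionValue); // 2021-11-23
        }
    }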
58 changes: 33 additions & 25 deletions DB/src/main/java/io/deephaven/db/v2/sources/chunk/page/Page.java
@@ -1,8 +1,8 @@
 package io.deephaven.db.v2.sources.chunk.page;
 
 import io.deephaven.db.util.LongSizedDataStructure;
+import io.deephaven.db.v2.sources.chunk.Attributes.Any;
 import io.deephaven.db.v2.sources.chunk.ChunkSource;
-import io.deephaven.db.v2.sources.chunk.Attributes;
 import io.deephaven.db.v2.sources.chunk.DefaultChunkSource;
 import io.deephaven.db.v2.sources.chunk.WritableChunk;
 import io.deephaven.db.v2.utils.OrderedKeys;
@@ -12,27 +12,27 @@
 /**
  * This provides the {@link ChunkSource} interface to a contiguous block of data from
  * the range [{@link #firstRowOffset()},{@link #firstRowOffset()} + {@link #length()}).
- *
+ * <p>
  * Non-overlapping pages can be collected together in a {@link PageStore}, which provides the {@link ChunkSource}
  * interface to the collection of all of its Pages.
- *
+ * <p>
  * There are two distinct use cases/types of pages. The first use case is {@code Page}s which always have
  * length() > 0. These store length() values, which can be accessed via the {@link ChunkSource} methods.
  * Valid {@link OrderedKeys} passed to those methods will have their offset in the range
  * [firstRowOffset(), firstRowOffset() + length()). Passing OrderedKeys with offsets outside of this range will have
  * undefined results.
- *
+ * <p>
  * The second use case will always have length() == 0 and firstRowOffset() == 0. These represent "Null" regions,
  * which return a fixed value, typically a null value, for every {@link OrderedKeys} passed into the
  * {@link ChunkSource} methods. To implement this use case, override {@code length} and override {@code lastRow}
  * as {@code maxRow}.
- *
+ * <p>
  * Though the {@link ChunkSource} methods ignore the non-offset portion of the rows in the {@link OrderedKeys},
  * they can assume it is identical for all the passed-in elements of the {@link OrderedKeys}. For instance,
  * they can use the simple difference between the complete row values to determine a length.
  */
 
-public interface Page<ATTR extends Attributes.Any> extends PagingChunkSource<ATTR> {
+public interface Page<ATTR extends Any> extends PagingChunkSource<ATTR> {

    /**
     * @return the first row of this page, after applying the {@link #mask()}, which refers to the first row of this
@@ -45,7 +45,7 @@ public interface Page<ATTR extends Attributes.Any> extends PagingChunkSource<ATT
      * @return the first row of this page, located in the same way as row.
      */
     @FinalDefault
-    default long firstRow(long row) {
+    default long firstRow(final long row) {
         final long m = mask();
         return (row & ~m) | firstRowOffset();
     }
@@ -54,7 +54,7 @@ default long firstRow(long row) {
      * @param row Any row contained on this page.
      * @return the last row of this page, located in the same way as row.
      */
-    default long lastRow(long row) {
+    default long lastRow(final long row) {
         long l = length();
         long m = mask();
 
@@ -74,15 +74,20 @@ default long getRowOffset(long row) {
         return (row & mask()) - firstRowOffset();
     }
 
-    interface WithDefaults<ATTR extends Attributes.Any> extends Page<ATTR>, DefaultChunkSource<ATTR> {
+    /**
+     * Helper defaults for general pages.
+     */
+    interface WithDefaults<ATTR extends Any> extends Page<ATTR>, DefaultChunkSource<ATTR> {
 
-        @Override @FinalDefault
-        default void fillChunkAppend(@NotNull FillContext context, @NotNull WritableChunk<? super ATTR> destination, @NotNull OrderedKeys.Iterator orderedKeysIterator) {
-            fillChunkAppend(context, destination, orderedKeysIterator.getNextOrderedKeysThrough(maxRow(orderedKeysIterator.peekNextKey())));
+        @Override
+        @FinalDefault
+        default void fillChunkAppend(@NotNull final FillContext context, @NotNull final WritableChunk<? super ATTR> destination, @NotNull final OrderedKeys.Iterator orderedKeysIterator) {
+            fillChunkAppend(context, destination, orderedKeysIterator.getNextOrderedKeysThrough(maxRow(orderedKeysIterator.peekNextKey())));
         }
 
-        @Override @FinalDefault
-        default void fillChunk(@NotNull FillContext context, @NotNull WritableChunk<? super ATTR> destination, @NotNull OrderedKeys orderedKeys) {
+        @Override
+        @FinalDefault
+        default void fillChunk(@NotNull final FillContext context, @NotNull final WritableChunk<? super ATTR> destination, @NotNull final OrderedKeys orderedKeys) {
             destination.setSize(0);
             fillChunkAppend(context, destination, orderedKeys);
         }
@@ -95,29 +100,32 @@ default void fillChunk(@NotNull FillContext context, @NotNull WritableChunk<? su
     }
 
     /**
-     * This has helper defaults for columns that just represent a repeating value (such as null or partition columns).
+     * Helper defaults for pages that represent a repeating value, e.g. null or partitioning column regions.
      */
-    interface WithDefaultsForRepeatingValues<ATTR extends Attributes.Any> extends Page<ATTR>, DefaultChunkSource<ATTR> {
+    interface WithDefaultsForRepeatingValues<ATTR extends Any> extends Page<ATTR>, DefaultChunkSource<ATTR> {
 
-        @Override @FinalDefault
+        @Override
+        @FinalDefault
         default long length() {
             return 0;
         }
 
-        @Override @FinalDefault
-        default long lastRow(long row) {
+        @Override
+        @FinalDefault
+        default long lastRow(final long row) {
             return maxRow(row);
         }
 
-        @Override @FinalDefault
-        default void fillChunkAppend(@NotNull FillContext context, @NotNull WritableChunk<? super ATTR> destination, @NotNull OrderedKeys.Iterator orderedKeysIterator) {
+        @Override
+        @FinalDefault
+        default void fillChunkAppend(@NotNull final FillContext context, @NotNull final WritableChunk<? super ATTR> destination, @NotNull final OrderedKeys.Iterator orderedKeysIterator) {
             fillChunkAppend(context, destination, LongSizedDataStructure.intSize("fillChunkAppend",
-                orderedKeysIterator.advanceAndGetPositionDistance(maxRow(orderedKeysIterator.peekNextKey()))));
+                    orderedKeysIterator.advanceAndGetPositionDistance(maxRow(orderedKeysIterator.peekNextKey()))));
         }
 
-        @Override @FinalDefault
-        default void fillChunk(@NotNull FillContext context, @NotNull WritableChunk<? super ATTR> destination, @NotNull OrderedKeys orderedKeys) {
+        @Override
+        @FinalDefault
+        default void fillChunk(@NotNull final FillContext context, @NotNull final WritableChunk<? super ATTR> destination, @NotNull final OrderedKeys orderedKeys) {
             destination.setSize(0);
             fillChunkAppend(context, destination, orderedKeys.intSize());
         }
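
The addressing defaults in this interface are pure bit arithmetic: firstRow keeps the page-identifying high bits of a row key and substitutes the in-page offset, while getRowOffset masks the high bits away. A worked example with concrete numbers, assuming an illustrative 16-bit mask (the real mask comes from the page or region implementation):

    public class PageAddressingDemo {
        public static void main(String[] args) {
            final long mask = 0xFFFFL; // illustrative 16-bit page mask
            final long firstRowOffset = 0L;
            final long row = 0x30000L | 42L; // row 42 of the page whose prefix is 0x3

            // firstRow(row): keep the non-offset bits, substitute firstRowOffset()
            final long firstRow = (row & ~mask) | firstRowOffset;
            System.out.println(Long.toHexString(firstRow)); // 30000

            // getRowOffset(row): position within the page
            final long rowOffset = (row & mask) - firstRowOffset;
            System.out.println(rowOffset); // 42
        }
    }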
@@ -1,37 +1,40 @@
 package io.deephaven.db.v2.sources.regioned;
 
 import io.deephaven.db.v2.sources.Releasable;
-import io.deephaven.db.v2.sources.chunk.Attributes;
+import io.deephaven.db.v2.sources.chunk.Attributes.Any;
 import io.deephaven.db.v2.sources.chunk.WritableChunk;
 import io.deephaven.db.v2.sources.chunk.page.Page;
 import io.deephaven.util.annotations.FinalDefault;
 import org.jetbrains.annotations.NotNull;
 
-public interface ColumnRegion<ATTR extends Attributes.Any> extends Page<ATTR>, Releasable {
+public interface ColumnRegion<ATTR extends Any> extends Page<ATTR>, Releasable {
 
     long REGION_MASK = RegionedPageStore.REGION_MASK;
 
-    @Override @FinalDefault
+    @Override
+    @FinalDefault
     default long mask() {
         return REGION_MASK;
     }
 
-    @Override @FinalDefault
+    @Override
+    @FinalDefault
     default long firstRowOffset() {
         return 0;
     }
 
-    abstract class Null<ATTR extends Attributes.Any>
+    abstract class Null<ATTR extends Any>
             implements ColumnRegion<ATTR>, WithDefaultsForRepeatingValues<ATTR> {
 
-        Null() {}
+        Null() {
+        }
 
         @Override
-        public void fillChunkAppend(@NotNull FillContext context, @NotNull WritableChunk<? super ATTR> destination, int length) {
-            int offset = destination.size();
+        public void fillChunkAppend(@NotNull final FillContext context, @NotNull final WritableChunk<? super ATTR> destination, final int length) {
+            final int offset = destination.size();
 
             destination.fillWithNullValue(offset, length);
             destination.setSize(offset + length);
         }
     }
 
 }
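
Note the append contract that ColumnRegion.Null.fillChunkAppend follows, and that the byte-region Constant further down repeats: writes start at the destination chunk's current size and the size is then advanced, so successive appends concatenate. The same bookkeeping against a plain array, as a stand-in for a WritableChunk:

    import java.util.Arrays;

    public class AppendFillDemo {
        public static void main(String[] args) {
            // Stand-in for a WritableChunk: a backing array plus a logical size.
            final byte[] data = new byte[8];
            int size = 3; // destination.size(): three values already present

            // fillChunkAppend(..., length = 4) writes after the existing values...
            final int offset = size; // destination.size()
            Arrays.fill(data, offset, offset + 4, (byte) 7); // fillWithValue / fillWithNullValue
            size = offset + 4; // destination.setSize(offset + length)

            System.out.println(size); // 7
            System.out.println(Arrays.toString(data)); // [0, 0, 0, 7, 7, 7, 7, 0]
        }
    }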
@@ -1,15 +1,16 @@
 package io.deephaven.db.v2.sources.regioned;
 
+import io.deephaven.db.v2.sources.chunk.Attributes.Any;
+import io.deephaven.db.v2.sources.chunk.WritableChunk;
 import io.deephaven.util.QueryConstants;
-import io.deephaven.db.v2.sources.chunk.Attributes;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.Arrays;
 
 /**
  * Column region interface for regions that support fetching primitive bytes.
  */
-public interface ColumnRegionByte<ATTR extends Attributes.Any> extends ColumnRegion<ATTR> {
+public interface ColumnRegionByte<ATTR extends Any> extends ColumnRegion<ATTR> {
 
     /**
      * Get a single byte from this region.
@@ -27,7 +28,7 @@ public interface ColumnRegionByte<ATTR extends Attributes.Any> extends ColumnReg
      * @param elementIndex Element (byte) index in the table's address space
      * @return The byte value at the specified element (byte) index
      */
-    default byte getByte(@NotNull FillContext context, long elementIndex) {
+    default byte getByte(@NotNull final FillContext context, final long elementIndex) {
         return getByte(elementIndex);
     }
 
@@ -52,26 +53,54 @@ default Class<?> getNativeType() {
         return byte.class;
     }
 
-    static <ATTR extends Attributes.Any> ColumnRegionByte.Null<ATTR> createNull() {
+    static <ATTR extends Any> ColumnRegionByte.Null<ATTR> createNull() {
         //noinspection unchecked
         return Null.INSTANCE;
     }
 
-    final class Null<ATTR extends Attributes.Any> extends ColumnRegion.Null<ATTR> implements ColumnRegionByte<ATTR> {
+    final class Null<ATTR extends Any> extends ColumnRegion.Null<ATTR> implements ColumnRegionByte<ATTR> {
         @SuppressWarnings("rawtypes")
         private static final ColumnRegionByte.Null INSTANCE = new ColumnRegionByte.Null();
 
-        private Null() {}
+        private Null() {
+        }
 
         @Override
-        public byte getByte(long elementIndex) {
+        public byte getByte(final long elementIndex) {
             return QueryConstants.NULL_BYTE;
         }
 
         @Override
-        public byte[] getBytes(long firstElementIndex, @NotNull byte[] destination, int destinationOffset, int length) {
+        public byte[] getBytes(final long firstElementIndex, @NotNull final byte[] destination, final int destinationOffset, final int length) {
             Arrays.fill(destination, destinationOffset, destinationOffset + length, QueryConstants.NULL_BYTE);
             return destination;
         }
     }
 
+    final class Constant<ATTR extends Any> implements ColumnRegionByte<ATTR>, WithDefaultsForRepeatingValues<ATTR> {
+
+        private final byte value;
+
+        public Constant(final byte value) {
+            this.value = value;
+        }
+
+        @Override
+        public byte getByte(final long elementIndex) {
+            return value;
+        }
+
+        @Override
+        public void fillChunkAppend(@NotNull final FillContext context, @NotNull final WritableChunk<? super ATTR> destination, final int length) {
+            final int offset = destination.size();
+            destination.asWritableByteChunk().fillWithValue(offset, length, value);
+            destination.setSize(offset + length);
+        }
+
+        @Override
+        public byte[] getBytes(final long firstElementIndex, @NotNull final byte[] destination, final int destinationOffset, final int length) {
+            Arrays.fill(destination, destinationOffset, destinationOffset + length, value);
+            return destination;
+        }
+    }
 }
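
The new Constant region is what lets a partitioning column avoid per-row storage: every row sourced from a given location shares that location's partition value, so the region holds a single byte. A usage sketch compiled against the classes in this commit (the Any type argument is simply the interface's attribute bound):

    import io.deephaven.db.v2.sources.chunk.Attributes.Any;

    public class ConstantRegionDemo {
        public static void main(String[] args) {
            // One shared value stands in for every row of the region.
            final ColumnRegionByte.Constant<Any> region = new ColumnRegionByte.Constant<>((byte) 5);
            System.out.println(region.getByte(0L)); // 5
            System.out.println(region.getByte(123_456L)); // 5

            // Bulk reads fill from the same constant:
            final byte[] dest = region.getBytes(0L, new byte[4], 0, 4);
            System.out.println(java.util.Arrays.toString(dest)); // [5, 5, 5, 5]
        }
    }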
