Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow Encoding for LocalDate and LocalTime #5446

Merged
merged 4 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extensions/barrage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':Base')
implementation project(':Util')
implementation project(':Configuration')
implementation project(':util-function')
implementation depCommonsLang3

api project(':engine-table')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
// @formatter:off
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToByteFunction;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableByteChunk;
Expand All @@ -27,6 +30,8 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand All @@ -35,12 +40,19 @@ public class ByteChunkInputStreamGenerator extends BaseChunkInputStreamGenerator

public static ByteChunkInputStreamGenerator convertBoxed(
        final ObjectChunk<Byte, Values> inChunk, final long rowOffset) {
    // Delegate to the generic transform path, unboxing each Byte element
    // (a null box becomes the Deephaven null byte sentinel via TypeUtils.unbox).
    return convertWithTransform(inChunk, rowOffset, (final Byte boxed) -> TypeUtils.unbox(boxed));
}

public static <T> ByteChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToByteFunction<T> transform) {
// This code path is utilized for arrays and vectors of DateTimes, LocalDate, and LocalTime, which cannot be
// reinterpreted.
WritableByteChunk<Values> outChunk = WritableByteChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
final Byte value = inChunk.get(i);
outChunk.set(i, TypeUtils.unbox(value));
T value = inChunk.get(i);
outChunk.set(i, transform.applyAsByte(value));
}
// inChunk is a transfer of ownership to us, but we've converted what we need, so we must close it now
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
Expand Down Expand Up @@ -163,7 +175,7 @@ public interface ByteConversion {
ByteConversion IDENTITY = (byte a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
static WritableByteChunk<Values> extractChunkFromInputStream(
final int elementSize,
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
Expand All @@ -177,7 +189,41 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
        final int elementSize,
        final StreamReaderOptions options,
        final Function<Byte, T> transform,
        final Iterator<FieldNodeInfo> fieldNodeIter,
        final PrimitiveIterator.OfLong bufferInfoIter,
        final DataInput is,
        final WritableChunk<Values> outChunk,
        final int outOffset,
        final int totalRows) throws IOException {

    // First materialize the primitive byte payload into a temporary chunk, then map
    // each element through `transform` into the object-typed destination. The temporary
    // chunk is released by try-with-resources once the copy completes.
    try (final WritableByteChunk<Values> inner = extractChunkFromInputStream(
            elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
        final int numRead = inner.size();

        final WritableObjectChunk<T, Values> result = castOrCreateChunk(
                outChunk,
                Math.max(totalRows, numRead),
                WritableObjectChunk::makeWritableChunk,
                WritableChunk::asWritableObjectChunk);

        if (outChunk == null) {
            // A freshly allocated destination must be written starting at position zero.
            Assert.eqZero(outOffset, "outOffset");
        }

        for (int idx = 0; idx < numRead; ++idx) {
            result.set(outOffset + idx, transform.apply(inner.get(idx)));
        }

        return result;
    }
}

static WritableByteChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
final ByteConversion conversion,
Expand All @@ -192,14 +238,11 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}
final WritableByteChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableByteChunk::makeWritableChunk,
WritableChunk::asWritableByteChunk);

if (nodeInfo.numElements == 0) {
return chunk;
Expand Down Expand Up @@ -244,6 +287,19 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
return chunk;
}

private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    // Reuse the caller-provided chunk when one was supplied; otherwise allocate a
    // new chunk sized (and with size set) to numRows.
    if (outChunk == null) {
        final T created = chunkFactory.apply(numRows);
        created.setSize(numRows);
        return created;
    }
    return castFunction.apply(outChunk);
}

private static void useDeephavenNulls(
final ByteConversion conversion,
final DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToCharFunction;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.WritableCharChunk;
Expand All @@ -23,6 +26,8 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand All @@ -31,12 +36,19 @@ public class CharChunkInputStreamGenerator extends BaseChunkInputStreamGenerator

public static CharChunkInputStreamGenerator convertBoxed(
        final ObjectChunk<Character, Values> inChunk, final long rowOffset) {
    // Delegate to the generic transform path, unboxing each Character element
    // (a null box becomes the Deephaven null char sentinel via TypeUtils.unbox).
    return convertWithTransform(inChunk, rowOffset, (final Character boxed) -> TypeUtils.unbox(boxed));
}

public static <T> CharChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToCharFunction<T> transform) {
// This code path is utilized for arrays and vectors of DateTimes, LocalDate, and LocalTime, which cannot be
// reinterpreted.
WritableCharChunk<Values> outChunk = WritableCharChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
final Character value = inChunk.get(i);
outChunk.set(i, TypeUtils.unbox(value));
T value = inChunk.get(i);
outChunk.set(i, transform.applyAsChar(value));
}
// inChunk is a transfer of ownership to us, but we've converted what we need, so we must close it now
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
Expand Down Expand Up @@ -159,7 +171,7 @@ public interface CharConversion {
CharConversion IDENTITY = (char a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
static WritableCharChunk<Values> extractChunkFromInputStream(
final int elementSize,
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
Expand All @@ -173,7 +185,41 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
final int elementSize,
final StreamReaderOptions options,
final Function<Character, T> transform,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

try (final WritableCharChunk<Values> inner = extractChunkFromInputStream(
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, inner.size()),
WritableObjectChunk::makeWritableChunk,
WritableChunk::asWritableObjectChunk);

if (outChunk == null) {
// if we're not given an output chunk then we better be writing at the front of the new one
Assert.eqZero(outOffset, "outOffset");
}

for (int ii = 0; ii < inner.size(); ++ii) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
char value = inner.get(ii);
chunk.set(outOffset + ii, transform.apply(value));
}

return chunk;
}
}

static WritableCharChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
final CharConversion conversion,
Expand All @@ -188,14 +234,11 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableCharChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableCharChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableCharChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}
final WritableCharChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableCharChunk::makeWritableChunk,
WritableChunk::asWritableCharChunk);

if (nodeInfo.numElements == 0) {
return chunk;
Expand Down Expand Up @@ -240,6 +283,19 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
return chunk;
}

private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    // Reuse the caller-provided chunk when one was supplied; otherwise allocate a
    // new chunk sized (and with size set) to numRows.
    if (outChunk == null) {
        final T created = chunkFactory.apply(numRows);
        created.setSize(numRows);
        return created;
    }
    return castFunction.apply(outChunk);
}

private static void useDeephavenNulls(
final CharConversion conversion,
final DataInput is,
Expand Down
Loading
Loading