Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow Encoding for LocalDate and LocalTime #5446

Merged
merged 4 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// @formatter:off
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
Expand All @@ -17,7 +18,6 @@
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToByteFunction;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableByteChunk;
Expand All @@ -31,6 +31,7 @@
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand All @@ -39,26 +40,19 @@ public class ByteChunkInputStreamGenerator extends BaseChunkInputStreamGenerator

public static ByteChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Byte, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
WritableByteChunk<Values> outChunk = WritableByteChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
final Byte value = inChunk.get(i);
outChunk.set(i, TypeUtils.unbox(value));
}
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new ByteChunkInputStreamGenerator(outChunk, Byte.BYTES, rowOffset);
return convertWithTransform(inChunk, rowOffset, TypeUtils::unbox);
}

public static <T> ByteChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToByteFunction<T> transform) {
// This code path is utilized for LocalDate and LocalTime
// This code path is utilized for arrays and vectors of DateTimes, LocalDate, and LocalTime, which cannot be
// reinterpreted.
WritableByteChunk<Values> outChunk = WritableByteChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
T value = inChunk.get(i);
outChunk.set(i, value == null ? QueryConstants.NULL_BYTE : transform.applyAsByte(value));
outChunk.set(i, transform.applyAsByte(value));
}
// inChunk is a transfer of ownership to us, but we've converted what we need, so we must close it now
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
Expand Down Expand Up @@ -181,7 +175,7 @@ public interface ByteConversion {
ByteConversion IDENTITY = (byte a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
static WritableByteChunk<Values> extractChunkFromInputStream(
final int elementSize,
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
Expand All @@ -207,27 +201,29 @@ static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransfo
final int totalRows) throws IOException {

try (final WritableByteChunk<Values> inner = extractChunkFromInputStream(
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0).asWritableByteChunk()) {
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableObjectChunk();
} else {
final int numRows = Math.max(totalRows, inner.size());
chunk = WritableObjectChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, inner.size()),
WritableObjectChunk::makeWritableChunk,
WritableChunk::asWritableObjectChunk);

if (outChunk == null) {
// if we're not given an output chunk then we better be writing at the front of the new one
Assert.eqZero(outOffset, "outOffset");
}

for (int ii = 0; ii < inner.size(); ++ii) {
byte value = inner.get(ii);
chunk.set(outOffset + ii, value == NULL_BYTE ? null : transform.apply(value));
chunk.set(outOffset + ii, transform.apply(value));
}

return chunk;
}
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
static WritableByteChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
final ByteConversion conversion,
Expand All @@ -242,14 +238,11 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}
final WritableByteChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableByteChunk::makeWritableChunk,
WritableChunk::asWritableByteChunk);

if (nodeInfo.numElements == 0) {
return chunk;
Expand Down Expand Up @@ -294,6 +287,19 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
return chunk;
}

/**
 * Obtain the chunk that results should be written into.
 * <p>
 * When the caller supplied {@code outChunk}, it is narrowed to the requested chunk type via
 * {@code castFunction} and returned as-is. Otherwise a brand-new chunk of capacity
 * {@code numRows} is built with {@code chunkFactory} and its size is set to {@code numRows}
 * before being returned.
 *
 * @param outChunk caller-provided destination chunk, or {@code null} to allocate a new one
 * @param numRows number of rows the (newly created) chunk must hold
 * @param chunkFactory factory used to allocate a fresh chunk when {@code outChunk} is null
 * @param castFunction narrowing cast applied to a caller-provided {@code outChunk}
 * @return a writable chunk of the requested type, ready to receive {@code numRows} rows
 */
private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    if (outChunk == null) {
        // No destination supplied: allocate and pre-size a fresh chunk.
        final T created = chunkFactory.apply(numRows);
        created.setSize(numRows);
        return created;
    }
    // Reuse the caller's chunk; only a type narrowing is needed.
    return castFunction.apply(outChunk);
}

private static void useDeephavenNulls(
final ByteConversion conversion,
final DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
Expand All @@ -13,7 +14,6 @@
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToCharFunction;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.WritableCharChunk;
Expand All @@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand All @@ -35,26 +36,19 @@ public class CharChunkInputStreamGenerator extends BaseChunkInputStreamGenerator

public static CharChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Character, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
WritableCharChunk<Values> outChunk = WritableCharChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
final Character value = inChunk.get(i);
outChunk.set(i, TypeUtils.unbox(value));
}
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new CharChunkInputStreamGenerator(outChunk, Character.BYTES, rowOffset);
return convertWithTransform(inChunk, rowOffset, TypeUtils::unbox);
}

public static <T> CharChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToCharFunction<T> transform) {
// This code path is utilized for LocalDate and LocalTime
// This code path is utilized for arrays and vectors of DateTimes, LocalDate, and LocalTime, which cannot be
// reinterpreted.
WritableCharChunk<Values> outChunk = WritableCharChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
T value = inChunk.get(i);
outChunk.set(i, value == null ? QueryConstants.NULL_CHAR : transform.applyAsChar(value));
outChunk.set(i, transform.applyAsChar(value));
}
// inChunk is a transfer of ownership to us, but we've converted what we need, so we must close it now
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
Expand Down Expand Up @@ -177,7 +171,7 @@ public interface CharConversion {
CharConversion IDENTITY = (char a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
static WritableCharChunk<Values> extractChunkFromInputStream(
final int elementSize,
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
Expand All @@ -203,27 +197,29 @@ static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransfo
final int totalRows) throws IOException {

try (final WritableCharChunk<Values> inner = extractChunkFromInputStream(
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0).asWritableCharChunk()) {
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

final WritableObjectChunk<T, Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableObjectChunk();
} else {
final int numRows = Math.max(totalRows, inner.size());
chunk = WritableObjectChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, inner.size()),
WritableObjectChunk::makeWritableChunk,
WritableChunk::asWritableObjectChunk);

if (outChunk == null) {
// if we're not given an output chunk then we better be writing at the front of the new one
Assert.eqZero(outOffset, "outOffset");
}

for (int ii = 0; ii < inner.size(); ++ii) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
char value = inner.get(ii);
chunk.set(outOffset + ii, value == NULL_CHAR ? null : transform.apply(value));
chunk.set(outOffset + ii, transform.apply(value));
}

return chunk;
}
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
static WritableCharChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
final CharConversion conversion,
Expand All @@ -238,14 +234,11 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableCharChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableCharChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableCharChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}
final WritableCharChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableCharChunk::makeWritableChunk,
WritableChunk::asWritableCharChunk);

if (nodeInfo.numElements == 0) {
return chunk;
Expand Down Expand Up @@ -290,6 +283,19 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
return chunk;
}

/**
 * Resolve the destination chunk for extraction.
 * <p>
 * A non-null {@code outChunk} is simply cast to the concrete chunk type using
 * {@code castFunction}. When no output chunk was provided, {@code chunkFactory} allocates a
 * replacement sized to hold {@code numRows} rows, and its logical size is set accordingly.
 *
 * @param outChunk destination chunk supplied by the caller, may be {@code null}
 * @param numRows row capacity (and logical size) required of a newly allocated chunk
 * @param chunkFactory allocator invoked when {@code outChunk} is {@code null}
 * @param castFunction cast used to narrow a non-null {@code outChunk} to type {@code T}
 * @return the chunk extraction results should be written to
 */
private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    if (outChunk == null) {
        // Nothing to reuse — build a new chunk and mark it as holding numRows rows.
        final T allocated = chunkFactory.apply(numRows);
        allocated.setSize(numRows);
        return allocated;
    }
    // Caller owns the destination; just narrow its static type.
    return castFunction.apply(outChunk);
}

private static void useDeephavenNulls(
final CharConversion conversion,
final DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.ColumnConversionMode;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
Expand All @@ -24,6 +23,7 @@
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.Nullable;

import javax.management.Query;
import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
Expand All @@ -44,6 +44,7 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
final Class<?> componentType,
final Chunk<Values> chunk,
final long rowOffset) {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (chunkType) {
case Boolean:
throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
Expand Down Expand Up @@ -149,18 +150,28 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == LocalDate.class) {
return IntChunkInputStreamGenerator.<LocalDate>convertWithTransform(chunk.asObjectChunk(),
final long msPerDay = 24 * 60 * 60 * 1000L;
final long minDate = QueryConstants.MIN_LONG / msPerDay;
final long maxDate = QueryConstants.MAX_LONG / msPerDay;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
return LongChunkInputStreamGenerator.<LocalDate>convertWithTransform(chunk.asObjectChunk(),
rowOffset, date -> {
if (date == null) {
return QueryConstants.NULL_LONG;
}
final long epochDay = date.toEpochDay();
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
if (epochDay < 0 || epochDay > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Date out of range: " + date);
if (epochDay < minDate || epochDay > maxDate) {
throw new IllegalArgumentException("Date out of range: " + date + " (" + epochDay
+ " not in [" + minDate + ", " + maxDate + "])");
}
return (int) epochDay;
return epochDay * msPerDay;
});
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.<LocalTime>convertWithTransform(chunk.asObjectChunk(),
rowOffset, date -> {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
if (date == null) {
return QueryConstants.NULL_LONG;
}
final long nanoOfDay = date.toNanoOfDay();
if (nanoOfDay < 0) {
throw new IllegalArgumentException("Time out of range: " + date);
Expand Down Expand Up @@ -197,6 +208,7 @@ static WritableChunk<Values> extractChunkFromInputStream(
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (chunkType) {
case Boolean:
throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
Expand Down Expand Up @@ -323,15 +335,16 @@ static WritableChunk<Values> extractChunkFromInputStream(
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == LocalDate.class) {
return IntChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
Integer.BYTES, options,
LocalDate::ofEpochDay,
final long msPerDay = 24 * 60 * 60 * 1000L;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
Long.BYTES, options,
value -> value == QueryConstants.NULL_LONG ? null : LocalDate.ofEpochDay(value / msPerDay),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
Long.BYTES, options,
LocalTime::ofNanoOfDay,
value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == String.class ||
Expand Down
Loading
Loading