
Arrow Encoding for LocalDate and LocalTime #5446

Merged: 4 commits, May 3, 2024
Changes from 1 commit
1 change: 1 addition & 0 deletions extensions/barrage/build.gradle
@@ -14,6 +14,7 @@ dependencies {
implementation project(':Base')
implementation project(':Util')
implementation project(':Configuration')
implementation project(':util-function')
implementation depCommonsLang3

api project(':engine-table')
ByteChunkInputStreamGenerator.java
@@ -9,12 +9,15 @@

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToByteFunction;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableByteChunk;
@@ -27,6 +30,7 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;

import static io.deephaven.util.QueryConstants.*;

@@ -47,6 +51,20 @@ public static ByteChunkInputStreamGenerator convertBoxed(
return new ByteChunkInputStreamGenerator(outChunk, Byte.BYTES, rowOffset);
}

public static <T> ByteChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToByteFunction<T> transform) {
// This code path is utilized for LocalDate and LocalTime
WritableByteChunk<Values> outChunk = WritableByteChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
T value = inChunk.get(i);
outChunk.set(i, value == null ? QueryConstants.NULL_BYTE : transform.applyAsByte(value));
}
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new ByteChunkInputStreamGenerator(outChunk, Byte.BYTES, rowOffset);
}

ByteChunkInputStreamGenerator(final ByteChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}
@@ -177,6 +195,38 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
final int elementSize,
final StreamReaderOptions options,
final Function<Byte, T> transform,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

try (final WritableByteChunk<Values> inner = extractChunkFromInputStream(
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0).asWritableByteChunk()) {

final WritableObjectChunk<T, Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableObjectChunk();
} else {
final int numRows = Math.max(totalRows, inner.size());
chunk = WritableObjectChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

for (int ii = 0; ii < inner.size(); ++ii) {
byte value = inner.get(ii);
chunk.set(outOffset + ii, value == NULL_BYTE ? null : transform.apply(value));
}

return chunk;
}
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
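The convertWithTransform variants added to each primitive generator share one write-side pattern: boxed nulls become the matching QueryConstants sentinel, and every other value goes through the supplied To*Function. A minimal standalone sketch of that pattern, with hypothetical names and plain arrays in place of the chunk types above (not code from this PR):

import java.util.function.ToIntFunction;

// Sketch only, not PR code: the null-sentinel write-side transform used by
// the convertWithTransform methods, shown with arrays instead of chunks.
public final class NullSentinelEncodeSketch {
    // Stand-in for QueryConstants.NULL_INT (Deephaven's int null sentinel).
    private static final int NULL_INT = Integer.MIN_VALUE;

    static <T> int[] encode(final T[] boxed, final ToIntFunction<T> transform) {
        final int[] out = new int[boxed.length];
        for (int i = 0; i < boxed.length; ++i) {
            final T value = boxed[i];
            // Nulls are encoded as the sentinel; everything else is transformed.
            out[i] = value == null ? NULL_INT : transform.applyAsInt(value);
        }
        return out;
    }
}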
CharChunkInputStreamGenerator.java
@@ -5,12 +5,15 @@

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToCharFunction;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.WritableCharChunk;
@@ -23,6 +26,7 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;

import static io.deephaven.util.QueryConstants.*;

@@ -43,6 +47,20 @@ public static CharChunkInputStreamGenerator convertBoxed(
return new CharChunkInputStreamGenerator(outChunk, Character.BYTES, rowOffset);
}

public static <T> CharChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToCharFunction<T> transform) {
// This code path is utilized for LocalDate and LocalTime
WritableCharChunk<Values> outChunk = WritableCharChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
T value = inChunk.get(i);
outChunk.set(i, value == null ? QueryConstants.NULL_CHAR : transform.applyAsChar(value));
}
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new CharChunkInputStreamGenerator(outChunk, Character.BYTES, rowOffset);
}

CharChunkInputStreamGenerator(final CharChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}
@@ -173,6 +191,38 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
final int elementSize,
final StreamReaderOptions options,
final Function<Character, T> transform,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

try (final WritableCharChunk<Values> inner = extractChunkFromInputStream(
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0).asWritableCharChunk()) {

final WritableObjectChunk<T, Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableObjectChunk();
} else {
final int numRows = Math.max(totalRows, inner.size());
chunk = WritableObjectChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

for (int ii = 0; ii < inner.size(); ++ii) {
char value = inner.get(ii);
chunk.set(outOffset + ii, value == NULL_CHAR ? null : transform.apply(value));
}

return chunk;
}
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
@@ -4,9 +4,11 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
@@ -27,6 +29,8 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Iterator;
@@ -144,6 +148,26 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
if (type == Short.class) {
return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == LocalDate.class) {
return IntChunkInputStreamGenerator.<LocalDate>convertWithTransform(chunk.asObjectChunk(),
rowOffset, date -> {
final long epochDay = date.toEpochDay();
if (epochDay < 0 || epochDay > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Date out of range: " + date);
}
return (int) epochDay;
});
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.<LocalTime>convertWithTransform(chunk.asObjectChunk(),
rowOffset, date -> {
final long nanoOfDay = date.toNanoOfDay();
if (nanoOfDay < 0) {
throw new IllegalArgumentException("Time out of range: " + date);
}
return nanoOfDay;
});
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
@@ -298,6 +322,18 @@ static WritableChunk<Values> extractChunkFromInputStream(
Short.BYTES, options, io -> TypeUtils.box(io.readShort()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == LocalDate.class) {
return IntChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
Integer.BYTES, options,
LocalDate::ofEpochDay,
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
Long.BYTES, options,
LocalTime::ofNanoOfDay,
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == String.class ||
options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter,
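Taken together, the factory changes map LocalDate to its epoch-day value in a 32-bit column (with a range check) and LocalTime to its nano-of-day value in a 64-bit column, and invert those mappings on read via LocalDate::ofEpochDay and LocalTime::ofNanoOfDay. A standalone round-trip sketch of just the value mapping (illustration only; the PR itself routes this through the chunk generators):

import java.time.LocalDate;
import java.time.LocalTime;

// Sketch of the wire mapping used above: LocalDate <-> int epoch day,
// LocalTime <-> long nano-of-day. Not code from the PR.
public final class DateTimeEncodingSketch {
    static int encodeDate(final LocalDate date) {
        final long epochDay = date.toEpochDay();
        if (epochDay < 0 || epochDay > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Date out of range: " + date);
        }
        return (int) epochDay; // days since 1970-01-01
    }

    static LocalDate decodeDate(final int epochDay) {
        return LocalDate.ofEpochDay(epochDay);
    }

    static long encodeTime(final LocalTime time) {
        return time.toNanoOfDay(); // nanoseconds since midnight
    }

    static LocalTime decodeTime(final long nanoOfDay) {
        return LocalTime.ofNanoOfDay(nanoOfDay);
    }

    public static void main(final String[] args) {
        final LocalDate d = LocalDate.of(2024, 5, 3);
        final LocalTime t = LocalTime.of(12, 34, 56, 789_000_000);
        System.out.println(decodeDate(encodeDate(d)).equals(d)); // true
        System.out.println(decodeTime(encodeTime(t)).equals(t)); // true
    }
}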
DoubleChunkInputStreamGenerator.java
@@ -9,12 +9,15 @@

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToDoubleFunction;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.DoubleChunk;
import io.deephaven.chunk.WritableDoubleChunk;
@@ -27,6 +30,7 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;

import static io.deephaven.util.QueryConstants.*;

@@ -47,6 +51,20 @@ public static DoubleChunkInputStreamGenerator convertBoxed(
return new DoubleChunkInputStreamGenerator(outChunk, Double.BYTES, rowOffset);
}

public static <T> DoubleChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToDoubleFunction<T> transform) {
// This code path is utilized for LocalDate and LocalTime
WritableDoubleChunk<Values> outChunk = WritableDoubleChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
T value = inChunk.get(i);
outChunk.set(i, value == null ? QueryConstants.NULL_DOUBLE : transform.applyAsDouble(value));
}
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new DoubleChunkInputStreamGenerator(outChunk, Double.BYTES, rowOffset);
}

DoubleChunkInputStreamGenerator(final DoubleChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}
@@ -177,6 +195,38 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
final int elementSize,
final StreamReaderOptions options,
final Function<Double, T> transform,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

try (final WritableDoubleChunk<Values> inner = extractChunkFromInputStream(
elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0).asWritableDoubleChunk()) {

final WritableObjectChunk<T, Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableObjectChunk();
} else {
final int numRows = Math.max(totalRows, inner.size());
chunk = WritableObjectChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

for (int ii = 0; ii < inner.size(); ++ii) {
double value = inner.get(ii);
chunk.set(outOffset + ii, value == NULL_DOUBLE ? null : transform.apply(value));
}

return chunk;
}
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
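The read side in each generator is the mirror image: extractChunkFromInputStreamWithTransform first reads the primitive chunk as usual, then maps the null sentinel back to a boxed null and every other value through the supplied boxing function. A minimal sketch of that inverse step, again with hypothetical names and plain arrays rather than chunks (not PR code):

import java.util.function.DoubleFunction;

// Sketch only: the read-side counterpart of the null-sentinel pattern,
// mapping the sentinel back to null and boxing everything else.
public final class NullSentinelDecodeSketch {
    // Stand-in for QueryConstants.NULL_DOUBLE (Deephaven's double null sentinel).
    private static final double NULL_DOUBLE = -Double.MAX_VALUE;

    static <T> void decode(final double[] values, final DoubleFunction<T> transform, final T[] out) {
        for (int i = 0; i < values.length; ++i) {
            final double value = values[i];
            // The sentinel decodes to null; everything else is boxed via the transform.
            out[i] = value == NULL_DOUBLE ? null : transform.apply(value);
        }
    }
}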