Skip to content

Commit

Permalink
refactor!: Extract factory interface for ChunkInputStreamGenerators (#5811)
Browse files Browse the repository at this point in the history

Rather than relying on a static method to create CISG instances,
extracts a factory so that the JS client can provide its own
implementation of how CISG instances should be built.

Also cleans up ChunkReader/ChunkInputStreamGenerator split a bit more,
since for now JS can only use the former.

BREAKING CHANGE: Removes
ChunkInputStreamGenerator.makeInputStreamGenerator; use the default
factory instance instead.
Partial #188
  • Loading branch information
niloc132 authored Aug 2, 2024
1 parent beecac5 commit 974f9fb
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.DefaultChunkInputStreamGeneratorFactory;
import io.deephaven.extensions.barrage.chunk.SingleElementListHeaderInputStreamGenerator;
import io.deephaven.extensions.barrage.util.ExposedByteArrayOutputStream;
import io.deephaven.extensions.barrage.util.BarrageUtil;
Expand Down Expand Up @@ -122,9 +123,10 @@ public static class ModColumnGenerator implements SafeCloseable {
private final RowSetGenerator rowsModified;
private final ChunkListInputStreamGenerator data;

ModColumnGenerator(final BarrageMessage.ModColumnData col) throws IOException {
ModColumnGenerator(ChunkInputStreamGenerator.Factory factory, final BarrageMessage.ModColumnData col)
throws IOException {
rowsModified = new RowSetGenerator(col.rowsModified);
data = new ChunkListInputStreamGenerator(col.type, col.componentType, col.data, col.chunkType);
data = new ChunkListInputStreamGenerator(factory, col.type, col.componentType, col.data, col.chunkType);
}

@Override
Expand Down Expand Up @@ -173,13 +175,15 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message,
addColumnData = new ChunkListInputStreamGenerator[message.addColumnData.length];
for (int i = 0; i < message.addColumnData.length; ++i) {
BarrageMessage.AddColumnData columnData = message.addColumnData[i];
addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType,
addColumnData[i] = new ChunkListInputStreamGenerator(DefaultChunkInputStreamGeneratorFactory.INSTANCE,
columnData.type, columnData.componentType,
columnData.data, columnData.chunkType);
}

modColumnData = new ModColumnGenerator[message.modColumnData.length];
for (int i = 0; i < modColumnData.length; ++i) {
modColumnData[i] = new ModColumnGenerator(message.modColumnData[i]);
modColumnData[i] = new ModColumnGenerator(DefaultChunkInputStreamGeneratorFactory.INSTANCE,
message.modColumnData[i]);
}
} catch (final IOException e) {
throw new UncheckedDeephavenException("unexpected IOException while creating barrage message stream", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@ public class ChunkListInputStreamGenerator implements SafeCloseable {
private final List<ChunkInputStreamGenerator> generators;
private final ChunkInputStreamGenerator emptyGenerator;

public ChunkListInputStreamGenerator(Class<?> type, Class<?> componentType, List<Chunk<Values>> data,
public ChunkListInputStreamGenerator(ChunkInputStreamGenerator.Factory factory, Class<?> type,
Class<?> componentType, List<Chunk<Values>> data,
ChunkType chunkType) {
// create an input stream generator for each chunk
ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()];

long rowOffset = 0;
for (int i = 0; i < data.size(); ++i) {
final Chunk<Values> valuesChunk = data.get(i);
generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator(chunkType, type, componentType,
generators[i] = factory.makeInputStreamGenerator(chunkType, type, componentType,
valuesChunk, rowOffset);
rowOffset += valuesChunk.size();
}
this.generators = Arrays.asList(generators);
emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator(
emptyGenerator = factory.makeInputStreamGenerator(
chunkType, type, componentType, chunkType.getEmptyChunk(), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.util.pools.PoolableChunk;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,181 +3,47 @@
//
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.util.SafeCloseable;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;

public interface ChunkInputStreamGenerator extends SafeCloseable {
long MS_PER_DAY = 24 * 60 * 60 * 1000L;
long MIN_LOCAL_DATE_VALUE = QueryConstants.MIN_LONG / MS_PER_DAY;
long MAX_LOCAL_DATE_VALUE = QueryConstants.MAX_LONG / MS_PER_DAY;

static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
final ChunkType chunkType,
final Class<T> type,
final Class<?> componentType,
final Chunk<Values> chunk,
final long rowOffset) {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (chunkType) {
case Boolean:
throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
case Char:
return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES, rowOffset);
case Byte:
if (type == Boolean.class || type == boolean.class) {
// internally we represent booleans as bytes, but the wire format respects arrow's specification
return new BooleanChunkInputStreamGenerator(chunk.asByteChunk(), rowOffset);
}
return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES, rowOffset);
case Short:
return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES, rowOffset);
case Int:
return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES, rowOffset);
case Long:
return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES, rowOffset);
case Float:
return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES, rowOffset);
case Double:
return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES, rowOffset);
case Object:
if (type.isArray()) {
if (componentType == byte.class) {
return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write((byte[]) item));
} else {
return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), rowOffset);
}
}
if (Vector.class.isAssignableFrom(type)) {
// noinspection unchecked
return new VectorChunkInputStreamGenerator(
(Class<Vector<?>>) type, componentType, chunk.asObjectChunk(), rowOffset);
}
if (type == String.class) {
return new VarBinaryChunkInputStreamGenerator<String>(chunk.asObjectChunk(), rowOffset,
(out, str) -> out.write(str.getBytes(Charsets.UTF_8)));
}
if (type == BigInteger.class) {
return new VarBinaryChunkInputStreamGenerator<BigInteger>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toByteArray()));
}
if (type == BigDecimal.class) {
return new VarBinaryChunkInputStreamGenerator<BigDecimal>(chunk.asObjectChunk(), rowOffset,
(out, item) -> {
final BigDecimal normal = item.stripTrailingZeros();
final int v = normal.scale();
// Write as little endian, arrow endianness.
out.write(0xFF & v);
out.write(0xFF & (v >> 8));
out.write(0xFF & (v >> 16));
out.write(0xFF & (v >> 24));
out.write(normal.unscaledValue().toByteArray());
});
}
if (type == Instant.class) {
// This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted.
ObjectChunk<Instant, Values> objChunk = chunk.asObjectChunk();
WritableLongChunk<Values> outChunk = WritableLongChunk.makeWritableChunk(objChunk.size());
for (int i = 0; i < objChunk.size(); ++i) {
outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i)));
}
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
}
if (type == ZonedDateTime.class) {
// This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted.
ObjectChunk<ZonedDateTime, Values> objChunk = chunk.asObjectChunk();
WritableLongChunk<Values> outChunk = WritableLongChunk.makeWritableChunk(objChunk.size());
for (int i = 0; i < objChunk.size(); ++i) {
outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i)));
}
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
}
if (type == Boolean.class) {
return BooleanChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Byte.class) {
return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Character.class) {
return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Double.class) {
return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Float.class) {
return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Integer.class) {
return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Long.class) {
return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Short.class) {
return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == LocalDate.class) {
return LongChunkInputStreamGenerator.<LocalDate>convertWithTransform(chunk.asObjectChunk(),
rowOffset, date -> {
if (date == null) {
return QueryConstants.NULL_LONG;
}
final long epochDay = date.toEpochDay();
if (epochDay < MIN_LOCAL_DATE_VALUE || epochDay > MAX_LOCAL_DATE_VALUE) {
throw new IllegalArgumentException("Date out of range: " + date + " (" + epochDay
+ " not in [" + MIN_LOCAL_DATE_VALUE + ", " + MAX_LOCAL_DATE_VALUE + "])");
}
return epochDay * MS_PER_DAY;
});
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.<LocalTime>convertWithTransform(chunk.asObjectChunk(),
rowOffset, time -> {
if (time == null) {
return QueryConstants.NULL_LONG;
}
final long nanoOfDay = time.toNanoOfDay();
if (nanoOfDay < 0) {
throw new IllegalArgumentException("Time out of range: " + time);
}
return nanoOfDay;
});
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8)));
default:
throw new UnsupportedOperationException();
}
/**
* Creator of {@link ChunkInputStreamGenerator} instances.
* <p>
* This API may not be stable, while the JS API's usages of it are implemented.
*/
interface Factory {
/**
* Returns an instance capable of writing the given chunk
*
* @param chunkType the type of the chunk to be written
* @param type the Java type of the column being written
* @param componentType the Java type of data in an array/vector, or null if irrelevant
* @param chunk the chunk that will be written out to an input stream
* @param rowOffset the offset into the chunk to start writing from
* @return an instance capable of serializing the given chunk
* @param <T> the type of data in the column
*/
<T> ChunkInputStreamGenerator makeInputStreamGenerator(
final ChunkType chunkType,
final Class<T> type,
final Class<?> componentType,
final Chunk<Values> chunk,
final long rowOffset);
}

/**
Expand Down
Loading

0 comments on commit 974f9fb

Please sign in to comment.