diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java index 33300a800b3..0a1e0f94046 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkWriter.java @@ -124,14 +124,17 @@ public int available() throws IOException { } /** - * There are two cases we don't send a validity buffer: - the simplest case is following the arrow flight spec, - * which says that if there are no nulls present, the buffer is optional. - Our implementation of nullCount() - * for primitive types will return zero if the useDeephavenNulls flag is set, so the buffer will also be omitted - * in that case. The client's marshaller does not need to be aware of deephaven nulls but in this mode we assume - * the consumer understands which value is the assigned NULL. + * @formatter:off + * There are two cases we don't send a validity buffer: + * - the simplest case is following the arrow flight spec, which says that if there are no nulls present, the + * buffer is optional. + * - Our implementation of nullCount() for primitive types will return zero if the useDeephavenNulls flag is + * set, so the buffer will also be omitted in that case. The client's marshaller does not need to be aware of + * deephaven nulls but in this mode we assume the consumer understands which value is the assigned NULL. + * @formatter:on */ protected boolean sendValidityBuffer() { - return !fieldNullable || nullCount() != 0; + return fieldNullable && nullCount() != 0; } @Override diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java index 2c289e4e859..a2887ddda6d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BooleanChunkReader.java @@ -3,9 +3,11 @@ // package io.deephaven.extensions.barrage.chunk; +import io.deephaven.base.verify.Assert; import io.deephaven.chunk.WritableByteChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.util.BooleanUtils; import io.deephaven.util.datastructures.LongSizedDataStructure; @@ -16,6 +18,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.PrimitiveIterator; +import java.util.function.Function; import static io.deephaven.extensions.barrage.chunk.BaseChunkWriter.getNumLongsForBitPackOfSize; @@ -39,6 +42,32 @@ public BooleanChunkReader(ByteConversion conversion) { this.conversion = conversion; } + public ChunkReader> transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableByteChunk inner = BooleanChunkReader.this.readChunk( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + outChunk, + Math.max(totalRows, inner.size()), + WritableObjectChunk::makeWritableChunk, + WritableChunk::asWritableObjectChunk); + + if (outChunk == null) { + // if we're not given an output chunk then we better be writing at the front of the new one + Assert.eqZero(outOffset, "outOffset"); + } + + for (int ii = 0; ii < inner.size(); ++ii) { + byte value = inner.get(ii); + chunk.set(outOffset + ii, transform.apply(value)); + } + + return chunk; + } + }; + } + @Override public WritableByteChunk readChunk( @NotNull final Iterator fieldNodeIter, @@ -99,7 +128,6 @@ public WritableByteChunk readChunk( return chunk; } - private static void useValidityBuffer( final ByteConversion conversion, final DataInput is, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java index 5133eaaff4e..8e6f24d39f5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java @@ -84,7 +84,6 @@ public ChunkReader> transform(Function> extends BaseChunkWriter { private static final String DEBUG_NAME = "ByteChunkWriter"; private static final ByteChunkWriter> NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>( - ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, false); - private static final ByteChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>( ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, true); + private static final ByteChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new ByteChunkWriter<>( + ByteChunk::isNull, ByteChunk::getEmptyChunk, ByteChunk::get, false); public static ByteChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java index f95f39a2ed5..e51e02b3c98 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java @@ -80,7 +80,6 @@ public ChunkReader> transform(Function> extends BaseChunkWriter { private static final String DEBUG_NAME = "CharChunkWriter"; private static final CharChunkWriter> NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>( - CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, false); - private static final CharChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>( CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, true); + private static final CharChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new CharChunkWriter<>( + CharChunk::isNull, CharChunk::getEmptyChunk, CharChunk::get, false); public static CharChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java index 9fe71ffdd0f..f17f64183df 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java @@ -25,6 +25,7 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.BooleanUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.type.TypeUtils; import io.deephaven.vector.Vector; @@ -275,7 +276,17 @@ public > ChunkReader newReaderPojo( if (typeId == ArrowType.ArrowTypeID.Union) { final ArrowType.Union unionType = (ArrowType.Union) field.getType(); - final List>> innerReaders = new ArrayList<>(); + final List>> innerReaders = new ArrayList<>(); + + for (int ii = 0; ii < field.getChildren().size(); ++ii) { + final Field childField = field.getChildren().get(ii); + final BarrageTypeInfo childTypeInfo = BarrageUtil.getDefaultType(childField); + ChunkReader> childReader = newReaderPojo(childTypeInfo, options, false); + if (childField.getType().getTypeID() == ArrowType.ArrowTypeID.Bool) { + childReader = ((BooleanChunkReader) childReader).transform(BooleanUtils::byteAsBoolean); + } + innerReaders.add(childReader); + } // noinspection unchecked return (ChunkReader) new UnionChunkReader( diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java index 60341a7df74..60dbe84efe8 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkWriter.java @@ -25,9 +25,9 @@ public class DoubleChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "DoubleChunkWriter"; private static final DoubleChunkWriter> NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>( - DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, false); - private static final DoubleChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>( DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, true); + private static final DoubleChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new DoubleChunkWriter<>( + DoubleChunk::isNull, DoubleChunk::getEmptyChunk, DoubleChunk::get, false); public static DoubleChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java index de0ec413fff..7832beb98cf 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkWriter.java @@ -25,9 +25,9 @@ public class FloatChunkWriter> extends BaseChunkWriter { private static final String DEBUG_NAME = "FloatChunkWriter"; private static final FloatChunkWriter> NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>( - FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, false); - private static final FloatChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>( FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, true); + private static final FloatChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new FloatChunkWriter<>( + FloatChunk::isNull, FloatChunk::getEmptyChunk, FloatChunk::get, false); public static FloatChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java index d72bf2c3b3e..8ec0d0ddc29 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java @@ -84,7 +84,6 @@ public ChunkReader> transform(Function> extends BaseChunkWriter { private static final String DEBUG_NAME = "IntChunkWriter"; private static final IntChunkWriter> NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>( - IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, false); - private static final IntChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>( IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, true); + private static final IntChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new IntChunkWriter<>( + IntChunk::isNull, IntChunk::getEmptyChunk, IntChunk::get, false); public static IntChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java index a179c43e60a..8663d530da9 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java @@ -84,7 +84,6 @@ public ChunkReader> transform(Function> extends BaseChunkWriter { private static final String DEBUG_NAME = "LongChunkWriter"; private static final LongChunkWriter> NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>( - LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, false); - private static final LongChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>( LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, true); + private static final LongChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new LongChunkWriter<>( + LongChunk::isNull, LongChunk::getEmptyChunk, LongChunk::get, false); public static LongChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java index 8fb615789c8..6016025ecde 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java @@ -84,7 +84,6 @@ public ChunkReader> transform(Function> extends BaseChunkWriter { private static final String DEBUG_NAME = "ShortChunkWriter"; private static final ShortChunkWriter> NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>( - ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, false); - private static final ShortChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>( ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, true); + private static final ShortChunkWriter> NON_NULLABLE_IDENTITY_INSTANCE = new ShortChunkWriter<>( + ShortChunk::isNull, ShortChunk::getEmptyChunk, ShortChunk::get, false); public static ShortChunkWriter> getIdentity(boolean isNullable) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java index 931894da676..6689e9e45ac 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java @@ -63,6 +63,7 @@ public OutputChunkType readChunk( for (int ii = 0; ii < wireValues.size(); ++ii) { transformFunction.apply(wireValues, chunk, ii, outOffset + ii); } + chunk.setSize(outOffset + wireValues.size()); return chunk; } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java index 294ea850892..e11f28d2abe 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/UnionChunkReader.java @@ -37,11 +37,11 @@ public static Mode mode(UnionMode mode) { private static final String DEBUG_NAME = "UnionChunkReader"; private final Mode mode; - private final List>> readers; + private final List>> readers; public UnionChunkReader( final Mode mode, - final List>> readers) { + final List>> readers) { this.mode = mode; this.readers = readers; // the specification doesn't allow the union column to have more than signed byte number of types @@ -65,7 +65,7 @@ public WritableObjectChunk readChunk( int numRows = nodeInfo.numElements; if (numRows == 0) { is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, coiBufferLength + offsetsBufferLength)); - for (final ChunkReader> reader : readers) { + for (final ChunkReader> reader : readers) { // noinspection EmptyTryBlock try (final SafeCloseable ignored = reader.readChunk(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { // do nothing; we need each reader to consume fieldNodeIter and bufferInfoIter diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index c153155f830..7ebce9f43d6 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -460,6 +460,7 @@ public static BarrageTable make( return value instanceof Boolean && (Boolean) value; }; + schema.attributes.put(Table.BARRAGE_SCHEMA_ATTRIBUTE, schema.arrowSchema); if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) { final LinkedHashMap> finalColumns = makeColumns(schema, writableSources); table = new BarrageBlinkTable( diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java index 81edb2241ee..952e27e36d0 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageMessageReaderImpl.java @@ -265,11 +265,12 @@ public BarrageMessage safelyParseFrom(final BarrageOptions options, } // fill the chunk with data and assign back into the array - chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, chunk.size(), + final int origSize = chunk.size(); + chunk = readers.get(ci).readChunk(fieldNodeIter, bufferInfoIter, ois, chunk, origSize, (int) batch.length()); acd.data.set(lastChunkIndex, chunk); if (!options.columnsAsList()) { - chunk.setSize(chunk.size() + (int) batch.length()); + chunk.setSize(origSize + (int) batch.length()); } }