From ff87bb2d732e24726f4e0f00fb03d53437b359a1 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 24 Apr 2024 06:12:52 -0500 Subject: [PATCH 01/13] Move ChunkListInputStreamGenerator to top-level class --- .../barrage/BarrageStreamGeneratorImpl.java | 89 +++++-------------- .../ChunkListInputStreamGenerator.java | 56 ++++++++++++ 2 files changed, 79 insertions(+), 66 deletions(-) create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index c62ef1020c3..1fa9684d61b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -14,7 +14,6 @@ import io.deephaven.barrage.flatbuf.BarrageMessageWrapper; import io.deephaven.barrage.flatbuf.BarrageModColumnMetadata; import io.deephaven.barrage.flatbuf.BarrageUpdateMetadata; -import io.deephaven.chunk.Chunk; import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableLongChunk; @@ -130,57 +129,13 @@ protected void writeHeader( } } - public static class ChunkListInputStreamGenerator implements SafeCloseable { - public ChunkInputStreamGenerator[] generators; - public ChunkInputStreamGenerator emptyGenerator; - - ChunkListInputStreamGenerator(BarrageMessage.AddColumnData acd) { - // create an input stream generator for each chunk - generators = new ChunkInputStreamGenerator[acd.data.size()]; - - long rowOffset = 0; - for (int i = 0; i < acd.data.size(); ++i) { - final Chunk valuesChunk = acd.data.get(i); - generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( - valuesChunk.getChunkType(), acd.type, acd.componentType, valuesChunk, rowOffset); - rowOffset += valuesChunk.size(); - } - emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( - acd.chunkType, acd.type, acd.componentType, acd.chunkType.getEmptyChunk(), 0); - } - - ChunkListInputStreamGenerator(BarrageMessage.ModColumnData mcd) { - // create an input stream generator for each chunk - generators = new ChunkInputStreamGenerator[mcd.data.size()]; - - long rowOffset = 0; - for (int i = 0; i < mcd.data.size(); ++i) { - final Chunk valuesChunk = mcd.data.get(i); - generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator( - mcd.chunkType, mcd.type, mcd.componentType, valuesChunk, rowOffset); - rowOffset += valuesChunk.size(); - } - emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( - mcd.chunkType, mcd.type, mcd.componentType, mcd.chunkType.getEmptyChunk(), 0); - } - - @Override - public void close() { - for (int i = 0; i < generators.length; i++) { - generators[i].close(); - generators[i] = null; - } - emptyGenerator.close(); - } - } - public static class ModColumnData { public final RowSetGenerator rowsModified; public final ChunkListInputStreamGenerator data; ModColumnData(final BarrageMessage.ModColumnData col) throws IOException { rowsModified = new RowSetGenerator(col.rowsModified); - data = new ChunkListInputStreamGenerator(col); + data = new ChunkListInputStreamGenerator(col.type, col.componentType, col.data, col.chunkType); } } @@ -225,8 +180,10 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, addColumnData = new ChunkListInputStreamGenerator[message.addColumnData.length]; for (int i = 0; i < message.addColumnData.length; ++i) { - addColumnData[i] = new ChunkListInputStreamGenerator(message.addColumnData[i]); - addGeneratorCount = Math.max(addGeneratorCount, addColumnData[i].generators.length); + BarrageMessage.AddColumnData columnData = message.addColumnData[i]; + addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType, + columnData.data, columnData.chunkType); + addGeneratorCount = Math.max(addGeneratorCount, addColumnData[i].generators().size()); } modColumnData = new ModColumnData[message.modColumnData.length]; @@ -827,18 +784,18 @@ private void processBatches(Consumer visitor, final View view, } } - private static int findGeneratorForOffset(final ChunkInputStreamGenerator[] generators, final long offset) { + private static int findGeneratorForOffset(final List generators, final long offset) { // fast path for smaller updates - if (generators.length <= 1) { + if (generators.isEmpty()) { return 0; } int low = 0; - int high = generators.length; + int high = generators.size(); while (low + 1 < high) { int mid = (low + high) / 2; - int cmp = Long.compare(generators[mid].getRowOffset(), offset); + int cmp = Long.compare(generators.get(mid).getRowOffset(), offset); if (cmp < 0) { // the generator's first key is low enough @@ -865,7 +822,7 @@ private int appendAddColumns(final View view, final long startRange, final int t // find the generator for the initial position-space key long startPos = view.addRowOffsets().get(startRange); - int chunkIdx = findGeneratorForOffset(addColumnData[0].generators, startPos); + int chunkIdx = findGeneratorForOffset(addColumnData[0].generators(), startPos); // adjust the batch size if we would cross a chunk boundary long shift = 0; @@ -873,8 +830,8 @@ private int appendAddColumns(final View view, final long startRange, final int t if (endPos == RowSet.NULL_ROW_KEY) { endPos = Long.MAX_VALUE; } - if (addColumnData[0].generators.length > 0) { - final ChunkInputStreamGenerator tmpGenerator = addColumnData[0].generators[chunkIdx]; + if (!addColumnData[0].generators().isEmpty()) { + final ChunkInputStreamGenerator tmpGenerator = addColumnData[0].generators().get(chunkIdx); endPos = Math.min(endPos, tmpGenerator.getLastRowOffset()); shift = -tmpGenerator.getRowOffset(); } @@ -885,7 +842,7 @@ private int appendAddColumns(final View view, final long startRange, final int t final RowSet adjustedOffsets = shift == 0 ? null : myAddedOffsets.shift(shift)) { // every column must write to the stream for (final ChunkListInputStreamGenerator data : addColumnData) { - final int numElements = data.generators.length == 0 + final int numElements = data.generators().isEmpty() ? 0 : myAddedOffsets.intSize("BarrageStreamGenerator"); if (view.options().columnsAsList()) { @@ -901,7 +858,7 @@ private int appendAddColumns(final View view, final long startRange, final int t // use an empty generator to publish the column data try (final RowSet empty = RowSetFactory.empty()) { final ChunkInputStreamGenerator.DrainableColumn drainableColumn = - data.emptyGenerator.getInputStream(view.options(), empty); + data.empty(view.options(), empty); drainableColumn.visitFieldNodes(fieldNodeListener); drainableColumn.visitBuffers(bufferListener); @@ -909,7 +866,7 @@ private int appendAddColumns(final View view, final long startRange, final int t addStream.accept(drainableColumn); } } else { - final ChunkInputStreamGenerator generator = data.generators[chunkIdx]; + final ChunkInputStreamGenerator generator = data.generators().get(chunkIdx); final ChunkInputStreamGenerator.DrainableColumn drainableColumn = generator.getInputStream(view.options(), shift == 0 ? myAddedOffsets : adjustedOffsets); drainableColumn.visitFieldNodes(fieldNodeListener); @@ -934,8 +891,8 @@ private int appendModColumns(final View view, final long startRange, final int t // adjust the batch size if we would cross a chunk boundary for (int ii = 0; ii < modColumnData.length; ++ii) { final ModColumnData mcd = modColumnData[ii]; - final ChunkInputStreamGenerator[] generators = mcd.data.generators; - if (generators.length == 0) { + final List generators = mcd.data.generators(); + if (generators.isEmpty()) { continue; } @@ -944,8 +901,8 @@ private int appendModColumns(final View view, final long startRange, final int t final long startPos = modOffsets != null ? modOffsets.get(startRange) : startRange; if (startPos != RowSet.NULL_ROW_KEY) { final int chunkIdx = findGeneratorForOffset(generators, startPos); - if (chunkIdx < generators.length - 1) { - maxLength = Math.min(maxLength, generators[chunkIdx].getLastRowOffset() + 1 - startPos); + if (chunkIdx < generators.size() - 1) { + maxLength = Math.min(maxLength, generators.get(chunkIdx).getLastRowOffset() + 1 - startPos); } columnChunkIdx[ii] = chunkIdx; } @@ -955,9 +912,9 @@ private int appendModColumns(final View view, final long startRange, final int t long numRows = 0; for (int ii = 0; ii < modColumnData.length; ++ii) { final ModColumnData mcd = modColumnData[ii]; - final ChunkInputStreamGenerator generator = mcd.data.generators.length > 0 - ? mcd.data.generators[columnChunkIdx[ii]] - : null; + final ChunkInputStreamGenerator generator = mcd.data.generators().isEmpty() + ? null + : mcd.data.generators().get(columnChunkIdx[ii]); final RowSet modOffsets = view.modRowOffsets(ii); long startPos, endPos; @@ -1005,7 +962,7 @@ private int appendModColumns(final View view, final long startRange, final int t // use the empty generator to publish the column data try (final RowSet empty = RowSetFactory.empty()) { final ChunkInputStreamGenerator.DrainableColumn drainableColumn = - mcd.data.emptyGenerator.getInputStream(view.options(), empty); + mcd.data.empty(view.options(), empty); drainableColumn.visitFieldNodes(fieldNodeListener); drainableColumn.visitBuffers(bufferListener); // Add the drainable last as it is allowed to immediately close a row set the visitors need diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java new file mode 100644 index 00000000000..7aeb20c8b9f --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java @@ -0,0 +1,56 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.chunk.Chunk; +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.table.impl.util.BarrageMessage; +import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.SafeCloseable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class ChunkListInputStreamGenerator implements SafeCloseable { + private final List generators; + private final ChunkInputStreamGenerator emptyGenerator; + + public ChunkListInputStreamGenerator(Class type, Class componentType, List> data, + ChunkType chunkType) { + // create an input stream generator for each chunk + ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()]; + + long rowOffset = 0; + for (int i = 0; i < data.size(); ++i) { + final Chunk valuesChunk = data.get(i); + generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator(chunkType, type, componentType, + valuesChunk, rowOffset); + rowOffset += valuesChunk.size(); + } + this.generators = Arrays.asList(generators); + emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( + chunkType, type, componentType, chunkType.getEmptyChunk(), 0); + } + + public List generators() { + return generators; + } + + public ChunkInputStreamGenerator.DrainableColumn empty(StreamReaderOptions options, RowSet rowSet) + throws IOException { + return emptyGenerator.getInputStream(options, rowSet); + } + + @Override + public void close() { + for (ChunkInputStreamGenerator generator : generators) { + generator.close(); + } + emptyGenerator.close(); + } +} From 91fba5fb24129a7518859f6292e539393ae90fc8 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 25 Apr 2024 10:43:18 -0500 Subject: [PATCH 02/13] Make fields private, remove unused ones --- .../barrage/BarrageStreamGeneratorImpl.java | 88 ++++++++----------- 1 file changed, 39 insertions(+), 49 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 1fa9684d61b..49faede6516 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -130,8 +130,8 @@ protected void writeHeader( } public static class ModColumnData { - public final RowSetGenerator rowsModified; - public final ChunkListInputStreamGenerator data; + private final RowSetGenerator rowsModified; + private final ChunkListInputStreamGenerator data; ModColumnData(final BarrageMessage.ModColumnData col) throws IOException { rowsModified = new RowSetGenerator(col.rowsModified); @@ -139,23 +139,21 @@ public static class ModColumnData { } } - public final BarrageMessage message; - public final BarragePerformanceLog.WriteMetricsConsumer writeConsumer; + private final BarrageMessage message; + private final BarragePerformanceLog.WriteMetricsConsumer writeConsumer; - public final long firstSeq; - public final long lastSeq; - public final long step; + private final long firstSeq; + private final long lastSeq; - public final boolean isSnapshot; + private final boolean isSnapshot; - public final RowSetGenerator rowsAdded; - public final RowSetGenerator rowsIncluded; - public final RowSetGenerator rowsRemoved; - public final RowSetShiftDataGenerator shifted; + private final RowSetGenerator rowsAdded; + private final RowSetGenerator rowsIncluded; + private final RowSetGenerator rowsRemoved; + private final RowSetShiftDataGenerator shifted; - public final ChunkListInputStreamGenerator[] addColumnData; - public int addGeneratorCount = 0; - public final ModColumnData[] modColumnData; + private final ChunkListInputStreamGenerator[] addColumnData; + private final ModColumnData[] modColumnData; /** * Create a barrage stream generator that can slice and dice the barrage message for delivery to clients. @@ -170,7 +168,6 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, try { firstSeq = message.firstSeq; lastSeq = message.lastSeq; - step = message.step; isSnapshot = message.isSnapshot; rowsAdded = new RowSetGenerator(message.rowsAdded); @@ -183,7 +180,6 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, BarrageMessage.AddColumnData columnData = message.addColumnData[i]; addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType, columnData.data, columnData.chunkType); - addGeneratorCount = Math.max(addGeneratorCount, addColumnData[i].generators().size()); } modColumnData = new ModColumnData[message.modColumnData.length]; @@ -257,19 +253,19 @@ public SubView getSubView(BarrageSubscriptionOptions options, boolean isInitialS return getSubView(options, isInitialSnapshot, null, false, null, null); } - public static class SubView implements View { - public final BarrageStreamGeneratorImpl generator; - public final BarrageSubscriptionOptions options; - public final boolean isInitialSnapshot; - public final RowSet viewport; - public final boolean reverseViewport; - public final RowSet keyspaceViewport; - public final BitSet subscribedColumns; - public final long numAddRows; - public final long numModRows; - public final RowSet addRowOffsets; - public final RowSet addRowKeys; - public final RowSet[] modRowOffsets; + public static final class SubView implements View { + private final BarrageStreamGeneratorImpl generator; + private final BarrageSubscriptionOptions options; + private final boolean isInitialSnapshot; + private final RowSet viewport; + private final boolean reverseViewport; + private final RowSet keyspaceViewport; + private final BitSet subscribedColumns; + private final long numAddRows; + private final long numModRows; + private final RowSet addRowOffsets; + private final RowSet addRowKeys; + private final RowSet[] modRowOffsets; public SubView(final BarrageStreamGeneratorImpl generator, final BarrageSubscriptionOptions options, @@ -430,16 +426,15 @@ public SnapshotView getSnapshotView(BarrageSnapshotOptions options) { return getSnapshotView(options, null, false, null, null); } - public static class SnapshotView implements View { - public final BarrageStreamGeneratorImpl generator; - public final BarrageSnapshotOptions options; - public final RowSet viewport; - public final boolean reverseViewport; - public final RowSet keyspaceViewport; - public final BitSet subscribedColumns; - public final long numAddRows; - public final RowSet addRowKeys; - public final RowSet addRowOffsets; + public static final class SnapshotView implements View { + private final BarrageStreamGeneratorImpl generator; + private final BarrageSnapshotOptions options; + private final RowSet viewport; + private final boolean reverseViewport; + private final BitSet subscribedColumns; + private final long numAddRows; + private final RowSet addRowKeys; + private final RowSet addRowOffsets; public SnapshotView(final BarrageStreamGeneratorImpl generator, final BarrageSnapshotOptions options, @@ -452,7 +447,6 @@ public SnapshotView(final BarrageStreamGeneratorImpl generator, this.viewport = viewport; this.reverseViewport = reverseViewport; - this.keyspaceViewport = keyspaceViewport; this.subscribedColumns = subscribedColumns; // precompute add row offsets @@ -509,7 +503,7 @@ public boolean isViewport() { } @Override - public final StreamReaderOptions options() { + public StreamReaderOptions options() { return options; } @@ -524,8 +518,8 @@ public RowSet modRowOffsets(int col) { } } - public static class SchemaView implements View { - final byte[] msgBytes; + public static final class SchemaView implements View { + private final byte[] msgBytes; public SchemaView(final ByteBuffer buffer) { this.msgBytes = Flight.FlightData.newBuilder() @@ -1127,7 +1121,7 @@ protected int addToFlatBuffer(final FlatBufferBuilder builder) { } public static class RowSetGenerator extends ByteArrayGenerator implements SafeCloseable { - public final RowSet original; + private final RowSet original; public RowSetGenerator(final RowSet rowSet) throws IOException { this.original = rowSet.copy(); @@ -1201,11 +1195,7 @@ public int addToFlatBuffer(final BitSet mine, final FlatBufferBuilder builder) t } public static class RowSetShiftDataGenerator extends ByteArrayGenerator { - public final RowSetShiftData original; - public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOException { - this.original = shifted; - final RowSetBuilderSequential sRangeBuilder = RowSetFactory.builderSequential(); final RowSetBuilderSequential eRangeBuilder = RowSetFactory.builderSequential(); final RowSetBuilderSequential destBuilder = RowSetFactory.builderSequential(); From 6be69f07eb5d126382cc7cc70787fdba3d2d1265 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 30 May 2024 16:11:10 -0500 Subject: [PATCH 03/13] Make inner classes non-static and simplify --- .../barrage/BarrageStreamGeneratorImpl.java | 334 +++++++++--------- 1 file changed, 164 insertions(+), 170 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 49faede6516..8c405696da6 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -237,7 +237,7 @@ public SubView getSubView(final BarrageSubscriptionOptions options, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - return new SubView(this, options, isInitialSnapshot, viewport, reverseViewport, keyspaceViewport, + return new SubView(options, isInitialSnapshot, viewport, reverseViewport, keyspaceViewport, subscribedColumns); } @@ -253,8 +253,7 @@ public SubView getSubView(BarrageSubscriptionOptions options, boolean isInitialS return getSubView(options, isInitialSnapshot, null, false, null, null); } - public static final class SubView implements View { - private final BarrageStreamGeneratorImpl generator; + public final class SubView implements View { private final BarrageSubscriptionOptions options; private final boolean isInitialSnapshot; private final RowSet viewport; @@ -267,14 +266,12 @@ public static final class SubView implements View { private final RowSet addRowKeys; private final RowSet[] modRowOffsets; - public SubView(final BarrageStreamGeneratorImpl generator, - final BarrageSubscriptionOptions options, + public SubView(final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - this.generator = generator; this.options = options; this.isInitialSnapshot = isInitialSnapshot; this.viewport = viewport; @@ -283,15 +280,15 @@ public SubView(final BarrageStreamGeneratorImpl generator, this.subscribedColumns = subscribedColumns; if (keyspaceViewport != null) { - this.modRowOffsets = new WritableRowSet[generator.modColumnData.length]; + this.modRowOffsets = new WritableRowSet[modColumnData.length]; } else { this.modRowOffsets = null; } // precompute the modified column indexes, and calculate total rows needed long numModRows = 0; - for (int ii = 0; ii < generator.modColumnData.length; ++ii) { - final ModColumnData mcd = generator.modColumnData[ii]; + for (int ii = 0; ii < modColumnData.length; ++ii) { + final ModColumnData mcd = modColumnData[ii]; if (keyspaceViewport != null) { try (WritableRowSet intersect = keyspaceViewport.intersect(mcd.rowsModified.original)) { @@ -305,15 +302,15 @@ public SubView(final BarrageStreamGeneratorImpl generator, this.numModRows = numModRows; if (keyspaceViewport != null) { - addRowKeys = keyspaceViewport.intersect(generator.rowsIncluded.original); - addRowOffsets = generator.rowsIncluded.original.invert(addRowKeys); - } else if (!generator.rowsAdded.original.equals(generator.rowsIncluded.original)) { + addRowKeys = keyspaceViewport.intersect(rowsIncluded.original); + addRowOffsets = rowsIncluded.original.invert(addRowKeys); + } else if (!rowsAdded.original.equals(rowsIncluded.original)) { // there are scoped rows included in the chunks that need to be removed - addRowKeys = generator.rowsAdded.original.copy(); - addRowOffsets = generator.rowsIncluded.original.invert(addRowKeys); + addRowKeys = rowsAdded.original.copy(); + addRowOffsets = rowsIncluded.original.invert(addRowKeys); } else { - addRowKeys = generator.rowsAdded.original.copy(); - addRowOffsets = RowSetFactory.flat(generator.rowsAdded.original.size()); + addRowKeys = rowsAdded.original.copy(); + addRowOffsets = RowSetFactory.flat(rowsAdded.original.size()); } this.numAddRows = addRowOffsets.size(); @@ -322,7 +319,7 @@ public SubView(final BarrageStreamGeneratorImpl generator, @Override public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); - ByteBuffer metadata = generator.getSubscriptionMetadata(this); + ByteBuffer metadata = getSubscriptionMetadata(); MutableLong bytesWritten = new MutableLong(0L); // batch size is maximum, will write fewer rows when needed @@ -332,21 +329,21 @@ public void forEachStream(Consumer visitor) throws IOException { if (numAddRows == 0 && numModRows == 0) { // we still need to send a message containing metadata when there are no rows - final InputStream is = generator.getInputStream( - this, 0, 0, actualBatchSize, metadata, generator::appendAddColumns); + final InputStream is = getInputStream(this, 0, 0, actualBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns); bytesWritten.add(is.available()); visitor.accept(is); - generator.writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); + writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); return; } // send the add batches (if any) - generator.processBatches(visitor, this, numAddRows, maxBatchSize, metadata, generator::appendAddColumns, - bytesWritten); + processBatches(visitor, this, numAddRows, maxBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns, bytesWritten); // send the mod batches (if any) but don't send metadata twice - generator.processBatches(visitor, this, numModRows, maxBatchSize, numAddRows > 0 ? null : metadata, - generator::appendModColumns, bytesWritten); + processBatches(visitor, this, numModRows, maxBatchSize, numAddRows > 0 ? null : metadata, + BarrageStreamGeneratorImpl.this::appendModColumns, bytesWritten); // clean up the helper indexes addRowOffsets.close(); @@ -356,7 +353,7 @@ public void forEachStream(Consumer visitor) throws IOException { modViewport.close(); } } - generator.writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); + writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); } private int batchSize() { @@ -394,6 +391,84 @@ public RowSet modRowOffsets(int col) { } return modRowOffsets[col]; } + + private ByteBuffer getSubscriptionMetadata() throws IOException { + final FlatBufferBuilder metadata = new FlatBufferBuilder(); + + int effectiveViewportOffset = 0; + if (isSnapshot && isViewport()) { + try (final RowSetGenerator viewportGen = new RowSetGenerator(viewport)) { + effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); + } + } + + int effectiveColumnSetOffset = 0; + if (isSnapshot && subscribedColumns != null) { + effectiveColumnSetOffset = new BitSetGenerator(subscribedColumns).addToFlatBuffer(metadata); + } + + final int rowsAddedOffset; + if (isSnapshot && !isInitialSnapshot) { + // client's don't need/want to receive the full RowSet on every snapshot + rowsAddedOffset = EmptyRowSetGenerator.INSTANCE.addToFlatBuffer(metadata); + } else { + rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); + } + + final int rowsRemovedOffset = rowsRemoved.addToFlatBuffer(metadata); + final int shiftDataOffset = shifted.addToFlatBuffer(metadata); + + // Added Chunk Data: + int addedRowsIncludedOffset = 0; + + // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same + if (isSnapshot || !addRowKeys.equals(rowsAdded.original)) { + addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(addRowKeys, metadata); + } + + // now add mod-column streams, and write the mod column indexes + TIntArrayList modOffsets = new TIntArrayList(modColumnData.length); + for (final ModColumnData mcd : modColumnData) { + final int myModRowOffset; + if (keyspaceViewport != null) { + myModRowOffset = mcd.rowsModified.addToFlatBuffer(keyspaceViewport, metadata); + } else { + myModRowOffset = mcd.rowsModified.addToFlatBuffer(metadata); + } + modOffsets.add(BarrageModColumnMetadata.createBarrageModColumnMetadata(metadata, myModRowOffset)); + } + + BarrageUpdateMetadata.startModColumnNodesVector(metadata, modOffsets.size()); + modOffsets.forEachDescending(offset -> { + metadata.addOffset(offset); + return true; + }); + final int nodesOffset = metadata.endVector(); + + BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); + BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); + BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); + BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); + BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); + BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); + BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); + BarrageUpdateMetadata.addRemovedRows(metadata, rowsRemovedOffset); + BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); + BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); + BarrageUpdateMetadata.addModColumnNodes(metadata, nodesOffset); + BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, reverseViewport); + metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); + + final FlatBufferBuilder header = new FlatBufferBuilder(); + final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); + BarrageMessageWrapper.startBarrageMessageWrapper(header); + BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); + BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); + BarrageMessageWrapper.addMsgPayload(header, payloadOffset); + header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); + + return header.dataBuffer().slice(); + } } /** @@ -412,7 +487,7 @@ public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet snapshotColumns) { - return new SnapshotView(this, options, viewport, reverseViewport, keyspaceViewport, snapshotColumns); + return new SnapshotView(options, viewport, reverseViewport, keyspaceViewport, snapshotColumns); } /** @@ -426,8 +501,7 @@ public SnapshotView getSnapshotView(BarrageSnapshotOptions options) { return getSnapshotView(options, null, false, null, null); } - public static final class SnapshotView implements View { - private final BarrageStreamGeneratorImpl generator; + public final class SnapshotView implements View { private final BarrageSnapshotOptions options; private final RowSet viewport; private final boolean reverseViewport; @@ -436,13 +510,11 @@ public static final class SnapshotView implements View { private final RowSet addRowKeys; private final RowSet addRowOffsets; - public SnapshotView(final BarrageStreamGeneratorImpl generator, - final BarrageSnapshotOptions options, + public SnapshotView(final BarrageSnapshotOptions options, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @Nullable final BitSet subscribedColumns) { - this.generator = generator; this.options = options; this.viewport = viewport; this.reverseViewport = reverseViewport; @@ -451,10 +523,10 @@ public SnapshotView(final BarrageStreamGeneratorImpl generator, // precompute add row offsets if (keyspaceViewport != null) { - addRowKeys = keyspaceViewport.intersect(generator.rowsIncluded.original); - addRowOffsets = generator.rowsIncluded.original.invert(addRowKeys); + addRowKeys = keyspaceViewport.intersect(rowsIncluded.original); + addRowOffsets = rowsIncluded.original.invert(addRowKeys); } else { - addRowKeys = generator.rowsAdded.original.copy(); + addRowKeys = rowsAdded.original.copy(); addRowOffsets = RowSetFactory.flat(addRowKeys.size()); } @@ -464,7 +536,7 @@ public SnapshotView(final BarrageStreamGeneratorImpl generator, @Override public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); - ByteBuffer metadata = generator.getSnapshotMetadata(this); + ByteBuffer metadata = getSnapshotMetadata(); MutableLong bytesWritten = new MutableLong(0L); // batch size is maximum, will write fewer rows when needed @@ -472,16 +544,16 @@ public void forEachStream(Consumer visitor) throws IOException { final MutableInt actualBatchSize = new MutableInt(); if (numAddRows == 0) { // we still need to send a message containing metadata when there are no rows - visitor.accept(generator.getInputStream( - this, 0, 0, actualBatchSize, metadata, generator::appendAddColumns)); + visitor.accept(getInputStream(this, 0, 0, actualBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns)); } else { // send the add batches - generator.processBatches(visitor, this, numAddRows, maxBatchSize, metadata, generator::appendAddColumns, - bytesWritten); + processBatches(visitor, this, numAddRows, maxBatchSize, metadata, + BarrageStreamGeneratorImpl.this::appendAddColumns, bytesWritten); } addRowOffsets.close(); addRowKeys.close(); - generator.writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); + writeConsumer.onWrite(bytesWritten.get(), System.nanoTime() - startTm); } private int batchSize() { @@ -516,6 +588,58 @@ public RowSet addRowOffsets() { public RowSet modRowOffsets(int col) { throw new UnsupportedOperationException("asked for mod row on SnapshotView"); } + + private ByteBuffer getSnapshotMetadata() throws IOException { + final FlatBufferBuilder metadata = new FlatBufferBuilder(); + + int effectiveViewportOffset = 0; + if (isViewport()) { + try (final RowSetGenerator viewportGen = new RowSetGenerator(viewport)) { + effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); + } + } + + int effectiveColumnSetOffset = 0; + if (subscribedColumns != null) { + effectiveColumnSetOffset = new BitSetGenerator(subscribedColumns).addToFlatBuffer(metadata); + } + + final int rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); + + // no shifts in a snapshot, but need to provide a valid structure + final int shiftDataOffset = shifted.addToFlatBuffer(metadata); + + // Added Chunk Data: + int addedRowsIncludedOffset = 0; + // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same + if (isSnapshot || !addRowKeys.equals(rowsAdded.original)) { + addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(addRowKeys, metadata); + } + + BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); + BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); + BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); + BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); + BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); + BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); + BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); + BarrageUpdateMetadata.addRemovedRows(metadata, 0); + BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); + BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); + BarrageUpdateMetadata.addModColumnNodes(metadata, 0); + BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, reverseViewport); + metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); + + final FlatBufferBuilder header = new FlatBufferBuilder(); + final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); + BarrageMessageWrapper.startBarrageMessageWrapper(header); + BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); + BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); + BarrageMessageWrapper.addMsgPayload(header, payloadOffset); + header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); + + return header.dataBuffer().slice(); + } } public static final class SchemaView implements View { @@ -981,136 +1105,6 @@ private int appendModColumns(final View view, final long startRange, final int t return Math.toIntExact(numRows); } - private ByteBuffer getSubscriptionMetadata(final SubView view) throws IOException { - final FlatBufferBuilder metadata = new FlatBufferBuilder(); - - int effectiveViewportOffset = 0; - if (isSnapshot && view.isViewport()) { - try (final RowSetGenerator viewportGen = new RowSetGenerator(view.viewport)) { - effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); - } - } - - int effectiveColumnSetOffset = 0; - if (isSnapshot && view.subscribedColumns != null) { - effectiveColumnSetOffset = new BitSetGenerator(view.subscribedColumns).addToFlatBuffer(metadata); - } - - final int rowsAddedOffset; - if (isSnapshot && !view.isInitialSnapshot) { - // client's don't need/want to receive the full RowSet on every snapshot - rowsAddedOffset = EmptyRowSetGenerator.INSTANCE.addToFlatBuffer(metadata); - } else { - rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); - } - - final int rowsRemovedOffset = rowsRemoved.addToFlatBuffer(metadata); - final int shiftDataOffset = shifted.addToFlatBuffer(metadata); - - // Added Chunk Data: - int addedRowsIncludedOffset = 0; - - // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same - if (isSnapshot || !view.addRowKeys.equals(rowsAdded.original)) { - addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(view.addRowKeys, metadata); - } - - // now add mod-column streams, and write the mod column indexes - TIntArrayList modOffsets = new TIntArrayList(modColumnData.length); - for (final ModColumnData mcd : modColumnData) { - final int myModRowOffset; - if (view.keyspaceViewport != null) { - myModRowOffset = mcd.rowsModified.addToFlatBuffer(view.keyspaceViewport, metadata); - } else { - myModRowOffset = mcd.rowsModified.addToFlatBuffer(metadata); - } - modOffsets.add(BarrageModColumnMetadata.createBarrageModColumnMetadata(metadata, myModRowOffset)); - } - - BarrageUpdateMetadata.startModColumnNodesVector(metadata, modOffsets.size()); - modOffsets.forEachDescending(offset -> { - metadata.addOffset(offset); - return true; - }); - final int nodesOffset = metadata.endVector(); - - BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); - BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); - BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); - BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); - BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); - BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); - BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); - BarrageUpdateMetadata.addRemovedRows(metadata, rowsRemovedOffset); - BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); - BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); - BarrageUpdateMetadata.addModColumnNodes(metadata, nodesOffset); - BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, view.reverseViewport); - metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); - - final FlatBufferBuilder header = new FlatBufferBuilder(); - final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); - BarrageMessageWrapper.startBarrageMessageWrapper(header); - BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); - BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); - BarrageMessageWrapper.addMsgPayload(header, payloadOffset); - header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); - - return header.dataBuffer().slice(); - } - - private ByteBuffer getSnapshotMetadata(final SnapshotView view) throws IOException { - final FlatBufferBuilder metadata = new FlatBufferBuilder(); - - int effectiveViewportOffset = 0; - if (view.isViewport()) { - try (final RowSetGenerator viewportGen = new RowSetGenerator(view.viewport)) { - effectiveViewportOffset = viewportGen.addToFlatBuffer(metadata); - } - } - - int effectiveColumnSetOffset = 0; - if (view.subscribedColumns != null) { - effectiveColumnSetOffset = new BitSetGenerator(view.subscribedColumns).addToFlatBuffer(metadata); - } - - final int rowsAddedOffset = rowsAdded.addToFlatBuffer(metadata); - - // no shifts in a snapshot, but need to provide a valid structure - final int shiftDataOffset = shifted.addToFlatBuffer(metadata); - - // Added Chunk Data: - int addedRowsIncludedOffset = 0; - // don't send `rowsIncluded` when identical to `rowsAdded`, client will infer they are the same - if (isSnapshot || !view.addRowKeys.equals(rowsAdded.original)) { - addedRowsIncludedOffset = rowsIncluded.addToFlatBuffer(view.addRowKeys, metadata); - } - - BarrageUpdateMetadata.startBarrageUpdateMetadata(metadata); - BarrageUpdateMetadata.addIsSnapshot(metadata, isSnapshot); - BarrageUpdateMetadata.addFirstSeq(metadata, firstSeq); - BarrageUpdateMetadata.addLastSeq(metadata, lastSeq); - BarrageUpdateMetadata.addEffectiveViewport(metadata, effectiveViewportOffset); - BarrageUpdateMetadata.addEffectiveColumnSet(metadata, effectiveColumnSetOffset); - BarrageUpdateMetadata.addAddedRows(metadata, rowsAddedOffset); - BarrageUpdateMetadata.addRemovedRows(metadata, 0); - BarrageUpdateMetadata.addShiftData(metadata, shiftDataOffset); - BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset); - BarrageUpdateMetadata.addModColumnNodes(metadata, 0); - BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, view.reverseViewport); - metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata)); - - final FlatBufferBuilder header = new FlatBufferBuilder(); - final int payloadOffset = BarrageMessageWrapper.createMsgPayloadVector(header, metadata.dataBuffer()); - BarrageMessageWrapper.startBarrageMessageWrapper(header); - BarrageMessageWrapper.addMagic(header, BarrageUtil.FLATBUFFER_MAGIC); - BarrageMessageWrapper.addMsgType(header, BarrageMessageType.BarrageUpdateMetadata); - BarrageMessageWrapper.addMsgPayload(header, payloadOffset); - header.finish(BarrageMessageWrapper.endBarrageMessageWrapper(header)); - - return header.dataBuffer().slice(); - } - public static abstract class ByteArrayGenerator { protected int len; protected byte[] raw; From d4b7d568e83ec13a1f3afba390b6ca7dcdd35c10 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 30 May 2024 16:22:10 -0500 Subject: [PATCH 04/13] Rename mod col generator, let it close its owned resources --- .../barrage/BarrageStreamGeneratorImpl.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 8c405696da6..5fd243c1a31 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -129,14 +129,20 @@ protected void writeHeader( } } - public static class ModColumnData { + public static class ModColumnGenerator implements SafeCloseable { private final RowSetGenerator rowsModified; private final ChunkListInputStreamGenerator data; - ModColumnData(final BarrageMessage.ModColumnData col) throws IOException { + ModColumnGenerator(final BarrageMessage.ModColumnData col) throws IOException { rowsModified = new RowSetGenerator(col.rowsModified); data = new ChunkListInputStreamGenerator(col.type, col.componentType, col.data, col.chunkType); } + + @Override + public void close() { + rowsModified.close(); + data.close(); + } } private final BarrageMessage message; @@ -153,7 +159,7 @@ public static class ModColumnData { private final RowSetShiftDataGenerator shifted; private final ChunkListInputStreamGenerator[] addColumnData; - private final ModColumnData[] modColumnData; + private final ModColumnGenerator[] modColumnData; /** * Create a barrage stream generator that can slice and dice the barrage message for delivery to clients. @@ -182,9 +188,9 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, columnData.data, columnData.chunkType); } - modColumnData = new ModColumnData[message.modColumnData.length]; + modColumnData = new ModColumnGenerator[message.modColumnData.length]; for (int i = 0; i < modColumnData.length; ++i) { - modColumnData[i] = new ModColumnData(message.modColumnData[i]); + modColumnData[i] = new ModColumnGenerator(message.modColumnData[i]); } } catch (final IOException e) { throw new UncheckedDeephavenException("unexpected IOException while creating barrage message stream", e); @@ -207,15 +213,10 @@ public void close() { rowsRemoved.close(); if (addColumnData != null) { - for (final ChunkListInputStreamGenerator in : addColumnData) { - in.close(); - } + SafeCloseable.closeAll(addColumnData); } if (modColumnData != null) { - for (final ModColumnData mcd : modColumnData) { - mcd.rowsModified.close(); - mcd.data.close(); - } + SafeCloseable.closeAll(modColumnData); } } @@ -288,7 +289,7 @@ public SubView(final BarrageSubscriptionOptions options, // precompute the modified column indexes, and calculate total rows needed long numModRows = 0; for (int ii = 0; ii < modColumnData.length; ++ii) { - final ModColumnData mcd = modColumnData[ii]; + final ModColumnGenerator mcd = modColumnData[ii]; if (keyspaceViewport != null) { try (WritableRowSet intersect = keyspaceViewport.intersect(mcd.rowsModified.original)) { @@ -428,7 +429,7 @@ private ByteBuffer getSubscriptionMetadata() throws IOException { // now add mod-column streams, and write the mod column indexes TIntArrayList modOffsets = new TIntArrayList(modColumnData.length); - for (final ModColumnData mcd : modColumnData) { + for (final ModColumnGenerator mcd : modColumnData) { final int myModRowOffset; if (keyspaceViewport != null) { myModRowOffset = mcd.rowsModified.addToFlatBuffer(keyspaceViewport, metadata); @@ -1008,7 +1009,7 @@ private int appendModColumns(final View view, final long startRange, final int t // adjust the batch size if we would cross a chunk boundary for (int ii = 0; ii < modColumnData.length; ++ii) { - final ModColumnData mcd = modColumnData[ii]; + final ModColumnGenerator mcd = modColumnData[ii]; final List generators = mcd.data.generators(); if (generators.isEmpty()) { continue; @@ -1029,7 +1030,7 @@ private int appendModColumns(final View view, final long startRange, final int t // now add mod-column streams, and write the mod column indexes long numRows = 0; for (int ii = 0; ii < modColumnData.length; ++ii) { - final ModColumnData mcd = modColumnData[ii]; + final ModColumnGenerator mcd = modColumnData[ii]; final ChunkInputStreamGenerator generator = mcd.data.generators().isEmpty() ? null : mcd.data.generators().get(columnChunkIdx[ii]); From 9e265d7e518bddd48e4c9cd790502a00131c1335 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 30 May 2024 16:24:49 -0500 Subject: [PATCH 05/13] Use FlatBufferBuilder's own createByteVector --- .../barrage/BarrageStreamGeneratorImpl.java | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 5fd243c1a31..ebc617d28b3 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -824,21 +824,6 @@ protected void writeHeader( cos.flush(); } - private static int createByteVector(final FlatBufferBuilder builder, final byte[] data, final int offset, - final int length) { - builder.startVector(1, length, 1); - - if (length > 0) { - builder.prep(1, length - 1); - - for (int i = length - 1; i >= 0; --i) { - builder.putByte(data[offset + i]); - } - } - - return builder.endVector(); - } - private void processBatches(Consumer visitor, final View view, final long numRows, final int maxBatchSize, ByteBuffer metadata, final ColumnVisitor columnVisitor, final MutableLong bytesWritten) throws IOException { @@ -1111,7 +1096,7 @@ public static abstract class ByteArrayGenerator { protected byte[] raw; protected int addToFlatBuffer(final FlatBufferBuilder builder) { - return createByteVector(builder, raw, 0, len); + return builder.createByteVector(raw, 0, len); } } @@ -1163,7 +1148,7 @@ protected int addToFlatBuffer(final RowSet viewport, final FlatBufferBuilder bui nlen = baos.size(); } - return createByteVector(builder, nraw, 0, nlen); + return builder.createByteVector(nraw, 0, nlen); } } @@ -1185,7 +1170,7 @@ public int addToFlatBuffer(final BitSet mine, final FlatBufferBuilder builder) t final byte[] nraw = mine.toByteArray(); final int nBits = mine.previousSetBit(Integer.MAX_VALUE - 1) + 1; final int nlen = (int) ((long) nBits + 7) / 8; - return createByteVector(builder, nraw, 0, nlen); + return builder.createByteVector(nraw, 0, nlen); } } From 1f17556a147c655f13d8c81a0904f28fa649c17b Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 31 May 2024 12:42:33 -0500 Subject: [PATCH 06/13] Draw clearer lines with BSG contract, avoid referencing Impl --- .../barrage/BarrageStreamGenerator.java | 15 ++-- .../barrage/BarrageStreamGeneratorImpl.java | 87 +++++-------------- .../extensions/barrage/util/BarrageUtil.java | 15 ++-- .../barrage/util/TableToArrowConverter.java | 5 +- .../server/arrow/ArrowFlightUtil.java | 19 ++-- .../deephaven/server/arrow/ArrowModule.java | 10 ++- .../server/arrow/FlightServiceGrpcImpl.java | 5 +- .../barrage/BarrageMessageProducer.java | 52 ++++++----- .../HierarchicalTableViewSubscription.java | 17 ++-- .../server/barrage/BarrageBlinkTableTest.java | 7 +- .../barrage/BarrageMessageRoundTripTest.java | 17 ++-- 11 files changed, 102 insertions(+), 147 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java index 08f91e3665d..730f8700781 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java @@ -10,25 +10,30 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.io.IOException; +import java.io.InputStream; import java.util.BitSet; +import java.util.function.Consumer; import java.util.function.ToIntFunction; /** * A StreamGenerator takes a BarrageMessage and re-uses portions of the serialized payload across different subscribers * that may subscribe to different viewports and columns. - * - * @param The sub-view type that the listener expects to receive. */ -public interface BarrageStreamGenerator extends SafeCloseable { +public interface BarrageStreamGenerator extends SafeCloseable { - interface Factory { + interface MessageView { + void forEachStream(Consumer visitor) throws IOException; + } + + interface Factory { /** * Create a StreamGenerator that now owns the BarrageMessage. * * @param message the message that contains the update that we would like to propagate * @param metricsConsumer a method that can be used to record write metrics */ - BarrageStreamGenerator newGenerator( + BarrageStreamGenerator newGenerator( BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer); /** diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index ebc617d28b3..34162391125 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -57,8 +57,7 @@ import static io.deephaven.extensions.barrage.chunk.BaseChunkInputStreamGenerator.PADDING_BUFFER; import static io.deephaven.proto.flight.util.MessageHelper.toIpcBytes; -public class BarrageStreamGeneratorImpl implements - BarrageStreamGenerator { +public class BarrageStreamGeneratorImpl implements BarrageStreamGenerator { private static final Logger log = LoggerFactory.getLogger(BarrageStreamGeneratorImpl.class); // NB: This should likely be something smaller, such as 1<<16, but since the js api is not yet able @@ -75,37 +74,30 @@ public class BarrageStreamGeneratorImpl implements .getIntegerForClassWithDefault(BarrageStreamGeneratorImpl.class, "maxOutboundMessageSize", 100 * 1024 * 1024); - public interface View { - void forEachStream(Consumer visitor) throws IOException; - + public interface RecordBatchMessageView extends MessageView { boolean isViewport(); StreamReaderOptions options(); - int clientMaxMessageSize(); - RowSet addRowOffsets(); RowSet modRowOffsets(int col); } - public static class Factory - implements BarrageStreamGenerator.Factory { - public Factory() {} - + public static class Factory implements BarrageStreamGenerator.Factory { @Override - public BarrageStreamGenerator newGenerator( + public BarrageStreamGenerator newGenerator( final BarrageMessage message, final BarragePerformanceLog.WriteMetricsConsumer metricsConsumer) { return new BarrageStreamGeneratorImpl(message, metricsConsumer); } @Override - public View getSchemaView(@NotNull final ToIntFunction schemaPayloadWriter) { + public MessageView getSchemaView(@NotNull final ToIntFunction schemaPayloadWriter) { final FlatBufferBuilder builder = new FlatBufferBuilder(); final int schemaOffset = schemaPayloadWriter.applyAsInt(builder); builder.finish(MessageHelper.wrapInMessage(builder, schemaOffset, org.apache.arrow.flatbuf.MessageHeader.Schema)); - return new SchemaView(builder.dataBuffer()); + return new SchemaMessageView(builder.dataBuffer()); } } @@ -114,7 +106,7 @@ public View getSchemaView(@NotNull final ToIntFunction schema */ public static class ArrowFactory extends Factory { @Override - public BarrageStreamGenerator newGenerator( + public BarrageStreamGenerator newGenerator( BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer) { return new BarrageStreamGeneratorImpl(message, metricsConsumer) { @Override @@ -232,7 +224,7 @@ public void close() { * @return a MessageView filtered by the subscription properties that can be sent to that subscriber */ @Override - public SubView getSubView(final BarrageSubscriptionOptions options, + public MessageView getSubView(final BarrageSubscriptionOptions options, final boolean isInitialSnapshot, @Nullable final RowSet viewport, final boolean reverseViewport, @@ -250,11 +242,11 @@ public SubView getSubView(final BarrageSubscriptionOptions options, * @return a MessageView filtered by the subscription properties that can be sent to that subscriber */ @Override - public SubView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot) { + public MessageView getSubView(BarrageSubscriptionOptions options, boolean isInitialSnapshot) { return getSubView(options, isInitialSnapshot, null, false, null, null); } - public final class SubView implements View { + private final class SubView implements RecordBatchMessageView { private final BarrageSubscriptionOptions options; private final boolean isInitialSnapshot; private final RowSet viewport; @@ -365,11 +357,6 @@ private int batchSize() { return batchSize; } - @Override - public int clientMaxMessageSize() { - return options.maxMessageSize(); - } - @Override public boolean isViewport() { return viewport != null; @@ -483,7 +470,7 @@ private ByteBuffer getSubscriptionMetadata() throws IOException { * @return a MessageView filtered by the snapshot properties that can be sent to that subscriber */ @Override - public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, + public MessageView getSnapshotView(final BarrageSnapshotOptions options, @Nullable final RowSet viewport, final boolean reverseViewport, @Nullable final RowSet keyspaceViewport, @@ -498,11 +485,11 @@ public SnapshotView getSnapshotView(final BarrageSnapshotOptions options, * @return a MessageView filtered by the snapshot properties that can be sent to that subscriber */ @Override - public SnapshotView getSnapshotView(BarrageSnapshotOptions options) { + public MessageView getSnapshotView(BarrageSnapshotOptions options) { return getSnapshotView(options, null, false, null, null); } - public final class SnapshotView implements View { + private final class SnapshotView implements RecordBatchMessageView { private final BarrageSnapshotOptions options; private final RowSet viewport; private final boolean reverseViewport; @@ -565,11 +552,6 @@ private int batchSize() { return batchSize; } - @Override - public int clientMaxMessageSize() { - return options.maxMessageSize(); - } - @Override public boolean isViewport() { return viewport != null; @@ -643,10 +625,10 @@ private ByteBuffer getSnapshotMetadata() throws IOException { } } - public static final class SchemaView implements View { + private static final class SchemaMessageView implements MessageView { private final byte[] msgBytes; - public SchemaView(final ByteBuffer buffer) { + public SchemaMessageView(final ByteBuffer buffer) { this.msgBytes = Flight.FlightData.newBuilder() .setDataHeader(ByteStringAccess.wrap(buffer)) .build() @@ -657,36 +639,11 @@ public SchemaView(final ByteBuffer buffer) { public void forEachStream(Consumer visitor) { visitor.accept(new DrainableByteArrayInputStream(msgBytes, 0, msgBytes.length)); } - - @Override - public boolean isViewport() { - return false; - } - - @Override - public StreamReaderOptions options() { - return null; - } - - @Override - public int clientMaxMessageSize() { - return 0; - } - - @Override - public RowSet addRowOffsets() { - return null; - } - - @Override - public RowSet modRowOffsets(int col) { - return null; - } } @FunctionalInterface private interface ColumnVisitor { - int visit(final View view, final long startRange, final int targetBatchSize, + int visit(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException; @@ -704,7 +661,7 @@ int visit(final View view, final long startRange, final int targetBatchSize, * @param columnVisitor the helper method responsible for appending the payload columns to the RecordBatch * @return an InputStream ready to be drained by GRPC */ - private InputStream getInputStream(final View view, final long offset, final int targetBatchSize, + private InputStream getInputStream(final RecordBatchMessageView view, final long offset, final int targetBatchSize, final MutableInt actualBatchSize, final ByteBuffer metadata, final ColumnVisitor columnVisitor) throws IOException { final ArrayDeque streams = new ArrayDeque<>(); @@ -824,7 +781,7 @@ protected void writeHeader( cos.flush(); } - private void processBatches(Consumer visitor, final View view, + private void processBatches(Consumer visitor, final RecordBatchMessageView view, final long numRows, final int maxBatchSize, ByteBuffer metadata, final ColumnVisitor columnVisitor, final MutableLong bytesWritten) throws IOException { long offset = 0; @@ -833,8 +790,8 @@ private void processBatches(Consumer visitor, final View view, int batchSize = Math.min(DEFAULT_INITIAL_BATCH_SIZE, maxBatchSize); // allow the client to override the default message size - final int maxMessageSize = - view.clientMaxMessageSize() > 0 ? view.clientMaxMessageSize() : DEFAULT_MESSAGE_SIZE_LIMIT; + int clientMaxMessageSize = view.options().maxMessageSize(); + final int maxMessageSize = clientMaxMessageSize > 0 ? clientMaxMessageSize : DEFAULT_MESSAGE_SIZE_LIMIT; // TODO (deephaven-core#188): remove this when JS API can accept multiple batches boolean sendAllowed = numRows <= batchSize; @@ -917,7 +874,7 @@ private static int findGeneratorForOffset(final List return low; } - private int appendAddColumns(final View view, final long startRange, final int targetBatchSize, + private int appendAddColumns(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { if (addColumnData.length == 0) { @@ -983,7 +940,7 @@ private int appendAddColumns(final View view, final long startRange, final int t } } - private int appendModColumns(final View view, final long startRange, final int targetBatchSize, + private int appendModColumns(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index 2bb0709898a..8175b32bcbb 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -28,7 +28,6 @@ import io.deephaven.extensions.barrage.BarragePerformanceLog; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; import io.deephaven.extensions.barrage.BarrageStreamGenerator; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; @@ -702,13 +701,13 @@ private static Field arrowFieldForVectorType( } public static void createAndSendStaticSnapshot( - BarrageStreamGenerator.Factory streamGeneratorFactory, + BarrageStreamGenerator.Factory streamGeneratorFactory, BaseTable table, BitSet columns, RowSet viewport, boolean reverseViewport, BarrageSnapshotOptions snapshotRequestOptions, - StreamObserver listener, + StreamObserver listener, BarragePerformanceLog.SnapshotMetricsHelper metrics) { // start with small value and grow long snapshotTargetCellCount = MIN_SNAPSHOT_CELL_COUNT; @@ -755,8 +754,7 @@ public static void createAndSendStaticSnapshot( // send out the data. Note that although a `BarrageUpdateMetaData` object will // be provided with each unique snapshot, vanilla Flight clients will ignore // these and see only an incoming stream of batches - try (final BarrageStreamGenerator bsg = - streamGeneratorFactory.newGenerator(msg, metrics)) { + try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics)) { if (rsIt.hasMore()) { listener.onNext(bsg.getSnapshotView(snapshotRequestOptions, snapshotViewport, false, @@ -797,11 +795,11 @@ public static void createAndSendStaticSnapshot( } public static void createAndSendSnapshot( - BarrageStreamGenerator.Factory streamGeneratorFactory, + BarrageStreamGenerator.Factory streamGeneratorFactory, BaseTable table, BitSet columns, RowSet viewport, boolean reverseViewport, BarrageSnapshotOptions snapshotRequestOptions, - StreamObserver listener, + StreamObserver listener, BarragePerformanceLog.SnapshotMetricsHelper metrics) { // if the table is static and a full snapshot is requested, we can make and send multiple @@ -828,8 +826,7 @@ public static void createAndSendSnapshot( msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data // translate the viewport to keyspace and make the call - try (final BarrageStreamGenerator bsg = - streamGeneratorFactory.newGenerator(msg, metrics); + try (final BarrageStreamGenerator bsg = streamGeneratorFactory.newGenerator(msg, metrics); final RowSet keySpaceViewport = viewport != null ? msg.rowsAdded.subSetForPositions(viewport, reverseViewport) : null) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java index c3ccb4df4c3..88cb365980c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/TableToArrowConverter.java @@ -5,6 +5,7 @@ import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.extensions.barrage.BarragePerformanceLog; +import io.deephaven.extensions.barrage.BarrageStreamGenerator; import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.grpc.Drainable; import io.grpc.stub.StreamObserver; @@ -58,11 +59,11 @@ public byte[] next() { return listener.batchMessages.pop(); } - private static class ArrowBuilderObserver implements StreamObserver { + private static class ArrowBuilderObserver implements StreamObserver { final Deque batchMessages = new ArrayDeque<>(); @Override - public void onNext(final BarrageStreamGeneratorImpl.View messageView) { + public void onNext(final BarrageStreamGenerator.MessageView messageView) { try { messageView.forEachStream(inputStream -> { try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 27466f2729f..12162bceda9 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -38,7 +38,6 @@ import io.deephaven.proto.util.Exceptions; import io.deephaven.proto.util.ExportTicketHelper; import io.deephaven.server.barrage.BarrageMessageProducer; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.server.hierarchicaltable.HierarchicalTableView; import io.deephaven.server.hierarchicaltable.HierarchicalTableViewSubscription; import io.deephaven.server.session.SessionService; @@ -67,7 +66,7 @@ public class ArrowFlightUtil { Configuration.getInstance().getIntegerWithDefault("barrage.minUpdateInterval", 1000); public static void DoGetCustom( - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final SessionState session, final TicketRouter ticketRouter, final Flight.Ticket request, @@ -105,7 +104,7 @@ public static void DoGetCustom( metrics.tableKey = BarragePerformanceLog.getKeyFor(table); // create an adapter for the response observer - final StreamObserver listener = + final StreamObserver listener = ArrowModule.provideListenerAdapter().adapt(observer); // push the schema to the listener @@ -327,15 +326,15 @@ public interface Factory { private final String myPrefix; private final SessionState session; - private final StreamObserver listener; + private final StreamObserver listener; private boolean isClosed = false; private boolean isFirstMsg = true; private final TicketRouter ticketRouter; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; - private final BarrageMessageProducer.Operation.Factory bmpOperationFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageMessageProducer.Operation.Factory bmpOperationFactory; private final HierarchicalTableViewSubscription.Factory htvsFactory; private final BarrageMessageProducer.Adapter subscriptionOptAdapter; private final BarrageMessageProducer.Adapter snapshotOptAdapter; @@ -353,10 +352,10 @@ interface Handler extends Closeable { @AssistedInject public DoExchangeMarshaller( final TicketRouter ticketRouter, - final BarrageStreamGenerator.Factory streamGeneratorFactory, - final BarrageMessageProducer.Operation.Factory bmpOperationFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageMessageProducer.Operation.Factory bmpOperationFactory, final HierarchicalTableViewSubscription.Factory htvsFactory, - final BarrageMessageProducer.Adapter, StreamObserver> listenerAdapter, + final BarrageMessageProducer.Adapter, StreamObserver> listenerAdapter, final BarrageMessageProducer.Adapter subscriptionOptAdapter, final BarrageMessageProducer.Adapter snapshotOptAdapter, final SessionService.ErrorTransformer errorTransformer, @@ -612,7 +611,7 @@ public void close() { private class SubscriptionRequestHandler implements Handler { - private BarrageMessageProducer bmp; + private BarrageMessageProducer bmp; private HierarchicalTableViewSubscription htvs; private Queue preExportSubscriptions; diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java b/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java index 727bcf51368..5bbd59dfd67 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java @@ -32,17 +32,19 @@ public abstract class ArrowModule { @IntoSet abstract BindableService bindBrowserFlightServiceBinding(BrowserFlightServiceGrpcBinding service); - @Provides + @Binds @Singleton - static BarrageStreamGenerator.Factory bindStreamGenerator() { + static BarrageStreamGenerator.Factory bindStreamGenerator() { return new BarrageStreamGeneratorImpl.Factory(); } + + // TODO before commit, try getting rid of this @Provides - static BarrageMessageProducer.Adapter, StreamObserver> provideListenerAdapter() { + static BarrageMessageProducer.Adapter, StreamObserver> provideListenerAdapter() { return delegate -> new StreamObserver<>() { @Override - public void onNext(final BarrageStreamGeneratorImpl.View view) { + public void onNext(final BarrageStreamGenerator.MessageView view) { try { synchronized (delegate) { view.forEachStream(delegate::onNext); diff --git a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java index ca2df8a7827..f290dc75860 100644 --- a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java @@ -19,7 +19,6 @@ import io.deephaven.io.logger.Logger; import io.deephaven.proto.backplane.grpc.ExportNotification; import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.proto.util.Exceptions; import io.deephaven.server.session.SessionService; import io.deephaven.server.session.SessionState; @@ -45,7 +44,7 @@ public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBa private static final Logger log = LoggerFactory.getLogger(FlightServiceGrpcImpl.class); private final ScheduledExecutorService executorService; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final SessionService sessionService; private final SessionService.ErrorTransformer errorTransformer; private final TicketRouter ticketRouter; @@ -56,7 +55,7 @@ public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBa @Inject public FlightServiceGrpcImpl( @Nullable final ScheduledExecutorService executorService, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final SessionService sessionService, final SessionService.ErrorTransformer errorTransformer, final TicketRouter ticketRouter, diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index d62d56d0445..518ed1ab699 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -81,10 +81,8 @@ * inside the same JVM. *

* The client-side counterpart of this is the {@link StreamReader}. - * - * @param The sub-view type that the listener expects to receive. */ -public class BarrageMessageProducer extends LivenessArtifact +public class BarrageMessageProducer extends LivenessArtifact implements DynamicNode, NotificationStepReceiver { private static final int DELTA_CHUNK_SIZE = Configuration.getInstance().getIntegerForClassWithDefault( BarrageMessageProducer.class, "deltaChunkSize", ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY); @@ -108,17 +106,17 @@ public interface Adapter { V adapt(T t); } - public static class Operation - implements QueryTable.MemoizableOperation> { + public static class Operation + implements QueryTable.MemoizableOperation { @AssistedFactory - public interface Factory { - Operation create(BaseTable parent, long updateIntervalMs); + public interface Factory { + Operation create(BaseTable parent, long updateIntervalMs); } private final Scheduler scheduler; private final SessionService.ErrorTransformer errorTransformer; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final BaseTable parent; private final long updateIntervalMs; private final Runnable onGetSnapshot; @@ -127,7 +125,7 @@ public interface Factory { public Operation( final Scheduler scheduler, final SessionService.ErrorTransformer errorTransformer, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, @Assisted final BaseTable parent, @Assisted final long updateIntervalMs) { this(scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs, null); @@ -137,7 +135,7 @@ public Operation( public Operation( final Scheduler scheduler, final SessionService.ErrorTransformer errorTransformer, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final BaseTable parent, final long updateIntervalMs, @Nullable final Runnable onGetSnapshot) { @@ -165,10 +163,10 @@ public MemoizedOperationKey getMemoizedOperationKey() { } @Override - public Result> initialize(final boolean usePrev, + public Result initialize(final boolean usePrev, final long beforeClock) { - final BarrageMessageProducer result = new BarrageMessageProducer( - scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs, onGetSnapshot); + final BarrageMessageProducer result = new BarrageMessageProducer(scheduler, errorTransformer, + streamGeneratorFactory, parent, updateIntervalMs, onGetSnapshot); return new Result<>(result, result.constructListener()); } } @@ -199,7 +197,7 @@ public int hashCode() { private final String logPrefix; private final Scheduler scheduler; private final SessionService.ErrorTransformer errorTransformer; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final BaseTable parent; private final long updateIntervalMs; @@ -308,7 +306,7 @@ public void close() { public BarrageMessageProducer( final Scheduler scheduler, final SessionService.ErrorTransformer errorTransformer, - final BarrageStreamGenerator.Factory streamGeneratorFactory, + final BarrageStreamGenerator.Factory streamGeneratorFactory, final BaseTable parent, final long updateIntervalMs, final Runnable onGetSnapshot) { @@ -415,7 +413,7 @@ public void setOnGetSnapshot(Runnable onGetSnapshot, boolean isPreSnap) { */ private class Subscription { final BarrageSubscriptionOptions options; - final StreamObserver listener; + final StreamObserver listener; final String logPrefix; RowSet viewport; // active viewport @@ -445,7 +443,7 @@ private class Subscription { WritableRowSet growingIncrementalViewport = null; // rows to be sent to the client from the current snapshot boolean isFirstSnapshot; // is this the first snapshot after a change to a subscriptions - private Subscription(final StreamObserver listener, + private Subscription(final StreamObserver listener, final BarrageSubscriptionOptions options, final BitSet subscribedColumns, @Nullable final RowSet initialViewport, @@ -473,7 +471,7 @@ public boolean isViewport() { * @param columnsToSubscribe The initial columns to subscribe to * @param initialViewport Initial viewport, to be owned by the subscription */ - public void addSubscription(final StreamObserver listener, + public void addSubscription(final StreamObserver listener, final BarrageSubscriptionOptions options, @Nullable final BitSet columnsToSubscribe, @Nullable final RowSet initialViewport, @@ -518,7 +516,7 @@ public void addSubscription(final StreamObserver listener, } } - private boolean findAndUpdateSubscription(final StreamObserver listener, + private boolean findAndUpdateSubscription(final StreamObserver listener, final Consumer updateSubscription) { final Function, Boolean> findAndUpdate = (List subscriptions) -> { for (final Subscription sub : subscriptions) { @@ -546,13 +544,14 @@ private boolean findAndUpdateSubscription(final StreamObserver list } } - public boolean updateSubscription(final StreamObserver listener, + public boolean updateSubscription(final StreamObserver listener, @Nullable final RowSet newViewport, @Nullable final BitSet columnsToSubscribe) { // assume forward viewport when not specified return updateSubscription(listener, newViewport, columnsToSubscribe, false); } - public boolean updateSubscription(final StreamObserver listener, @Nullable final RowSet newViewport, + public boolean updateSubscription(final StreamObserver listener, + @Nullable final RowSet newViewport, @Nullable final BitSet columnsToSubscribe, final boolean newReverseViewport) { return findAndUpdateSubscription(listener, sub -> { if (sub.pendingViewport != null) { @@ -582,7 +581,7 @@ public boolean updateSubscription(final StreamObserver listener, @N }); } - public void removeSubscription(final StreamObserver listener) { + public void removeSubscription(final StreamObserver listener) { findAndUpdateSubscription(listener, sub -> { sub.pendingDelete = true; if (log.isDebugEnabled()) { @@ -1457,7 +1456,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { } if (snapshot != null) { - try (final BarrageStreamGenerator snapshotGenerator = + try (final BarrageStreamGenerator snapshotGenerator = streamGeneratorFactory.newGenerator(snapshot, this::recordWriteMetrics)) { if (log.isDebugEnabled()) { log.debug().append(logPrefix).append("Sending snapshot to ").append(activeSubscriptions.size()) @@ -1515,7 +1514,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { private void propagateToSubscribers(final BarrageMessage message, final RowSet propRowSetForMessage) { // message is released via transfer to stream generator (as it must live until all view's are closed) - try (final BarrageStreamGenerator generator = streamGeneratorFactory.newGenerator( + try (final BarrageStreamGenerator generator = streamGeneratorFactory.newGenerator( message, this::recordWriteMetrics)) { for (final Subscription subscription : activeSubscriptions) { if (subscription.pendingInitialSnapshot || subscription.pendingDelete) { @@ -1567,9 +1566,8 @@ private void clearObjectDeltaColumns(@NotNull final BitSet objectColumnsToClear) } } - private void propagateSnapshotForSubscription( - final Subscription subscription, - final BarrageStreamGenerator snapshotGenerator) { + private void propagateSnapshotForSubscription(final Subscription subscription, + final BarrageStreamGenerator snapshotGenerator) { boolean needsSnapshot = subscription.pendingInitialSnapshot; // This is a little confusing, but by the time we propagate, the `snapshotViewport`/`snapshotColumns` objects diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java index 695c2b73ac5..4bf7f7e52b4 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java @@ -34,7 +34,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; @@ -55,7 +54,7 @@ public class HierarchicalTableViewSubscription extends LivenessArtifact { public interface Factory { HierarchicalTableViewSubscription create( HierarchicalTableView view, - StreamObserver listener, + StreamObserver listener, BarrageSubscriptionOptions subscriptionOptions, long intervalMillis); } @@ -64,10 +63,10 @@ HierarchicalTableViewSubscription create( private final Scheduler scheduler; private final SessionService.ErrorTransformer errorTransformer; - private final BarrageStreamGenerator.Factory streamGeneratorFactory; + private final BarrageStreamGenerator.Factory streamGeneratorFactory; private final HierarchicalTableView view; - private final StreamObserver listener; + private final StreamObserver listener; private final BarrageSubscriptionOptions subscriptionOptions; private final long intervalDurationNanos; @@ -106,9 +105,9 @@ private enum State { public HierarchicalTableViewSubscription( @NotNull final Scheduler scheduler, @NotNull final SessionService.ErrorTransformer errorTransformer, - @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, + @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, @Assisted @NotNull final HierarchicalTableView view, - @Assisted @NotNull final StreamObserver listener, + @Assisted @NotNull final StreamObserver listener, @Assisted @NotNull final BarrageSubscriptionOptions subscriptionOptions, @Assisted final long intervalDurationMillis) { this.scheduler = scheduler; @@ -293,8 +292,8 @@ private void process() { } private static long buildAndSendSnapshot( - @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, - @NotNull final StreamObserver listener, + @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, + @NotNull final StreamObserver listener, @NotNull final BarrageSubscriptionOptions subscriptionOptions, @NotNull final HierarchicalTableView view, @NotNull final LongConsumer snapshotNanosConsumer, @@ -356,7 +355,7 @@ private static long buildAndSendSnapshot( barrageMessage.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // 5. Send the BarrageMessage - final BarrageStreamGenerator streamGenerator = + final BarrageStreamGenerator streamGenerator = streamGeneratorFactory.newGenerator(barrageMessage, writeMetricsConsumer); // Note that we're always specifying "isInitialSnapshot=true". This is to provoke the subscription view to // send the added rows on every snapshot, since (1) our added rows are flat, and thus cheap to send, and diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java index bce51e45681..dcb7445077e 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java @@ -28,7 +28,6 @@ import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.BarrageStreamGenerator; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.BarrageStreamReader; @@ -63,7 +62,7 @@ public class BarrageBlinkTableTest extends RefreshingTableTestCase { private QueryTable sourceTable; private TrackingWritableRowSet blinkRowSet; private QueryTable blinkTable; - private BarrageMessageProducer barrageMessageProducer; + private BarrageMessageProducer barrageMessageProducer; private TableUpdateValidator originalTUV; private FailureListener originalTUVListener; @@ -72,7 +71,7 @@ public class BarrageBlinkTableTest extends RefreshingTableTestCase { ArrowModule.class }) public interface TestComponent { - BarrageStreamGenerator.Factory getStreamGeneratorFactory(); + BarrageStreamGenerator.Factory getStreamGeneratorFactory(); @Component.Builder interface Builder { @@ -101,7 +100,7 @@ public void setUp() throws Exception { blinkTable.setRefreshing(true); blinkTable.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true); - barrageMessageProducer = blinkTable.getResult(new BarrageMessageProducer.Operation<>( + barrageMessageProducer = blinkTable.getResult(new BarrageMessageProducer.Operation( scheduler, new SessionService.ObfuscatingErrorTransformer(), daggerRoot.getStreamGeneratorFactory(), blinkTable, UPDATE_INTERVAL, () -> { })); diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java index 5f3351f8f04..314cd1db623 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java @@ -27,7 +27,6 @@ import io.deephaven.engine.util.TableDiff; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.BarrageStreamGenerator; -import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.BarrageProtoUtil; @@ -76,7 +75,7 @@ public class BarrageMessageRoundTripTest extends RefreshingTableTestCase { ArrowModule.class }) public interface TestComponent { - BarrageStreamGenerator.Factory getStreamGeneratorFactory(); + BarrageStreamGenerator.Factory getStreamGeneratorFactory(); @Component.Builder interface Builder { @@ -149,7 +148,7 @@ private class RemoteClient { private final BarrageTable barrageTable; @ReferentialIntegrity - private final BarrageMessageProducer barrageMessageProducer; + private final BarrageMessageProducer barrageMessageProducer; @ReferentialIntegrity private final TableUpdateValidator replicatedTUV; @@ -163,14 +162,14 @@ private class RemoteClient { // The replicated table's TableUpdateValidator will be confused if the table is a viewport. Instead we rely on // comparing the producer table to the consumer table to validate contents are correct. RemoteClient(final RowSet viewport, final BitSet subscribedColumns, - final BarrageMessageProducer barrageMessageProducer, + final BarrageMessageProducer barrageMessageProducer, final Table sourceTable, final String name) { // assume a forward viewport when not specified this(viewport, subscribedColumns, barrageMessageProducer, sourceTable, name, false, false); } RemoteClient(final RowSet viewport, final BitSet subscribedColumns, - final BarrageMessageProducer barrageMessageProducer, + final BarrageMessageProducer barrageMessageProducer, final Table sourceTable, final String name, final boolean reverseViewport, final boolean deferSubscription) { this.viewport = viewport; @@ -342,7 +341,7 @@ private class RemoteNugget implements EvalNuggetInterface { private final QueryTable originalTable; @ReferentialIntegrity - private final BarrageMessageProducer barrageMessageProducer; + private final BarrageMessageProducer barrageMessageProducer; @ReferentialIntegrity private final TableUpdateValidator originalTUV; @@ -354,7 +353,7 @@ private class RemoteNugget implements EvalNuggetInterface { RemoteNugget(final Supplier makeTable) { this.makeTable = makeTable; this.originalTable = (QueryTable) makeTable.get(); - this.barrageMessageProducer = originalTable.getResult(new BarrageMessageProducer.Operation<>(scheduler, + this.barrageMessageProducer = originalTable.getResult(new BarrageMessageProducer.Operation(scheduler, new SessionService.ObfuscatingErrorTransformer(), daggerRoot.getStreamGeneratorFactory(), originalTable, UPDATE_INTERVAL, this::onGetSnapshot)); @@ -1410,7 +1409,7 @@ public void createTable() { } } - public static class DummyObserver implements StreamObserver { + public static class DummyObserver implements StreamObserver { volatile boolean completed = false; private final BarrageDataMarshaller marshaller; @@ -1422,7 +1421,7 @@ public static class DummyObserver implements StreamObserver { try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = From cae1551b12be838beeb426bcd41c99df8dc2d625 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 31 May 2024 20:02:47 -0500 Subject: [PATCH 07/13] Revised draft at minimizing use of stream adapters --- .../server/arrow/ArrowFlightUtil.java | 37 ++++++++++++++++-- .../deephaven/server/arrow/ArrowModule.java | 38 +------------------ .../barrage/BarrageMessageProducer.java | 4 +- 3 files changed, 37 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 12162bceda9..f25d5f3abb9 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -62,6 +62,38 @@ public class ArrowFlightUtil { private static final Logger log = LoggerFactory.getLogger(ArrowFlightUtil.class); + private static class MessageViewAdapter implements StreamObserver { + private final StreamObserver delegate; + + private MessageViewAdapter(StreamObserver delegate) { + this.delegate = delegate; + } + + public void onNext(BarrageStreamGenerator.MessageView value) { + synchronized (delegate) { + try { + value.forEachStream(delegate::onNext); + } catch (IOException e) { + throw new UncheckedDeephavenException(e); + } + } + } + + @Override + public void onError(Throwable t) { + synchronized (delegate) { + delegate.onError(t); + } + } + + @Override + public void onCompleted() { + synchronized (delegate) { + delegate.onCompleted(); + } + } + } + public static final int DEFAULT_MIN_UPDATE_INTERVAL_MS = Configuration.getInstance().getIntegerWithDefault("barrage.minUpdateInterval", 1000); @@ -105,7 +137,7 @@ public static void DoGetCustom( // create an adapter for the response observer final StreamObserver listener = - ArrowModule.provideListenerAdapter().adapt(observer); + new MessageViewAdapter(observer); // push the schema to the listener listener.onNext(streamGeneratorFactory.getSchemaView( @@ -355,7 +387,6 @@ public DoExchangeMarshaller( final BarrageStreamGenerator.Factory streamGeneratorFactory, final BarrageMessageProducer.Operation.Factory bmpOperationFactory, final HierarchicalTableViewSubscription.Factory htvsFactory, - final BarrageMessageProducer.Adapter, StreamObserver> listenerAdapter, final BarrageMessageProducer.Adapter subscriptionOptAdapter, final BarrageMessageProducer.Adapter snapshotOptAdapter, final SessionService.ErrorTransformer errorTransformer, @@ -370,7 +401,7 @@ public DoExchangeMarshaller( this.subscriptionOptAdapter = subscriptionOptAdapter; this.snapshotOptAdapter = snapshotOptAdapter; this.session = session; - this.listener = listenerAdapter.adapt(responseObserver); + this.listener = new MessageViewAdapter(responseObserver); this.errorTransformer = errorTransformer; this.session.addOnCloseCallback(this); diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java b/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java index 5bbd59dfd67..7f2b22aa464 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowModule.java @@ -7,7 +7,6 @@ import dagger.Module; import dagger.Provides; import dagger.multibindings.IntoSet; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest; import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; @@ -16,11 +15,8 @@ import io.deephaven.server.barrage.BarrageMessageProducer; import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl; import io.grpc.BindableService; -import io.grpc.stub.StreamObserver; import javax.inject.Singleton; -import java.io.IOException; -import java.io.InputStream; @Module public abstract class ArrowModule { @@ -32,44 +28,12 @@ public abstract class ArrowModule { @IntoSet abstract BindableService bindBrowserFlightServiceBinding(BrowserFlightServiceGrpcBinding service); - @Binds + @Provides @Singleton static BarrageStreamGenerator.Factory bindStreamGenerator() { return new BarrageStreamGeneratorImpl.Factory(); } - - // TODO before commit, try getting rid of this - @Provides - static BarrageMessageProducer.Adapter, StreamObserver> provideListenerAdapter() { - return delegate -> new StreamObserver<>() { - @Override - public void onNext(final BarrageStreamGenerator.MessageView view) { - try { - synchronized (delegate) { - view.forEachStream(delegate::onNext); - } - } catch (final IOException ioe) { - throw new UncheckedDeephavenException(ioe); - } - } - - @Override - public void onError(Throwable t) { - synchronized (delegate) { - delegate.onError(t); - } - } - - @Override - public void onCompleted() { - synchronized (delegate) { - delegate.onCompleted(); - } - } - }; - } - @Provides static BarrageMessageProducer.Adapter subscriptionOptAdapter() { return BarrageSubscriptionOptions::of; diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 518ed1ab699..68a138a78b1 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -516,7 +516,7 @@ public void addSubscription(final StreamObserver listener, + private boolean findAndUpdateSubscription(final StreamObserver listener, final Consumer updateSubscription) { final Function, Boolean> findAndUpdate = (List subscriptions) -> { for (final Subscription sub : subscriptions) { @@ -581,7 +581,7 @@ public boolean updateSubscription(final StreamObserver listener) { + public void removeSubscription(final StreamObserver listener) { findAndUpdateSubscription(listener, sub -> { sub.pendingDelete = true; if (log.isDebugEnabled()) { From 46f5eba19ef9af04e1e6b78a9e57b55ca5d6e8de Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 31 May 2024 16:14:30 -0500 Subject: [PATCH 08/13] Tighten BSG/I's usage of InputStream to DefensiveDrainable --- .../barrage/BarrageStreamGenerator.java | 4 +- .../barrage/BarrageStreamGeneratorImpl.java | 44 +++++++++---------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java index 730f8700781..2d3b55fe7fd 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java @@ -6,12 +6,12 @@ import com.google.flatbuffers.FlatBufferBuilder; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.impl.util.BarrageMessage; +import io.deephaven.extensions.barrage.util.DefensiveDrainable; import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.io.InputStream; import java.util.BitSet; import java.util.function.Consumer; import java.util.function.ToIntFunction; @@ -23,7 +23,7 @@ public interface BarrageStreamGenerator extends SafeCloseable { interface MessageView { - void forEachStream(Consumer visitor) throws IOException; + void forEachStream(Consumer visitor) throws IOException; } interface Factory { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 34162391125..7a8518c0b80 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -47,7 +47,6 @@ import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.*; @@ -310,7 +309,7 @@ public SubView(final BarrageSubscriptionOptions options, } @Override - public void forEachStream(Consumer visitor) throws IOException { + public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); ByteBuffer metadata = getSubscriptionMetadata(); MutableLong bytesWritten = new MutableLong(0L); @@ -322,7 +321,7 @@ public void forEachStream(Consumer visitor) throws IOException { if (numAddRows == 0 && numModRows == 0) { // we still need to send a message containing metadata when there are no rows - final InputStream is = getInputStream(this, 0, 0, actualBatchSize, metadata, + final DefensiveDrainable is = getInputStream(this, 0, 0, actualBatchSize, metadata, BarrageStreamGeneratorImpl.this::appendAddColumns); bytesWritten.add(is.available()); visitor.accept(is); @@ -522,7 +521,7 @@ public SnapshotView(final BarrageSnapshotOptions options, } @Override - public void forEachStream(Consumer visitor) throws IOException { + public void forEachStream(Consumer visitor) throws IOException { final long startTm = System.nanoTime(); ByteBuffer metadata = getSnapshotMetadata(); MutableLong bytesWritten = new MutableLong(0L); @@ -636,7 +635,7 @@ public SchemaMessageView(final ByteBuffer buffer) { } @Override - public void forEachStream(Consumer visitor) { + public void forEachStream(Consumer visitor) { visitor.accept(new DrainableByteArrayInputStream(msgBytes, 0, msgBytes.length)); } } @@ -644,7 +643,7 @@ public void forEachStream(Consumer visitor) { @FunctionalInterface private interface ColumnVisitor { int visit(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, - final Consumer addStream, + final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException; } @@ -661,13 +660,14 @@ int visit(final RecordBatchMessageView view, final long startRange, final int ta * @param columnVisitor the helper method responsible for appending the payload columns to the RecordBatch * @return an InputStream ready to be drained by GRPC */ - private InputStream getInputStream(final RecordBatchMessageView view, final long offset, final int targetBatchSize, + private DefensiveDrainable getInputStream(final RecordBatchMessageView view, final long offset, + final int targetBatchSize, final MutableInt actualBatchSize, final ByteBuffer metadata, final ColumnVisitor columnVisitor) throws IOException { - final ArrayDeque streams = new ArrayDeque<>(); + final ArrayDeque streams = new ArrayDeque<>(); final MutableInt size = new MutableInt(); - final Consumer addStream = (final InputStream is) -> { + final Consumer addStream = (final DefensiveDrainable is) -> { try { final int sz = is.available(); if (sz == 0) { @@ -755,7 +755,7 @@ private InputStream getInputStream(final RecordBatchMessageView view, final long writeHeader(metadata, size, header, baos); streams.addFirst(new DrainableByteArrayInputStream(baos.peekBuffer(), 0, baos.size())); - return new ConsecutiveDrainableStreams(streams.toArray(new InputStream[0])); + return new ConsecutiveDrainableStreams(streams.toArray(new DefensiveDrainable[0])); } catch (final IOException ex) { throw new UncheckedDeephavenException("Unexpected IOException", ex); } @@ -781,7 +781,7 @@ protected void writeHeader( cos.flush(); } - private void processBatches(Consumer visitor, final RecordBatchMessageView view, + private void processBatches(Consumer visitor, final RecordBatchMessageView view, final long numRows, final int maxBatchSize, ByteBuffer metadata, final ColumnVisitor columnVisitor, final MutableLong bytesWritten) throws IOException { long offset = 0; @@ -798,7 +798,7 @@ private void processBatches(Consumer visitor, final RecordBatchMess while (offset < numRows) { try { - final InputStream is = + final DefensiveDrainable is = getInputStream(view, offset, batchSize, actualBatchSize, metadata, columnVisitor); int bytesToWrite = is.available(); @@ -875,7 +875,8 @@ private static int findGeneratorForOffset(final List } private int appendAddColumns(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, - final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, + final Consumer addStream, + final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { if (addColumnData.length == 0) { return view.addRowOffsets().intSize(); @@ -941,7 +942,7 @@ private int appendAddColumns(final RecordBatchMessageView view, final long start } private int appendModColumns(final RecordBatchMessageView view, final long startRange, final int targetBatchSize, - final Consumer addStream, + final Consumer addStream, final ChunkInputStreamGenerator.FieldNodeListener fieldNodeListener, final ChunkInputStreamGenerator.BufferListener bufferListener) throws IOException { int[] columnChunkIdx = new int[modColumnData.length]; @@ -1203,21 +1204,16 @@ public int drainTo(final OutputStream outputStream) throws IOException { } public static class ConsecutiveDrainableStreams extends DefensiveDrainable { - final InputStream[] streams; + final DefensiveDrainable[] streams; - public ConsecutiveDrainableStreams(final InputStream... streams) { + public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) { this.streams = streams; - for (final InputStream stream : streams) { - if (!(stream instanceof Drainable)) { - throw new IllegalArgumentException("expecting sub-class of Drainable; found: " + stream.getClass()); - } - } } @Override public int drainTo(final OutputStream outputStream) throws IOException { int total = 0; - for (final InputStream stream : streams) { + for (final DefensiveDrainable stream : streams) { final int expected = total + stream.available(); total += ((Drainable) stream).drainTo(outputStream); if (expected != total) { @@ -1233,7 +1229,7 @@ public int drainTo(final OutputStream outputStream) throws IOException { @Override public int available() throws SizeException, IOException { int total = 0; - for (final InputStream stream : streams) { + for (final DefensiveDrainable stream : streams) { total += stream.available(); if (total < 0) { throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); @@ -1244,7 +1240,7 @@ public int available() throws SizeException, IOException { @Override public void close() throws IOException { - for (final InputStream stream : streams) { + for (final DefensiveDrainable stream : streams) { try { stream.close(); } catch (final IOException e) { From bc28009bf7f8f2267aa0bdcc09365c48aa580337 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 31 May 2024 16:15:34 -0500 Subject: [PATCH 09/13] Move general util classes to their own types --- .../barrage/BarrageStreamGeneratorImpl.java | 82 ------------------- .../barrage/ConsecutiveDrainableStreams.java | 56 +++++++++++++ .../DrainableByteArrayInputStream.java | 44 ++++++++++ .../barrage/BarrageStreamGeneratorTest.java | 15 ++-- 4 files changed, 107 insertions(+), 90 deletions(-) create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 7a8518c0b80..b5deafc6c67 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -1169,88 +1169,6 @@ public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOExceptio } } - public static class DrainableByteArrayInputStream extends DefensiveDrainable { - - private byte[] buf; - private final int offset; - private final int length; - - public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) { - this.buf = Objects.requireNonNull(buf); - this.offset = offset; - this.length = length; - } - - @Override - public int available() { - if (buf == null) { - return 0; - } - return length; - } - - @Override - public int drainTo(final OutputStream outputStream) throws IOException { - if (buf != null) { - try { - outputStream.write(buf, offset, length); - } finally { - buf = null; - } - return length; - } - return 0; - } - } - - public static class ConsecutiveDrainableStreams extends DefensiveDrainable { - final DefensiveDrainable[] streams; - - public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) { - this.streams = streams; - } - - @Override - public int drainTo(final OutputStream outputStream) throws IOException { - int total = 0; - for (final DefensiveDrainable stream : streams) { - final int expected = total + stream.available(); - total += ((Drainable) stream).drainTo(outputStream); - if (expected != total) { - throw new IllegalStateException("drained message drained wrong number of bytes"); - } - if (total < 0) { - throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); - } - } - return total; - } - - @Override - public int available() throws SizeException, IOException { - int total = 0; - for (final DefensiveDrainable stream : streams) { - total += stream.available(); - if (total < 0) { - throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); - } - } - return total; - } - - @Override - public void close() throws IOException { - for (final DefensiveDrainable stream : streams) { - try { - stream.close(); - } catch (final IOException e) { - throw new UncheckedDeephavenException("unexpected IOException", e); - } - } - super.close(); - } - } - private static final class EmptyRowSetGenerator extends RowSetGenerator { public static final EmptyRowSetGenerator INSTANCE; static { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java new file mode 100644 index 00000000000..441356255d7 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java @@ -0,0 +1,56 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.UncheckedDeephavenException; +import io.deephaven.extensions.barrage.util.DefensiveDrainable; +import io.deephaven.util.SafeCloseable; +import io.deephaven.util.datastructures.SizeException; +import io.grpc.Drainable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; + +public class ConsecutiveDrainableStreams extends DefensiveDrainable { + final DefensiveDrainable[] streams; + + public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) { + this.streams = streams; + } + + @Override + public int drainTo(final OutputStream outputStream) throws IOException { + int total = 0; + for (final DefensiveDrainable stream : streams) { + final int expected = total + stream.available(); + total += stream.drainTo(outputStream); + if (expected != total) { + throw new IllegalStateException("drained message drained wrong number of bytes"); + } + if (total < 0) { + throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); + } + } + return total; + } + + @Override + public int available() throws SizeException, IOException { + int total = 0; + for (final DefensiveDrainable stream : streams) { + total += stream.available(); + if (total < 0) { + throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); + } + } + return total; + } + + @Override + public void close() throws IOException { + SafeCloseable.closeAll(streams); + super.close(); + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java new file mode 100644 index 00000000000..f2b14a7dc44 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java @@ -0,0 +1,44 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.extensions.barrage.util.DefensiveDrainable; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +public class DrainableByteArrayInputStream extends DefensiveDrainable { + + private byte[] buf; + private final int offset; + private final int length; + + public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) { + this.buf = Objects.requireNonNull(buf); + this.offset = offset; + this.length = length; + } + + @Override + public int available() { + if (buf == null) { + return 0; + } + return length; + } + + @Override + public int drainTo(final OutputStream outputStream) throws IOException { + if (buf != null) { + try { + outputStream.write(buf, offset, length); + } finally { + buf = null; + } + return length; + } + return 0; + } +} diff --git a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java index e6c59d7efb8..73be2b851af 100644 --- a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java +++ b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java @@ -14,8 +14,8 @@ public class BarrageStreamGeneratorTest { @Test public void testDrainableStreamIsEmptied() throws IOException { final int length = 512; - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream inputStream = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final DrainableByteArrayInputStream inputStream = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); int bytesRead = inputStream.drainTo(new NullOutputStream()); @@ -26,12 +26,11 @@ public void testDrainableStreamIsEmptied() throws IOException { @Test public void testConsecutiveDrainableStreamIsEmptied() throws IOException { final int length = 512; - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in1 = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in2 = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); - final BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams inputStream = - new BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams(in1, in2); + final DrainableByteArrayInputStream in1 = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final DrainableByteArrayInputStream in2 = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final ConsecutiveDrainableStreams inputStream = new ConsecutiveDrainableStreams(in1, in2); int bytesRead = inputStream.drainTo(new NullOutputStream()); From 6a23dcfc8b050a610b926180ee5c96630ff548c7 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 31 May 2024 16:18:08 -0500 Subject: [PATCH 10/13] Remove dead method --- .../barrage/BarrageStreamGeneratorImpl.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index b5deafc6c67..a295a65f86b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -38,7 +38,6 @@ import io.deephaven.util.datastructures.SizeException; import io.deephaven.util.mutable.MutableInt; import io.deephaven.util.mutable.MutableLong; -import io.grpc.Drainable; import org.apache.arrow.flatbuf.Buffer; import org.apache.arrow.flatbuf.FieldNode; import org.apache.arrow.flatbuf.RecordBatch; @@ -47,7 +46,6 @@ import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.*; import java.util.function.Consumer; @@ -1111,25 +1109,12 @@ protected int addToFlatBuffer(final RowSet viewport, final FlatBufferBuilder bui } public static class BitSetGenerator extends ByteArrayGenerator { - public final BitSet original; - - public BitSetGenerator(final BitSet bitset) throws IOException { - this.original = bitset == null ? new BitSet() : bitset; + public BitSetGenerator(final BitSet bitset) { + BitSet original = bitset == null ? new BitSet() : bitset; this.raw = original.toByteArray(); final int nBits = original.previousSetBit(Integer.MAX_VALUE - 1) + 1; this.len = (int) ((long) nBits + 7) / 8; } - - public int addToFlatBuffer(final BitSet mine, final FlatBufferBuilder builder) throws IOException { - if (mine.equals(original)) { - return addToFlatBuffer(builder); - } - - final byte[] nraw = mine.toByteArray(); - final int nBits = mine.previousSetBit(Integer.MAX_VALUE - 1) + 1; - final int nlen = (int) ((long) nBits + 7) / 8; - return builder.createByteVector(nraw, 0, nlen); - } } public static class RowSetShiftDataGenerator extends ByteArrayGenerator { From a321d830069786d4c41ce03c27f38a771fae77ae Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 4 Jun 2024 10:31:15 -0500 Subject: [PATCH 11/13] Javadoc suggestions --- .../deephaven/extensions/barrage/BarrageStreamGenerator.java | 4 ++++ .../extensions/barrage/BarrageStreamGeneratorImpl.java | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java index 2d3b55fe7fd..2c0375235ae 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGenerator.java @@ -22,6 +22,10 @@ */ public interface BarrageStreamGenerator extends SafeCloseable { + /** + * Represents a single update, which might be sent as multiple distinct payloads as necessary based in the + * implementation. + */ interface MessageView { void forEachStream(Consumer visitor) throws IOException; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index a295a65f86b..badc257aa78 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -647,8 +647,9 @@ int visit(final RecordBatchMessageView view, final long startRange, final int ta } /** - * Returns an InputStream of the message filtered to the viewport. This function accepts `targetBatchSize` but may - * actually write fewer rows than the target (when crossing an internal chunk boundary, e.g.) + * Returns an InputStream of a single FlightData message filtered to the viewport (if provided). This function + * accepts {@code targetBatchSize}, but may actually write fewer rows than the target (e.g. when crossing an + * internal chunk boundary). * * @param view the view of the overall chunk to generate a RecordBatch for * @param offset the start of the batch in position space w.r.t. the view (inclusive) From 1bc1845affb0337888c310c16ff163524958d48e Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 6 Jun 2024 13:33:34 -0500 Subject: [PATCH 12/13] Revert fastpath change --- .../extensions/barrage/BarrageStreamGeneratorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index badc257aa78..9aa0c9376c5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -846,7 +846,7 @@ private void processBatches(Consumer visitor, final RecordBa private static int findGeneratorForOffset(final List generators, final long offset) { // fast path for smaller updates - if (generators.isEmpty()) { + if (generators.size() <= 1) { return 0; } From 3e85d30bb96d99b5c5e21fda5908288970d8521d Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 6 Jun 2024 14:28:23 -0500 Subject: [PATCH 13/13] Import and whitespace cleanup --- .../barrage/ChunkListInputStreamGenerator.java | 1 - .../extensions/barrage/ConsecutiveDrainableStreams.java | 2 -- .../deephaven/server/barrage/BarrageMessageProducer.java | 9 +++++---- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java index 7aeb20c8b9f..f64be56149f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java @@ -7,7 +7,6 @@ import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.impl.util.BarrageMessage; import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; import io.deephaven.extensions.barrage.util.StreamReaderOptions; import io.deephaven.util.SafeCloseable; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java index 441356255d7..d3ccc60912f 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java @@ -3,11 +3,9 @@ // package io.deephaven.extensions.barrage; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.extensions.barrage.util.DefensiveDrainable; import io.deephaven.util.SafeCloseable; import io.deephaven.util.datastructures.SizeException; -import io.grpc.Drainable; import org.jetbrains.annotations.NotNull; import java.io.IOException; diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 68a138a78b1..ddc067f0a33 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -163,8 +163,7 @@ public MemoizedOperationKey getMemoizedOperationKey() { } @Override - public Result initialize(final boolean usePrev, - final long beforeClock) { + public Result initialize(final boolean usePrev, final long beforeClock) { final BarrageMessageProducer result = new BarrageMessageProducer(scheduler, errorTransformer, streamGeneratorFactory, parent, updateIntervalMs, onGetSnapshot); return new Result<>(result, result.constructListener()); @@ -550,9 +549,11 @@ public boolean updateSubscription(final StreamObserver listener, + public boolean updateSubscription( + final StreamObserver listener, @Nullable final RowSet newViewport, - @Nullable final BitSet columnsToSubscribe, final boolean newReverseViewport) { + @Nullable final BitSet columnsToSubscribe, + final boolean newReverseViewport) { return findAndUpdateSubscription(listener, sub -> { if (sub.pendingViewport != null) { sub.pendingViewport.close();