Skip to content

Commit

Permalink
refactor: Barrage and Chunk cleanup preparing for JS API support (#5846)
Browse files Browse the repository at this point in the history
* Extracted ExposedByteArrayOutputStream to its own file
* Avoid String.format in a few places
* Extract Array.newInstance so the JS API can replace with a different call

Partial #188
Co-authored-by: Ryan Caudy <rcaudy@gmail.com>
  • Loading branch information
niloc132 authored Jul 31, 2024
1 parent 77f03ac commit 9960d00
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 42 deletions.
3 changes: 1 addition & 2 deletions engine/chunk/src/main/java/io/deephaven/chunk/Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ default void copyToBuffer(int srcOffset, @NotNull Buffer destBuffer, int destOff
default void checkChunkType(ChunkType expected) {
final ChunkType actual = getChunkType();
if (actual != expected) {
throw new IllegalArgumentException(
String.format("Expected chunk type '%s', but is '%s'.", expected, actual));
throw new IllegalArgumentException("Expected chunk type '" + expected + "', but is '" + actual + "'.");
}
}

Expand Down
3 changes: 1 addition & 2 deletions engine/chunk/src/main/java/io/deephaven/chunk/ChunkBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public final void internalSetSize(int newSize, long password) {
"DO NOT CALL THIS INTERNAL METHOD. Instead call WritableChunk.setSize()");
}
if (newSize < 0 || newSize > capacity) {
throw new IllegalArgumentException(
String.format("size %d is incompatible with capacity %d", newSize, capacity));
throw new IllegalArgumentException("size " + newSize + " is incompatible with capacity " + capacity);
}

this.size = newSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ public class ChunkHelpers {
static void checkSliceArgs(int size, int offset, int capacity) {
if (offset < 0 || offset > size || capacity < 0 || capacity > size - offset) {
throw new IllegalArgumentException(
String.format("New slice offset %d, capacity %d is incompatible with size %d",
offset, capacity, size));
"New slice offset " + offset + ", capacity " + capacity + " is incompatible with size " + size);
}
}

static void checkArrayArgs(int arrayLength, int offset, int capacity) {
if (offset < 0 || capacity < 0 || capacity > arrayLength - offset) {
throw new IllegalArgumentException(
String.format("offset %d, capacity %d is incompatible with array of length %d",
offset, capacity, arrayLength));
throw new IllegalArgumentException("offset " + offset + ", capacity " + capacity
+ " is incompatible with array of length " + arrayLength);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.SingleElementListHeaderInputStreamGenerator;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil.ExposedByteArrayOutputStream;
import io.deephaven.extensions.barrage.util.ExposedByteArrayOutputStream;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk.array;

import java.lang.reflect.Array;

/**
* This utility class exists to isolate wrappers around reflective calls that may need to have their implementations
* replaced when emulated in the JS API.
*/
class ArrayReflectUtil {
/**
* Allocate an array to hold {@code length} elements of type {@code componentType}. The standard (Java)
* implementation delegates to {@link Array#newInstance(Class, int)}.
*
* @param componentType the component type in the array to be created
* @param length the length of the array to create
* @return the created array
*/
static Object newInstance(Class<?> componentType, int length) {
return Array.newInstance(componentType, length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.util.datastructures.LongSizedDataStructure;

import java.lang.reflect.Array;

public class ObjectArrayExpansionKernel implements ArrayExpansionKernel {

private final Class<?> componentType;
Expand Down Expand Up @@ -82,7 +80,7 @@ public <T, A extends Any> WritableObjectChunk<T, A> contract(
int lenRead = 0;
for (int i = 0; i < itemsInBatch; ++i) {
final int rowLen = perElementLengthDest.get(i + 1) - perElementLengthDest.get(i);
final Object[] row = (Object[]) Array.newInstance(componentType, rowLen);
final Object[] row = (Object[]) ArrayReflectUtil.newInstance(componentType, rowLen);
if (rowLen != 0) {
typedSource.copyToArray(lenRead, row, 0, rowLen);
lenRead += rowLen;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.jetbrains.annotations.NotNull;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -65,12 +64,6 @@ public static RowSet toRowSet(final ByteBuffer string) {
}
}

public static class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
public byte[] peekBuffer() {
return buf;
}
}

public static class ObjectInputStreamAdapter extends InputStream {

private int sizeRemaining;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ synchronized InputStream delegate() throws IOException {
if (delegate != null) {
return delegate;
}
final BarrageProtoUtil.ExposedByteArrayOutputStream out = new BarrageProtoUtil.ExposedByteArrayOutputStream();
final ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
final int size = in.drainTo(out);
in.close();
in = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.util;

import java.io.ByteArrayOutputStream;

public class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
public byte[] peekBuffer() {
return buf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl;
import io.grpc.Drainable;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
Expand Down Expand Up @@ -66,9 +65,8 @@ private static class ArrowBuilderObserver implements StreamObserver<BarrageStrea
public void onNext(final BarrageStreamGenerator.MessageView messageView) {
try {
messageView.forEachStream(inputStream -> {
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream()) {
((Drainable) inputStream).drainTo(baos);
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream()) {
inputStream.drainTo(baos);
batchMessages.add(baos.toByteArray());
inputStream.close();
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.WritableShortChunk;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.ExposedByteArrayOutputStream;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.qst.type.Type;
Expand Down Expand Up @@ -684,8 +684,7 @@ private static <T> void testRoundTripSerialization(
ChunkInputStreamGenerator generator = ChunkInputStreamGenerator.makeInputStreamGenerator(
chunkType, type, type.getComponentType(), srcData, 0)) {
// full sub logic
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream();
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final ChunkInputStreamGenerator.DrainableColumn column = generator.getInputStream(options, null)) {


Expand All @@ -705,8 +704,7 @@ private static <T> void testRoundTripSerialization(
}

// empty subset
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream();
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final ChunkInputStreamGenerator.DrainableColumn column =
generator.getInputStream(options, RowSetFactory.empty())) {

Expand All @@ -732,8 +730,7 @@ private static <T> void testRoundTripSerialization(
builder.appendKey(i);
}
}
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream();
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final RowSet subset = builder.build();
final ChunkInputStreamGenerator.DrainableColumn column =
generator.getInputStream(options, subset)) {
Expand All @@ -754,8 +751,7 @@ private static <T> void testRoundTripSerialization(
}

// test append to existing chunk logic
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream();
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
final ChunkInputStreamGenerator.DrainableColumn column =
generator.getInputStream(options, null)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.server.util;

import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.ExposedByteArrayOutputStream;
import io.grpc.MethodDescriptor;

import java.io.ByteArrayInputStream;
Expand All @@ -20,8 +20,7 @@ public InputStream stream(InputStream value) {

@Override
public InputStream parse(InputStream stream) {
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream()) {
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream()) {
final byte[] buffer = new byte[4096];
while (stream.available() > 0) {
int len = stream.read(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageStreamReader;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.ExposedByteArrayOutputStream;
import io.deephaven.server.arrow.ArrowModule;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
Expand All @@ -40,7 +40,6 @@
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.mutable.MutableInt;
import io.grpc.Drainable;
import io.grpc.stub.StreamObserver;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableObject;
Expand Down Expand Up @@ -1424,9 +1423,8 @@ public static class DummyObserver implements StreamObserver<BarrageStreamGenerat
public void onNext(final BarrageStreamGenerator.MessageView messageView) {
try {
messageView.forEachStream(inputStream -> {
try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos =
new BarrageProtoUtil.ExposedByteArrayOutputStream()) {
((Drainable) inputStream).drainTo(baos);
try (final ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream()) {
inputStream.drainTo(baos);
inputStream.close();
final BarrageMessage message =
marshaller.parse(new ByteArrayInputStream(baos.peekBuffer(), 0, baos.size()));
Expand Down

0 comments on commit 9960d00

Please sign in to comment.