Skip to content

Commit

Permalink
Drainable.drainTo must not close the outputstream it gets
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Nov 23, 2021
1 parent 95154b8 commit 8e7c92b
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,34 @@ public int drainTo(final OutputStream outputStream) throws IOException {

long bytesWritten = 0;
read = true;
try (final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream)) {
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_BYTE) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}
});
if (context.count > 0) {
final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream);
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_BYTE) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
});
if (context.count > 0) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
}

// write the included values
subset.forAllLongs(row -> {
try {
Expand All @@ -130,9 +130,8 @@ public int drainTo(final OutputStream outputStream) throws IOException {
final long bytesExtended = bytesWritten & REMAINDER_MOD_8_MASK;
if (bytesExtended > 0) {
bytesWritten += 8 - bytesExtended;
dos.write(PADDING_BUFFER, 0, (int)(8 - bytesExtended));
dos.write(PADDING_BUFFER, 0, (int) (8 - bytesExtended));
}
}
return LongSizedDataStructure.intSize("ByteChunkInputStreamGenerator", bytesWritten);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,34 +85,34 @@ public int drainTo(final OutputStream outputStream) throws IOException {

long bytesWritten = 0;
read = true;
try (final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream)) {
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_CHAR) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}
});
if (context.count > 0) {
final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream);
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_CHAR) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
});
if (context.count > 0) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
}

// write the included values
subset.forAllLongs(row -> {
try {
Expand All @@ -127,9 +127,8 @@ public int drainTo(final OutputStream outputStream) throws IOException {
final long bytesExtended = bytesWritten & REMAINDER_MOD_8_MASK;
if (bytesExtended > 0) {
bytesWritten += 8 - bytesExtended;
dos.write(PADDING_BUFFER, 0, (int)(8 - bytesExtended));
dos.write(PADDING_BUFFER, 0, (int) (8 - bytesExtended));
}
}
return LongSizedDataStructure.intSize("CharChunkInputStreamGenerator", bytesWritten);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,34 @@ public int drainTo(final OutputStream outputStream) throws IOException {

long bytesWritten = 0;
read = true;
try (final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream)) {
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_DOUBLE) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}
});
if (context.count > 0) {
final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream);
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_DOUBLE) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
});
if (context.count > 0) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
}

// write the included values
subset.forAllLongs(row -> {
try {
Expand All @@ -130,9 +130,8 @@ public int drainTo(final OutputStream outputStream) throws IOException {
final long bytesExtended = bytesWritten & REMAINDER_MOD_8_MASK;
if (bytesExtended > 0) {
bytesWritten += 8 - bytesExtended;
dos.write(PADDING_BUFFER, 0, (int)(8 - bytesExtended));
dos.write(PADDING_BUFFER, 0, (int) (8 - bytesExtended));
}
}
return LongSizedDataStructure.intSize("DoubleChunkInputStreamGenerator", bytesWritten);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,34 @@ public int drainTo(final OutputStream outputStream) throws IOException {

long bytesWritten = 0;
read = true;
try (final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream)) {
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_FLOAT) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}
});
if (context.count > 0) {
final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream);
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_FLOAT) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
});
if (context.count > 0) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
}

// write the included values
subset.forAllLongs(row -> {
try {
Expand All @@ -130,9 +130,8 @@ public int drainTo(final OutputStream outputStream) throws IOException {
final long bytesExtended = bytesWritten & REMAINDER_MOD_8_MASK;
if (bytesExtended > 0) {
bytesWritten += 8 - bytesExtended;
dos.write(PADDING_BUFFER, 0, (int)(8 - bytesExtended));
dos.write(PADDING_BUFFER, 0, (int) (8 - bytesExtended));
}
}
return LongSizedDataStructure.intSize("FloatChunkInputStreamGenerator", bytesWritten);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,34 @@ public int drainTo(final OutputStream outputStream) throws IOException {

long bytesWritten = 0;
read = true;
try (final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream)) {
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_INT) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}
});
if (context.count > 0) {
final LittleEndianDataOutputStream dos = new LittleEndianDataOutputStream(outputStream);
// write the validity array with LSB indexing
if (sendValidityBuffer()) {
final SerContext context = new SerContext();
final Runnable flush = () -> {
try {
dos.writeLong(context.accumulator);
} catch (final IOException e) {
throw new UncheckedDeephavenException("Unexpected exception while draining data to OutputStream: ", e);
}
context.accumulator = 0;
context.count = 0;
};
subset.forAllLongs(row -> {
if (chunk.get((int) row) != NULL_INT) {
context.accumulator |= 1L << context.count;
}
if (++context.count == 64) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
});
if (context.count > 0) {
flush.run();
}

bytesWritten += getValidityMapSerializationSizeFor(subset.intSize());
}

// write the included values
subset.forAllLongs(row -> {
try {
Expand All @@ -130,9 +130,8 @@ public int drainTo(final OutputStream outputStream) throws IOException {
final long bytesExtended = bytesWritten & REMAINDER_MOD_8_MASK;
if (bytesExtended > 0) {
bytesWritten += 8 - bytesExtended;
dos.write(PADDING_BUFFER, 0, (int)(8 - bytesExtended));
dos.write(PADDING_BUFFER, 0, (int) (8 - bytesExtended));
}
}
return LongSizedDataStructure.intSize("IntChunkInputStreamGenerator", bytesWritten);
}
}
Expand Down
Loading

0 comments on commit 8e7c92b

Please sign in to comment.