@@ -48,11 +48,27 @@
   * use this functionality in both a Guava 11 environment and a Guava >14 environment.
   */
 public final class LimitedInputStream extends FilterInputStream {
+  private final boolean closeWrappedStream;
   private long left;
   private long mark = -1;
 
   public LimitedInputStream(InputStream in, long limit) {
+    this(in, limit, true);
+  }
+
+  /**
+   * Create a LimitedInputStream that will read {@code limit} bytes from {@code in}.
+   * <p>
+   * If {@code closeWrappedStream} is true, this will close {@code in} when it is closed.
+   * Otherwise, the stream is left open for reading its remaining content.
+   *
+   * @param in a {@link InputStream} to read from
+   * @param limit the number of bytes to read
+   * @param closeWrappedStream whether to close {@code in} when {@link #close} is called
+   */
+  public LimitedInputStream(InputStream in, long limit, boolean closeWrappedStream) {
     super(in);
+    this.closeWrappedStream = closeWrappedStream;
     Preconditions.checkNotNull(in);
     Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
     left = limit;
@@ -102,4 +118,11 @@ public LimitedInputStream(InputStream in, long limit) {
     left -= skipped;
     return skipped;
   }
+
+  @Override
Contributor: add a blank line here?

+  public void close() throws IOException {
+    if (closeWrappedStream) {
+      super.close();
+    }
+  }
 }
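
For context on the new flag: it exists so that several bounded views can be taken over one shared stream, which is exactly how the shuffle-merge change below uses it. A minimal sketch of the intended behavior, assuming the class lives at org.apache.spark.network.util.LimitedInputStream and Guava is on the classpath (the stream contents and sizes are made up for illustration):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import com.google.common.io.ByteStreams;
import org.apache.spark.network.util.LimitedInputStream;

public class LimitedInputStreamDemo {
  public static void main(String[] args) throws IOException {
    // One underlying stream holding two 3-byte "partitions" back to back.
    InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5, 6});

    // closeWrappedStream=false: closing this view must not close `in`,
    // so the second partition can still be read from the same stream.
    InputStream first = new LimitedInputStream(in, 3, false);
    System.out.println(ByteStreams.toByteArray(first).length); // 3
    first.close(); // leaves `in` open

    // The two-argument constructor keeps the old behavior: close passes through.
    InputStream second = new LimitedInputStream(in, 3);
    System.out.println(ByteStreams.toByteArray(second).length); // 3
    second.close(); // closes `in` as before
  }
}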
@@ -349,12 +349,19 @@ private long[] mergeSpillsWithFileStream(
       for (int i = 0; i < spills.length; i++) {
         final long partitionLengthInSpill = spills[i].partitionLengths[partition];
         if (partitionLengthInSpill > 0) {
-          InputStream partitionInputStream =
-            new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill);
-          if (compressionCodec != null) {
-            partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+          InputStream partitionInputStream = null;
+          boolean innerThrewException = true;

Contributor: this is a little bit strange - can we change it to do try catch and at the end of try we close with the flag false, and in catch we close with the flag true and then eliminate the variable?

rdblue (Author), Jul 8, 2016: This is the typical way to use Closeables.close in Java. The same pattern is used for the outer try/finally that is already there and is in [Guava's documentation](https://google.github.io/guava/releases/19.0/api/docs/com/google/common/io/Closeables.html#close%28java.io.Closeable,%20boolean%29). I'm reluctant to change from the standard pattern for aesthetic reasons when finally is the correct control flow -- the stream should always be closed regardless of exceptions.

Contributor: Yeah, I agree with @rdblue here; note that we use this Closeables.close pattern elsewhere in Spark's Java code as well.
+          try {
+            partitionInputStream =
+              new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill, false);
+            if (compressionCodec != null) {
+              partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+            }
+            ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
+            innerThrewException = false;
+          } finally {
+            Closeables.close(partitionInputStream, innerThrewException);

Member: Not that it matters much, but what about also using the Utils.tryWithSafeFinally pattern here for some consistency?

rdblue (Author): This is the usual equivalent for Java code.

Member: Oh duh it's Java. I have made that mistake about 10 times now

           }
-          ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
         }
       }
       mergedFileOutputStream.flush();
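
To make the idiom under discussion concrete: Guava's Closeables.close(closeable, swallowIOException) closes its argument and, when the flag is true, logs rather than rethrows an IOException from close(). Starting with innerThrewException = true therefore guarantees that a failing close() can never mask the exception that aborted the copy; only on the success path, once the flag is flipped to false, is a close() failure allowed to propagate. A minimal standalone sketch of the pattern (the copyAndClose helper is hypothetical, not Spark code):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;

public final class CloseablesPattern {
  // Copies `in` to `out`, always closing `in`, without letting a failure
  // in close() hide a failure from the copy itself.
  static void copyAndClose(InputStream in, OutputStream out) throws IOException {
    boolean threw = true;
    try {
      ByteStreams.copy(in, out);
      threw = false; // reached only if the copy succeeded
    } finally {
      // If the body threw, swallow (log) any IOException from close();
      // otherwise let a close() failure propagate to the caller.
      Closeables.close(in, threw);
    }
  }
}

This is the same shape as the merge loop above, minus the codec wrapping.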
@@ -232,7 +232,11 @@ private object TorrentBroadcast extends Logging {
     val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
     val ser = serializer.newInstance()
     val serOut = ser.serializeStream(out)
-    serOut.writeObject[T](obj).close()
+    Utils.tryWithSafeFinally {
+      serOut.writeObject[T](obj)
+    } {
+      serOut.close()
+    }
     cbbos.toChunkedByteBuffer.getChunks()
   }

@@ -246,8 +250,11 @@
     val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
     val ser = serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
+    val obj = Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
     obj
   }
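
On the exchange above: Utils.tryWithSafeFinally exists so that an exception thrown while closing cannot mask an exception already thrown by the body; when both throw, the close-side failure is attached to the original as a suppressed exception. A rough Java rendering of that contract (a hypothetical helper, not Spark's actual implementation):

import java.util.concurrent.Callable;

public final class SafeFinally {
  // Run `block`, then `doFinally`; if both throw, attach the finally-side
  // exception as suppressed rather than letting it mask the original.
  static <T> T tryWithSafeFinally(Callable<T> block, Runnable doFinally) throws Exception {
    Throwable original = null;
    try {
      return block.call();
    } catch (Throwable t) {
      original = t;
      throw t;
    } finally {
      try {
        doFinally.run();
      } catch (Throwable t) {
        if (original != null) {
          original.addSuppressed(t); // keep the root cause visible
        } else {
          throw t;
        }
      }
    }
  }
}

The boolean-flag Closeables.close pattern in the Java diff earlier reaches the same end by swallowing the close() failure instead of suppressing it.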

@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
 
 /**
  * Custom serializer used for generic Avro records. If the user registers the schemas
@@ -72,8 +73,11 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
     val bos = new ByteArrayOutputStream()
     val out = codec.compressedOutputStream(bos)
-    out.write(schema.toString.getBytes(StandardCharsets.UTF_8))
-    out.close()
+    Utils.tryWithSafeFinally {
+      out.write(schema.toString.getBytes(StandardCharsets.UTF_8))
+    } {
+      out.close()
+    }
     bos.toByteArray
   })

@@ -86,7 +90,12 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
       schemaBytes.array(),
       schemaBytes.arrayOffset() + schemaBytes.position(),
       schemaBytes.remaining())
-    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    val in = codec.compressedInputStream(bis)
+    val bytes = Utils.tryWithSafeFinally {
+      IOUtils.toByteArray(in)
+    } {
+      in.close()
+    }
     new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
   })
