Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,59 @@ Return the data at the current position.
else
result = -1

### <a name="InputStream.available"></a> `InputStream.available()`

Returns the number of bytes "estimated" to be readable on a stream before `read()`
blocks on any IO (i.e. the thread is potentially suspended for some time).

That is: for all values `v` returned by `available()`, `read(buffer, 0, v)`
should not block.

#### Postconditions

```python
if len(data) == 0:
result = 0

elif pos >= len(data):
result = 0

else:
d = "the amount of data known to be already buffered/cached locally"
result = max(1, d) # optional but recommended: see below.
```

As `0` is a number which always meets this condition, it is nominally
possible for an implementation to simply return `0`. However, this is not
considered useful, and some applications/libraries expect a positive number.

#### The GZip problem.

[JDK-7036144](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144),
"GZIPInputStream readTrailer uses faulty available() test for end-of-stream"
discusses how the JDK's GZip code uses `available()` to detect an EOF,
in a loop similar to the following:

```java
while(instream.available() > 0) {
process(instream.read());
}
```

The correct loop would have been:

```java
int r;
while((r=instream.read()) >= 0) {
process(r);
}
```

If `available()` ever returns 0, then the gzip loop halts prematurely.

For this reason, implementations *should* return a value &gt;=1, even
if it breaks the requirement of `available()` returning the amount guaranteed
not to block on reads.

### <a name="InputStream.read.buffer[]"></a> `InputStream.read(buffer[], offset, length)`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.Random;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
Expand Down Expand Up @@ -99,14 +100,18 @@ public void testSeekZeroByteFile() throws Throwable {
describe("seek and read a 0 byte file");
instream = getFileSystem().open(zeroByteFile);
assertEquals(0, instream.getPos());
assertAvailableIsZero(instream);
//expect initial read to fail
int result = instream.read();
assertMinusOne("initial byte read", result);
assertAvailableIsZero(instream);
byte[] buffer = new byte[1];
//expect that seek to 0 works
instream.seek(0);
assertAvailableIsZero(instream);
//reread, expect same exception
result = instream.read();
assertAvailableIsZero(instream);
assertMinusOne("post-seek byte read", result);
result = instream.read(buffer, 0, 1);
assertMinusOne("post-seek buffer read", result);
Expand All @@ -132,8 +137,8 @@ public void testBlockReadZeroByteFile() throws Throwable {
@Test
public void testSeekReadClosedFile() throws Throwable {
instream = getFileSystem().open(smallSeekFile);
getLogger().debug(
"Stream is of type " + instream.getClass().getCanonicalName());
getLogger().debug("Stream is of type {}",
instream.getClass().getCanonicalName());
instream.close();
try {
instream.seek(0);
Expand Down Expand Up @@ -168,10 +173,26 @@ public void testSeekReadClosedFile() throws Throwable {
try {
long offset = instream.getPos();
} catch (IOException e) {
// its valid to raise error here; but the test is applied to make
// it is valid to raise error here; but the test is applied to make
// sure there's no other exception like an NPE.

}
// a closed stream should either fail or return 0 bytes.
try {
int a = instream.available();
LOG.info("available() returns a value on a closed file: {}", a);
assertAvailableIsZero(instream);
} catch (IOException | IllegalStateException expected) {
// expected
}
// a closed stream should either fail or return 0 bytes.
try {
int a = instream.available();
LOG.info("available() returns a value on a closed file: {}", a);
assertAvailableIsZero(instream);
} catch (IOException | IllegalStateException expected) {
// expected
}
//and close again
instream.close();
}
Expand Down Expand Up @@ -205,6 +226,7 @@ public void testSeekFile() throws Throwable {
//expect that seek to 0 works
instream.seek(0);
int result = instream.read();
assertAvailableIsPositive(instream);
assertEquals(0, result);
assertEquals(1, instream.read());
assertEquals(2, instream.getPos());
Expand All @@ -226,13 +248,24 @@ public void testSeekAndReadPastEndOfFile() throws Throwable {
//go just before the end
instream.seek(TEST_FILE_LEN - 2);
assertTrue("Premature EOF", instream.read() != -1);
assertAvailableIsPositive(instream);
assertTrue("Premature EOF", instream.read() != -1);
checkAvailabilityAtEOF();
assertMinusOne("read past end of file", instream.read());
}

/**
 * Verify what {@code available()} reports on the test stream once the
 * last byte of the file has been read.
 * The default check requires the value to be zero; subclasses can
 * override this if their filesystem's stream does not return 0 at EOF.
 * @throws IOException failure while querying the stream.
 */
protected void checkAvailabilityAtEOF() throws IOException {
  assertAvailableIsZero(instream);
}

@Test
public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
describe("do a seek past the EOF, then verify the stream recovers");
describe("do a seek past the EOF, " +
"then verify the stream recovers");
instream = getFileSystem().open(smallSeekFile);
//go just before the end. This may or may not fail; it may be delayed until the
//read
Expand Down Expand Up @@ -261,6 +294,7 @@ public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
//now go back and try to read from a valid point in the file
instream.seek(1);
assertTrue("Premature EOF", instream.read() != -1);
assertAvailableIsPositive(instream);
}

/**
Expand All @@ -278,6 +312,7 @@ public void testSeekBigFile() throws Throwable {
//expect that seek to 0 works
instream.seek(0);
int result = instream.read();
assertAvailableIsPositive(instream);
assertEquals(0, result);
assertEquals(1, instream.read());
assertEquals(2, instream.read());
Expand All @@ -296,6 +331,7 @@ public void testSeekBigFile() throws Throwable {
instream.seek(0);
assertEquals(0, instream.getPos());
instream.read();
assertAvailableIsPositive(instream);
assertEquals(1, instream.getPos());
byte[] buf = new byte[80 * 1024];
instream.readFully(1, buf, 0, buf.length);
Expand All @@ -314,14 +350,15 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
instream.seek(39999);
assertTrue(-1 != instream.read());
assertEquals(40000, instream.getPos());

assertAvailableIsPositive(instream);
int v = 256;
byte[] readBuffer = new byte[v];
assertEquals(v, instream.read(128, readBuffer, 0, v));
//have gone back
assertEquals(40000, instream.getPos());
//content is the same too
assertEquals("@40000", block[40000], (byte) instream.read());
assertAvailableIsPositive(instream);
//now verify the picked up data
for (int i = 0; i < 256; i++) {
assertEquals("@" + i, block[i + 128], readBuffer[i]);
Expand Down Expand Up @@ -376,6 +413,7 @@ public void testReadFullyZeroByteFile() throws Throwable {
assertEquals(0, instream.getPos());
byte[] buffer = new byte[1];
instream.readFully(0, buffer, 0, 0);
assertAvailableIsZero(instream);
assertEquals(0, instream.getPos());
// seek to 0 read 0 bytes from it
instream.seek(0);
Expand Down Expand Up @@ -551,7 +589,9 @@ public void testReadSmallFile() throws Throwable {
fail("Expected an exception, got " + r);
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) {
} catch (IOException
| IllegalArgumentException
| IndexOutOfBoundsException e) {
handleRelaxedException("read() with a negative position ",
"EOFException",
e);
Expand Down Expand Up @@ -587,6 +627,29 @@ public void testReadAtExactEOF() throws Throwable {
instream = getFileSystem().open(smallSeekFile);
instream.seek(TEST_FILE_LEN -1);
assertTrue("read at last byte", instream.read() > 0);
assertAvailableIsZero(instream);
assertEquals("read just past EOF", -1, instream.read());
}

/**
 * Assert that a stream reports no bytes as available.
 * @param in input stream to probe
 * @throws IOException if the {@code available()} call itself fails
 */
protected static void assertAvailableIsZero(FSDataInputStream in)
    throws IOException {
  final int bytesAvailable = in.available();
  assertEquals("stream.available() should be zero", 0, bytesAvailable);
}

/**
 * Assert that a stream reports at least one byte as available.
 * The observed value is included in the failure message.
 * @param in input stream to probe
 * @throws IOException if the {@code available()} call itself fails
 */
protected static void assertAvailableIsPositive(FSDataInputStream in)
    throws IOException {
  final int bytesAvailable = in.available();
  final String failureText =
      "stream.available() should be positive but is " + bytesAvailable;
  assertTrue(failureText, bytesAvailable > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private synchronized void reopen(String reason, long targetPos, long length,
}

@Override
public synchronized long getPos() throws IOException {
public synchronized long getPos() {
return (nextReadPos < 0) ? 0 : nextReadPos;
}

Expand Down Expand Up @@ -620,15 +620,26 @@ public synchronized boolean resetConnection() throws IOException {
return isObjectStreamOpen();
}

/**
* Return the number of bytes available.
* If the inner stream is closed, the value is 1 for consistency
* with S3ObjectStream -and so address the GZip bug
* http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 .
* If the stream is open, then it is the amount returned by the
* wrapped stream.
* @return a value greater than or equal to zero.
* @throws IOException IO failure.
*/
@Override
public synchronized int available() throws IOException {
checkNotClosed();

long remaining = remainingInFile();
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
if (contentLength == 0 || (nextReadPos >= contentLength)) {
return 0;
}
return (int)remaining;

return wrappedStream == null
? 1
: wrappedStream.available();
}

/**
Expand All @@ -637,8 +648,8 @@ public synchronized int available() throws IOException {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInFile() {
return this.contentLength - this.pos;
public synchronized long remainingInFile() throws IOException {
return contentLength - getPos();
}

/**
Expand All @@ -649,7 +660,7 @@ public synchronized long remainingInFile() {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInCurrentRequest() {
return this.contentRangeFinish - this.pos;
return contentRangeFinish - getPos();
}

@InterfaceAudience.Private
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
* which S3A Supports.
* @return a list of seek policies to test.
*/
@Parameterized.Parameters
@Parameterized.Parameters(name = "{0}-{1}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{INPUT_FADV_SEQUENTIAL, Default_JSSE},
Expand Down