Skip to content

Commit 5b11b9f

Browse files
committed
HADOOP-15446. WASB: PageBlobInputStream.skip breaks HBASE replication.
Contributed by Thomas Marquardt
1 parent 67f239c commit 5b11b9f

File tree

2 files changed

+605
-45
lines changed

2 files changed

+605
-45
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java

Lines changed: 78 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
2626

2727
import java.io.ByteArrayOutputStream;
28+
import java.io.EOFException;
2829
import java.io.IOException;
2930
import java.io.InputStream;
3031
import java.util.ArrayList;
3132

3233
import org.apache.commons.logging.Log;
3334
import org.apache.commons.logging.LogFactory;
35+
import org.apache.hadoop.fs.FSExceptionMessages;
3436
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
3537

3638
import com.microsoft.azure.storage.OperationContext;
@@ -58,7 +60,9 @@ final class PageBlobInputStream extends InputStream {
5860
// The buffer holding the current data we last read from the server.
5961
private byte[] currentBuffer;
6062
// The current byte offset we're at in the buffer.
61-
private int currentOffsetInBuffer;
63+
private int currentBufferOffset;
64+
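// The number of valid bytes in the current buffer (page headers plus page
// data, summed over the validated pages); may be less than the array length.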
65+
private int currentBufferLength;
6266
// Maximum number of pages to get per any one request.
6367
private static final int MAX_PAGES_PER_DOWNLOAD =
6468
4 * 1024 * 1024 / PAGE_SIZE;
@@ -174,7 +178,7 @@ public synchronized void close() throws IOException {
174178

175179
private boolean dataAvailableInBuffer() {
176180
return currentBuffer != null
177-
&& currentOffsetInBuffer < currentBuffer.length;
181+
&& currentBufferOffset < currentBufferLength;
178182
}
179183

180184
/**
@@ -194,6 +198,8 @@ private synchronized boolean ensureDataInBuffer() throws IOException {
194198
return true;
195199
}
196200
currentBuffer = null;
201+
currentBufferOffset = 0;
202+
currentBufferLength = 0;
197203
if (numberOfPagesRemaining == 0) {
198204
// No more data to read.
199205
return false;
@@ -209,43 +215,48 @@ private synchronized boolean ensureDataInBuffer() throws IOException {
209215
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
210216
blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
211217
withMD5Checking(), opContext);
212-
currentBuffer = baos.toByteArray();
218+
validateDataIntegrity(baos.toByteArray());
213219
} catch (StorageException e) {
214220
throw new IOException(e);
215221
}
216222
numberOfPagesRemaining -= pagesToRead;
217223
currentOffsetInBlob += bufferSize;
218-
currentOffsetInBuffer = PAGE_HEADER_SIZE;
219-
220-
// Since we just downloaded a new buffer, validate its consistency.
221-
validateCurrentBufferConsistency();
222224

223225
return true;
224226
}
225227

226-
private void validateCurrentBufferConsistency()
228+
private void validateDataIntegrity(byte[] buffer)
227229
throws IOException {
228-
if (currentBuffer.length % PAGE_SIZE != 0) {
230+
231+
if (buffer.length % PAGE_SIZE != 0) {
229232
throw new AssertionError("Unexpected buffer size: "
230-
+ currentBuffer.length);
233+
+ buffer.length);
231234
}
232-
int numberOfPages = currentBuffer.length / PAGE_SIZE;
235+
236+
int bufferLength = 0;
237+
int numberOfPages = buffer.length / PAGE_SIZE;
238+
long totalPagesAfterCurrent = numberOfPagesRemaining;
239+
233240
for (int page = 0; page < numberOfPages; page++) {
234-
short currentPageSize = getPageSize(blob, currentBuffer,
235-
page * PAGE_SIZE);
236-
// Calculate the number of pages that exist after this one
237-
// in the blob.
238-
long totalPagesAfterCurrent =
239-
(numberOfPages - page - 1) + numberOfPagesRemaining;
240-
// Only the last page is allowed to be not filled completely.
241-
if (currentPageSize < PAGE_DATA_SIZE
241+
// Calculate the number of pages that exist in the blob after this one
242+
totalPagesAfterCurrent--;
243+
244+
short currentPageSize = getPageSize(blob, buffer, page * PAGE_SIZE);
245+
246+
// Only the last page can be partially filled.
247+
if (currentPageSize < PAGE_DATA_SIZE
242248
&& totalPagesAfterCurrent > 0) {
243249
throw fileCorruptException(blob, String.format(
244-
"Page with partial data found in the middle (%d pages from the"
245-
+ " end) that only has %d bytes of data.",
246-
totalPagesAfterCurrent, currentPageSize));
250+
"Page with partial data found in the middle (%d pages from the"
251+
+ " end) that only has %d bytes of data.",
252+
totalPagesAfterCurrent, currentPageSize));
247253
}
254+
bufferLength += currentPageSize + PAGE_HEADER_SIZE;
248255
}
256+
257+
currentBufferOffset = PAGE_HEADER_SIZE;
258+
currentBufferLength = bufferLength;
259+
currentBuffer = buffer;
249260
}
250261

251262
// Reads the page size from the page header at the given offset.
@@ -275,7 +286,7 @@ public synchronized int read(byte[] outputBuffer, int offset, int len)
275286
}
276287
int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
277288
int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
278-
System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
289+
System.arraycopy(currentBuffer, currentBufferOffset, outputBuffer,
279290
offset, numBytesToRead);
280291
numberOfBytesRead += numBytesToRead;
281292
offset += numBytesToRead;
@@ -284,7 +295,7 @@ public synchronized int read(byte[] outputBuffer, int offset, int len)
284295
// We've finished this page, move on to the next.
285296
advancePagesInBuffer(1);
286297
} else {
287-
currentOffsetInBuffer += numBytesToRead;
298+
currentBufferOffset += numBytesToRead;
288299
}
289300
}
290301

@@ -309,9 +320,26 @@ public int read() throws IOException {
309320
}
310321

311322
/**
312-
* Skips over and discards n bytes of data from this input stream.
313-
* @param n the number of bytes to be skipped.
314-
* @return the actual number of bytes skipped.
323+
* Skips over and discards <code>n</code> bytes of data from this input
324+
* stream. The <code>skip</code> method may, for a variety of reasons, end
325+
* up skipping over some smaller number of bytes, possibly <code>0</code>.
326+
* This may result from any of a number of conditions; reaching end of file
327+
* before <code>n</code> bytes have been skipped is only one possibility.
328+
* The actual number of bytes skipped is returned. If {@code n} is
329+
* negative, the {@code skip} method for class {@code InputStream} always
330+
* returns 0, and no bytes are skipped. Subclasses may handle the negative
331+
* value differently.
332+
*
333+
* <p> The <code>skip</code> method of this class first skips within the
334+
* data that is already buffered. It then advances over whole pages without
335+
* downloading them where possible; the last page is always read because it
336+
* may be only partially filled. This differs from the default
337+
* <code>InputStream</code> implementation, which reads into a byte array.
338+
*
339+
* @param n the number of bytes to be skipped.
340+
* @return the actual number of bytes skipped.
341+
* @exception IOException if bytes remain to be skipped but no pages remain
342+
* in the blob (an <code>EOFException</code>), or if some other I/O error occurs.
315343
*/
316344
@Override
317345
public synchronized long skip(long n) throws IOException {
@@ -338,18 +366,23 @@ private long skipImpl(long n) throws IOException {
338366
n -= skippedWithinBuffer;
339367
long skipped = skippedWithinBuffer;
340368

341-
// Empty the current buffer, we're going beyond it.
342-
currentBuffer = null;
369+
if (n == 0) {
370+
return skipped;
371+
}
372+
373+
if (numberOfPagesRemaining == 0) {
374+
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
375+
} else if (numberOfPagesRemaining > 1) {
376+
// skip over as many pages as we can, but we must read the last
377+
// page as it may not be full
378+
long pagesToSkipOver = Math.min(n / PAGE_DATA_SIZE,
379+
numberOfPagesRemaining - 1);
380+
numberOfPagesRemaining -= pagesToSkipOver;
381+
currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
382+
skipped += pagesToSkipOver * PAGE_DATA_SIZE;
383+
n -= pagesToSkipOver * PAGE_DATA_SIZE;
384+
}
343385

344-
// Skip over whole pages as necessary without retrieving them from the
345-
// server.
346-
long pagesToSkipOver = Math.max(0, Math.min(
347-
n / PAGE_DATA_SIZE,
348-
numberOfPagesRemaining - 1));
349-
numberOfPagesRemaining -= pagesToSkipOver;
350-
currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
351-
skipped += pagesToSkipOver * PAGE_DATA_SIZE;
352-
n -= pagesToSkipOver * PAGE_DATA_SIZE;
353386
if (n == 0) {
354387
return skipped;
355388
}
@@ -387,14 +420,14 @@ private long skipWithinBuffer(long n) throws IOException {
387420

388421
// Calculate how many whole pages (pages before the possibly partially
389422
// filled last page) remain.
390-
int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
423+
int currentPageIndex = currentBufferOffset / PAGE_SIZE;
391424
int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
392425
int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
393426

394427
if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
395428
// I'm within one of the whole pages remaining, skip in there.
396429
advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
397-
currentOffsetInBuffer += n % PAGE_DATA_SIZE;
430+
currentBufferOffset += n % PAGE_DATA_SIZE;
398431
return n + skipped;
399432
}
400433

@@ -417,8 +450,8 @@ private long skipWithinBuffer(long n) throws IOException {
417450
*/
418451
private long skipWithinCurrentPage(long n) throws IOException {
419452
int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
420-
if (n < remainingBytesInCurrentPage) {
421-
currentOffsetInBuffer += n;
453+
if (n <= remainingBytesInCurrentPage) {
454+
currentBufferOffset += n;
422455
return n;
423456
} else {
424457
advancePagesInBuffer(1);
@@ -438,7 +471,7 @@ private int getBytesRemainingInCurrentPage() throws IOException {
438471
// Calculate our current position relative to the start of the current
439472
// page.
440473
int currentDataOffsetInPage =
441-
(currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
474+
(currentBufferOffset % PAGE_SIZE) - PAGE_HEADER_SIZE;
442475
int pageBoundary = getCurrentPageStartInBuffer();
443476
// Get the data size of the current page from the header.
444477
short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
@@ -454,14 +487,14 @@ private static IOException badStartRangeException(CloudPageBlobWrapper blob,
454487
}
455488

456489
private void advancePagesInBuffer(int numberOfPages) {
457-
currentOffsetInBuffer =
490+
currentBufferOffset =
458491
getCurrentPageStartInBuffer()
459492
+ (numberOfPages * PAGE_SIZE)
460493
+ PAGE_HEADER_SIZE;
461494
}
462495

463496
private int getCurrentPageStartInBuffer() {
464-
return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
497+
return PAGE_SIZE * (currentBufferOffset / PAGE_SIZE);
465498
}
466499

467500
private static IOException fileCorruptException(CloudPageBlobWrapper blob,

0 commit comments

Comments
 (0)