
Fix the potential race condition in the BlobStore readhandler #12123

Merged · 9 commits · Oct 12, 2021
@@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -54,6 +55,13 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
private final DataInputStream dataStream;
private final ExecutorService executor;

enum State {
Opened,
Closed
}

private State state = null;

private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream,
ExecutorService executor) {
@@ -62,6 +70,7 @@ private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
this.inputStream = inputStream;
this.dataStream = new DataInputStream(inputStream);
this.executor = executor;
state = State.Opened;
}

@Override
@@ -81,6 +90,7 @@ public CompletableFuture<Void> closeAsync() {
try {
index.close();
inputStream.close();
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
@@ -94,56 +104,73 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry,
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.submit(() -> {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
Contributor

We'd better move it after

if (firstEntry > lastEntry
        || firstEntry < 0
        || lastEntry > getLastAddConfirmed()) {
    promise.completeExceptionally(new BKException.BKIncorrectParameterException());
    return;
}

so we can avoid allocating the list object when the (firstEntry, lastEntry) check fails.

Member Author

The array list will not allocate space until an element is added to it, and it is used in the catch block, so I think we can keep it where it is.

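A minimal sketch of the point above, assuming current JDK behavior (ArrayList's no-argument constructor shares an empty backing array and only allocates capacity on the first add); the class name is illustrative and not part of the patch.

import java.util.ArrayList;
import java.util.List;

public class LazyListAllocationSketch {
    public static void main(String[] args) {
        // The no-arg constructor starts from a shared empty backing array,
        // so an early return before any add() costs almost nothing.
        List<Long> entries = new ArrayList<>();
        boolean invalidRange = true; // stand-in for the (firstEntry, lastEntry) check
        if (invalidRange) {
            return; // no element added, so no capacity-10 array was ever allocated
        }
        entries.add(0L); // only here would the backing array actually be created
    }
}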
try {
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
promise.completeExceptionally(new BKException.BKIncorrectParameterException());
return;
}
long entriesToRead = (lastEntry - firstEntry) + 1;
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
long nextExpectedId = firstEntry;
try {
while (entriesToRead > 0) {
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
}
long entryId = dataStream.readLong();

if (entryId == nextExpectedId) {
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
int toWrite = length;
while (toWrite > 0) {
toWrite -= buf.writeBytes(dataStream, toWrite);
}
entriesToRead--;
nextExpectedId++;
} else if (entryId > nextExpectedId) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId < nextExpectedId
&& !index.getIndexEntryForEntry(nextExpectedId).equals(
index.getIndexEntryForEntry(entryId))) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
long ignored = inputStream.skip(length);

// Seek to the position of the first entry before reading; otherwise the first read may return an
// unexpected entry ID that is outside the range between firstEntry and lastEntry.
// For example, after reading entries 1-10, a following request for 2-9 would start by reading an
// entry ID from the stream that is not the one it wants; it would then seek to the correct position
// and continue as normal. But that stray entry ID may exceed the last entry ID, which makes it hard
// to know the edge of the requested range.
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
Contributor

Because we seek to firstEntry before reading, we can simplify the following check logic: if the entryId read from dataStream is not equal to nextExpectedId, we can throw an exception.
@codelipenghui Please help check this logic.

Member Author

This seek operation is used to seek to the first entry's position before running the original logic; it only makes sure the read position is where we expect it to be. Before this change, the code read the first piece of data to find out whether the read position was correct and, if not, seeked to the right position.

The following check logic is still needed so we can seek back to the right position if we hit unexpected data. Maybe we can put a limit on it to avoid an infinite loop (a rough sketch of such a limit follows).

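A rough sketch of that limit, not part of the patch: bound the number of corrective seeks so corrupted data cannot spin the loop forever. The class, method, and maxSeekAttempts names are hypothetical, and the entry source is simplified to a LongSupplier.

import java.util.function.LongSupplier;

public class BoundedSeekSketch {
    // Keep asking the source for entry IDs until the expected one appears,
    // but give up after a bounded number of corrective attempts.
    static long readExpectedEntry(LongSupplier nextEntryId, long expectedId, int maxSeekAttempts) {
        int attempts = 0;
        while (true) {
            long entryId = nextEntryId.getAsLong();
            if (entryId == expectedId) {
                return entryId;
            }
            if (++attempts > maxSeekAttempts) {
                // In the real handler this would be a BKException.BKUnexpectedConditionException.
                throw new IllegalStateException(
                        "gave up after " + maxSeekAttempts + " seeks, stream looks corrupted");
            }
            // In the real handler, inputStream.seek(index.getIndexEntryForEntry(...).getDataOffset())
            // would reposition the stream here before retrying.
        }
    }

    public static void main(String[] args) {
        long[] ids = {7, 7, 5};
        int[] next = {0};
        System.out.println(readExpectedEntry(() -> ids[next[0]++], 5, 3)); // prints 5
    }
}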

while (entriesToRead > 0) {
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
throw new BKException.BKUnexpectedConditionException();
}
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
}
long entryId = dataStream.readLong();

if (entryId == nextExpectedId) {
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
int toWrite = length;
while (toWrite > 0) {
toWrite -= buf.writeBytes(dataStream, toWrite);
}
entriesToRead--;
nextExpectedId++;
} else if (entryId > nextExpectedId && entryId < lastEntry) {
Contributor

The two if checks lead to

inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;

can we merge them into one if check?

Member Author

ok

log.warn("The read entry {} is not the expected entry {} but in the range of {} - {},"
+ " seeking to the right position", entryId, nextExpectedId, nextExpectedId, lastEntry);
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId < nextExpectedId
&& !index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId))) {
log.warn("Read an unexpected entry id {} which is smaller than the next expected entry id {}"
+ ", seeking to the right position", entries, nextExpectedId);
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
long ignore = inputStream.skip(length);
}

promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
});

promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
});
return promise;
}

@@ -203,6 +230,12 @@ public static ReadHandle open(ScheduledExecutorService executor,
versionCheck,
index.getDataObjectLength(),
readBufferSize);

return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
}

// for testing
State getState() {
return this.state;
}
}
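A minimal sketch of the race the new state flag guards against, using only the public ReadHandle API; the helper class and method names are illustrative and not part of the patch. closeAsync() can complete while a readAsync() task is still queued on the handle's executor, and with the flag such a late read fails fast with BKUnexpectedConditionException instead of reading from a closed stream.

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;

class CloseDuringReadSketch {
    // Illustrative only: submit a read, then close before the queued read task runs.
    static CompletableFuture<LedgerEntries> raceReadAgainstClose(ReadHandle handle) {
        CompletableFuture<LedgerEntries> read = handle.readAsync(0, handle.getLastAddConfirmed());
        handle.closeAsync(); // may finish before the executor picks up the read task
        return read;         // with the state check, a late read completes exceptionally
    }
}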
@@ -141,6 +141,22 @@ public void testOffloadAndRead() throws Exception {
}
}

@Test(timeOut = 60000)
public void testReadHandlerState() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
LedgerOffloader offloader = getOffloader();

UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();

BlobStoreBackedReadHandleImpl toTest = (BlobStoreBackedReadHandleImpl) offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Opened);
toTest.read(0, 1);
toTest.close();
Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Closed);
}

@Test
public void testOffloadFailInitDataBlockUpload() throws Exception {
ReadHandle readHandle = buildReadHandle();
@@ -461,4 +477,4 @@ public void testReadUnknownIndexVersion() throws Exception {
Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
}
}
}
}
Binary file added tiered-storage/jcloud/src/test/resources/ledger-1
Binary file not shown.
Binary file added tiered-storage/jcloud/src/test/resources/ledger-2
Binary file not shown.