Skip to content

Commit

Permalink
Add testing
Browse files Browse the repository at this point in the history
  • Loading branch information
zymap committed Sep 22, 2021
1 parent 45716c7 commit c01e7b9
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
Expand Down Expand Up @@ -61,8 +61,6 @@ enum State {
Closed
}

private AtomicReferenceFieldUpdater<BlobStoreBackedReadHandleImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(BlobStoreBackedReadHandleImpl.class, State.class, "state");
private volatile State state = null;

private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
Expand All @@ -73,7 +71,7 @@ private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
this.inputStream = inputStream;
this.dataStream = new DataInputStream(inputStream);
this.executor = executor;
STATE_UPDATER.set(this, State.Opened);
state = State.Opened;
}

@Override
Expand All @@ -93,7 +91,7 @@ public CompletableFuture<Void> closeAsync() {
try {
index.close();
inputStream.close();
STATE_UPDATER.set(this, State.Closed);
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
Expand All @@ -107,61 +105,64 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.submit(() -> {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
try {
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
promise.completeExceptionally(new BKException.BKIncorrectParameterException());
return;
}
long entriesToRead = (lastEntry - firstEntry) + 1;
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
long nextExpectedId = firstEntry;
try {
while (entriesToRead > 0) {
State state = STATE_UPDATER.get(this);
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
throw new BKException.BKUnexpectedConditionException();
}
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
}
long entryId = dataStream.readLong();

if (entryId == nextExpectedId) {
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
int toWrite = length;
while (toWrite > 0) {
toWrite -= buf.writeBytes(dataStream, toWrite);
}
entriesToRead--;
nextExpectedId++;
} else if (entryId > nextExpectedId && entryId < lastEntry) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId < nextExpectedId
&& !index.getIndexEntryForEntry(nextExpectedId).equals(
index.getIndexEntryForEntry(entryId))) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
long ignored = inputStream.skip(length);
boolean sleep = true;
while (entriesToRead > 0) {
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
throw new BKException.BKUnexpectedConditionException();
}
if (sleep) {
TimeUnit.SECONDS.sleep(10);
}
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
}
long entryId = dataStream.readLong();

if (entryId == nextExpectedId) {
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
int toWrite = length;
while (toWrite > 0) {
toWrite -= buf.writeBytes(dataStream, toWrite);
}
entriesToRead--;
nextExpectedId++;
} else if (entryId > nextExpectedId && entryId < lastEntry) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId < nextExpectedId
&& !index.getIndexEntryForEntry(nextExpectedId).equals(
index.getIndexEntryForEntry(entryId))) {
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
long ignored = inputStream.skip(length);
}

promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
});

promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
});
return promise;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.io.Payload;
import org.testng.annotations.Test;

import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.*;

public class BlobStoreBackedReadHandleImplTest {

private final static String bucket = "test-offload-read";
private final static int firstLedgerID = 1;
private final static String firstLedgerKey = "ledger-1";
private final static String firstLedgerIndexKey = "ledger-1-index";
private final static int secondLedgerID = 2;
private final static String secondLedgerKey = "ledger-2";
private final static String secondLedgerIndexKey = "ledger-2-index";

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
DataBlockUtils.VersionCheck versionCheck = (k, b) -> { return; };
private final static int readBufferSize = 1024;

@Test
public void testRead() throws Exception {
InputStream firstIndexInputStream = getClass().getClassLoader().getResourceAsStream(firstLedgerIndexKey);
InputStream firstDataInputStream = getClass().getClassLoader().getResourceAsStream(firstLedgerKey);
InputStream secondIndexInputStream = getClass().getClassLoader().getResourceAsStream(secondLedgerIndexKey);
InputStream secondDataInputStream = getClass().getClassLoader().getResourceAsStream(secondLedgerKey);

BlobStore blobStore = mock(BlobStore.class);

Blob firstIndexBlob = mock(Blob.class);
Payload firstIndexPayload = mock(Payload.class);
when(blobStore.getBlob(eq(bucket), eq(firstLedgerIndexKey))).thenReturn(firstIndexBlob);
when(firstIndexBlob.getPayload()).thenReturn(firstIndexPayload);
when(firstIndexPayload.openStream()).thenReturn(firstIndexInputStream);

Blob firstDataBlob = mock(Blob.class);
Payload firstDataPayload = mock(Payload.class);
when(blobStore.getBlob(eq(bucket), eq(firstLedgerKey), any())).thenReturn(firstDataBlob);
when(firstDataBlob.getPayload()).thenReturn(firstDataPayload);
when(firstDataPayload.openStream()).thenReturn(firstDataInputStream);

Blob secondIndexBlob = mock(Blob.class);
Payload secondIndexPayload = mock(Payload.class);
when(blobStore.getBlob(eq(bucket), eq(secondLedgerIndexKey))).thenReturn(secondIndexBlob);
when(secondIndexBlob.getPayload()).thenReturn(secondIndexPayload);
when(secondIndexPayload.openStream()).thenReturn(secondIndexInputStream);

Blob secondDataBlob = mock(Blob.class);
Payload secondDataPayload = mock(Payload.class);
when(blobStore.getBlob(eq(bucket), eq(secondLedgerKey), any())).thenReturn(secondDataBlob);
when(secondDataBlob.getPayload()).thenReturn(secondDataPayload);
when(secondDataPayload.openStream()).thenReturn(secondDataInputStream);

CountDownLatch latch = new CountDownLatch(3);
ReadHandle firstRead = BlobStoreBackedReadHandleImpl.open(executorService, blobStore,
bucket, firstLedgerKey, firstLedgerIndexKey, versionCheck, firstLedgerID, readBufferSize);
firstRead.readAsync(0, 0).whenComplete((ledgerEntries, throwable) -> {
if (throwable == null) {
latch.countDown();
}
});
firstRead.closeAsync();
firstRead.readAsync(0, 0).whenComplete((ledgerEntries, throwable) -> {
if (throwable != null) {
latch.countDown();
}
});

ReadHandle secondRead = BlobStoreBackedReadHandleImpl.open(executorService, blobStore,
bucket, secondLedgerKey, secondLedgerIndexKey, versionCheck, secondLedgerID, readBufferSize);
firstRead.readAsync(0, 0).whenComplete((ledgerEntries, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
latch.countDown();
}
});
secondRead.close();
latch.await();
}
}
Binary file added tiered-storage/jcloud/src/test/resources/ledger-1
Binary file not shown.
Binary file not shown.
Binary file added tiered-storage/jcloud/src/test/resources/ledger-2
Binary file not shown.
Binary file not shown.

0 comments on commit c01e7b9

Please sign in to comment.