Skip to content

Commit

Permalink
Fix syntax and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
adamretter committed Mar 11, 2024
1 parent eae1567 commit b34d75c
Showing 1 changed file with 84 additions and 58 deletions.
142 changes: 84 additions & 58 deletions java/src/test/java/org/rocksdb/SequenceNumberAllocationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@

package org.rocksdb;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.*;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -21,86 +18,101 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class SequenceNumberAllocationTest {
@Parameters(name = "{0}")
public static Iterable<Boolean> parameters() {
return Arrays.asList(true, false);
}

@Parameter(0) public boolean disableWAL;

@ClassRule
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
new RocksNativeLibraryResource();

@Rule public TemporaryFolder dbFolder = new TemporaryFolder();

@Test
public void testSeqnoFlushAndClose() throws RocksDBException {
public void seqnoFlushAndCloseWithWal() throws RocksDBException {
seqnoFlushAndClose(false);
}

@Test
public void seqnoFlushAndCloseWithoutWal() throws RocksDBException {
seqnoFlushAndClose(true);
}

private void seqnoFlushAndClose(final boolean disableWal) throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWAL);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWal);
final FlushOptions flushOptions = new FlushOptions();
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
int count = fillDatabase(db, writeOptions, 0, 15);
final int count = fillDatabase(db, writeOptions, 0, 15);
assertEquals("Database values count mismatch", 15, count);
db.flush(flushOptions);
db.closeE();
}

try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
assertTrue("Database should not be empty", db.getLatestSequenceNumber() != 0);
assertNotEquals("Database should not be empty", 0, db.getLatestSequenceNumber());
assertEquals("Sequence number changed between closing and reopening the database", 15,
db.getLatestSequenceNumber());
}
}

@Test
public void testSeqnoCloseWithoutFlush() throws RocksDBException {
public void seqnoCloseWithoutFlushWithWal() throws RocksDBException {
seqnoCloseWithoutFlush(false);
}

@Test
public void seqnoCloseWithoutFlushWithoutWal() throws RocksDBException {
seqnoCloseWithoutFlush(true);
}

private void seqnoCloseWithoutFlush(final boolean disableWal) throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWAL);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWal);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
int count = fillDatabase(db, writeOptions, 0, 30);
final int count = fillDatabase(db, writeOptions, 0, 30);
assertEquals("Database values count mismatch", 30, count);
db.closeE();
}

try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
assertTrue("Database should not be empty", db.getLatestSequenceNumber() != 0);
assertNotEquals("Database should not be empty", 0, db.getLatestSequenceNumber());
assertEquals("Sequence number changed between closing and reopening the database", 30,
db.getLatestSequenceNumber());
}
}

@Test
public void testSeqnoThreadInterruptAfterFlush() throws RocksDBException {
ExecutorService executor = Executors.newFixedThreadPool(1);
AtomicLong sequenceNumberAfterFlush = new AtomicLong(0);
CountDownLatch waitingSignal = new CountDownLatch(1);
CountDownLatch dbOperationsSignal = new CountDownLatch(1);
public void seqnoThreadInterruptAfterFlushWithWal() throws RocksDBException {
seqnoThreadInterruptAfterFlush(false);
}

@Test
public void seqnoThreadInterruptAfterFlushWithoutWal() throws RocksDBException {
seqnoThreadInterruptAfterFlush(true);
}

private void seqnoThreadInterruptAfterFlush(final boolean disableWal) throws RocksDBException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
final AtomicLong sequenceNumberAfterFlush = new AtomicLong(0);
final CountDownLatch waitingSignal = new CountDownLatch(1);
final CountDownLatch dbOperationsSignal = new CountDownLatch(1);

Future<?> task = executor.submit(() -> {
final Future<?> task = executor.submit(() -> {
try (final Options options = new Options().setCreateIfMissing(true);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWAL);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWal);
final FlushOptions flushOptions = new FlushOptions();
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
int count = fillDatabase(db, writeOptions, 0, 10);
final int count = fillDatabase(db, writeOptions, 0, 10);
assertEquals("Database values count mismatch", 10, count);
db.flush(flushOptions);
sequenceNumberAfterFlush.set(db.getLatestSequenceNumber());
assertEquals("Incorrect sequence number after flush", 10, sequenceNumberAfterFlush.get());
dbOperationsSignal.countDown();
waitingSignal.await(60, TimeUnit.SECONDS);
} catch (RocksDBException e) {
fail("Encountered exception:" + e);
} catch (InterruptedException e) {
} catch (final RocksDBException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.interrupted();
}
fail("Encountered exception:" + e.getMessage());
} finally {
dbOperationsSignal.countDown();
waitingSignal.countDown();
Expand All @@ -111,30 +123,41 @@ public void testSeqnoThreadInterruptAfterFlush() throws RocksDBException {
dbOperationsSignal.await(5, TimeUnit.SECONDS);
task.cancel(true);
assertTrue("Failed to cancel task", task.isCancelled());
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.interrupted();
fail("Encountered exception:" + e.getMessage());
} finally {
executor.shutdownNow();
assertTrue("Executor did not shutdown", executor.isShutdown());
}

try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.openReadOnly(options, dbFolder.getRoot().getAbsolutePath())) {
assertTrue("Database should not be empty", db.getLatestSequenceNumber() != 0);
assertNotEquals("Database should not be empty", 0, db.getLatestSequenceNumber());
assertEquals("Lastest sequence number is not the one expected",
sequenceNumberAfterFlush.get(), db.getLatestSequenceNumber());
}
}

@Test
public void testSeqnoDataWrittenAfterFlush() throws RocksDBException {
ExecutorService executor = Executors.newFixedThreadPool(1);
AtomicLong sequenceNumberAfterFlush = new AtomicLong(0);
CountDownLatch waitingSignal = new CountDownLatch(1);
CountDownLatch dbOperationsSignal = new CountDownLatch(1);
public void seqnoDataWrittenAfterFlushWithWal() throws RocksDBException {
seqnoDataWrittenAfterFlush(false);
}

@Test
public void seqnoDataWrittenAfterFlushWithoutWal() throws RocksDBException {
seqnoDataWrittenAfterFlush(true);
}

private void seqnoDataWrittenAfterFlush(final boolean disableWal) throws RocksDBException {
final ExecutorService executor = Executors.newFixedThreadPool(1);
final AtomicLong sequenceNumberAfterFlush = new AtomicLong(0);
final CountDownLatch waitingSignal = new CountDownLatch(1);
final CountDownLatch dbOperationsSignal = new CountDownLatch(1);

Future<?> task = executor.submit(() -> {
final Future<?> task = executor.submit(() -> {
try (final Options options = new Options().setCreateIfMissing(true);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWAL);
final WriteOptions writeOptions = new WriteOptions().setDisableWAL(disableWal);
final FlushOptions flushOptions = new FlushOptions();
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
int count = fillDatabase(db, writeOptions, 0, 10);
Expand All @@ -148,9 +171,11 @@ public void testSeqnoDataWrittenAfterFlush() throws RocksDBException {
"Unexpected value for latest DB sequence number", 20, db.getLatestSequenceNumber());
dbOperationsSignal.countDown();
waitingSignal.await(60, TimeUnit.SECONDS);
} catch (RocksDBException e) {
fail("Encountered exception:" + e);
} catch (InterruptedException e) {
} catch (final RocksDBException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.interrupted();
}
fail("Encountered exception:" + e.getMessage());
} finally {
dbOperationsSignal.countDown();
waitingSignal.countDown();
Expand All @@ -163,33 +188,34 @@ public void testSeqnoDataWrittenAfterFlush() throws RocksDBException {
dbOperationsSignal.await(5, TimeUnit.SECONDS);
task.cancel(true);
assertTrue("Failed to cancel task", task.isCancelled());
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.interrupted();
fail("Encountered exception:" + e.getMessage());
} finally {
executor.shutdownNow();
assertTrue("Executor did not shutdown", executor.isShutdown());
}

int expectedSequenceNumber = disableWAL ? 10 : 20;
final int expectedSequenceNumber = disableWal ? 10 : 20;
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.openReadOnly(options, dbFolder.getRoot().getAbsolutePath())) {
assertTrue("Database should not be empty", db.getLatestSequenceNumber() != 0);
assertNotEquals("Database should not be empty", 0, db.getLatestSequenceNumber());
assertEquals("Lastest sequence number is not the one expected", expectedSequenceNumber,
db.getLatestSequenceNumber());
}
}

private int fillDatabase(RocksDB db, WriteOptions writeOptions, int start, int end)
throws RocksDBException {
private int fillDatabase(final RocksDB db, final WriteOptions writeOptions, final int start,
final int end) throws RocksDBException {
assertEquals("Database does not have expected sequence number " + start, start,
db.getLatestSequenceNumber());
int count;
for (count = start; count < end; ++count) {
byte[] key = ("key" + count).getBytes(StandardCharsets.UTF_8);
byte[] value = ("value" + count).getBytes(StandardCharsets.UTF_8);
final byte[] key = ("key" + count).getBytes(UTF_8);
final byte[] value = ("value" + count).getBytes(UTF_8);
db.put(writeOptions, key, value);
assertEquals("Sequence number is not updated", count + 1, db.getLatestSequenceNumber());
}

return count;
}
}

0 comments on commit b34d75c

Please sign in to comment.