Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RocksDB backend - faster restarts, implements #54 #60

Merged
merged 1 commit into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ public void incrementActive() {
active.incrementAndGet();
}

public void setActiveCount(int value) {
active.set(value);
}

public void setCompletedCount(int value) {
completed.set(value);
}

public void incrementCompleted() {
completed.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -40,6 +41,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.rocksdb.BlockBasedTableConfig;
Expand Down Expand Up @@ -122,7 +124,8 @@ public RocksDBService(final Map<String, String> configuration) {
final List<ColumnFamilyDescriptor> cfDescriptors =
Arrays.asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
new ColumnFamilyDescriptor("queues".getBytes(), cfOpts));
new ColumnFamilyDescriptor("queues".getBytes(), cfOpts),
new ColumnFamilyDescriptor("queueInfos".getBytes(), cfOpts));

long start = System.currentTimeMillis();

Expand Down Expand Up @@ -161,17 +164,36 @@ public RocksDBService(final Map<String, String> configuration) {

LOG.info("RocksDB loaded in {} msec", end - start);

LOG.info("Scanning tables to rebuild queues... (can take a long time)");

recoveryQscan(checkOnRecovery);
// full re-check
if (checkOnRecovery) {
LOG.info("Scanning tables to rebuild queues... (can take a long time)");
recoveryQscan(true);
} else {
recovery();
}

long end2 = System.currentTimeMillis();

LOG.info("{} queues discovered in {} msec", getQueues().size(), (end2 - end));
}
}

/** Resurrects the queues from the tables and optionally does sanity checks * */
private void recovery() {
// if a table containing the queues info exists use it,
// otherwise just rebuild from the content of the tables
int read = 0;
try {
read = readQueueInfos();
} catch (RocksDBException e) {
LOG.error("readQueueInfos", e);
}
// nothing found? rebuild
if (read == 0) {
recoveryQscan(false);
}
}

/** Resurrects the queues from the URL tables and optionally does sanity checks * */
private void recoveryQscan(boolean check) {

LOG.info("Recovering queues from existing RocksDB");
Expand Down Expand Up @@ -513,6 +535,14 @@ public void close() throws IOException {

super.close();

// persisting the counts for the queues
try {
writeQueueInfos();
} catch (Exception e) {
LOG.error("writeQueueInfos ", e);
rocksDB.destroyColumnFamilyHandle(columnFamilyHandleList.get(2));
}

for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
columnFamilyHandle.close();
}
Expand All @@ -531,6 +561,72 @@ public void close() throws IOException {
}
}

/**
* When shutting down an instance - write the stats about the queues into a table for faster
* reloading later
*
* @throws RocksDBException
*/
private void writeQueueInfos() throws RocksDBException {
long start = System.currentTimeMillis();
int queuesWritten = 0;
ByteBuffer bb = ByteBuffer.allocate(8);
for (Entry<QueueWithinCrawl, QueueInterface> entry : getQueues().entrySet()) {
queuesWritten++;
int active = entry.getValue().countActive();
int completed = entry.getValue().getCountCompleted();
bb.putInt(active);
bb.putInt(completed);
rocksDB.put(
columnFamilyHandleList.get(2),
entry.getKey().toString().getBytes(),
bb.array());
bb.clear();
}
long end = System.currentTimeMillis();
LOG.info(
"writeQueueInfos stored stats for {} queues in {} msec",
queuesWritten,
(end - start));
}

private int readQueueInfos() throws RocksDBException {
long start = System.currentTimeMillis();
int queuesRead = 0;
byte[] firstKey = null;
byte[] lastKey = null;
ByteBuffer bb = ByteBuffer.allocate(8);
try (final RocksIterator rocksIterator =
rocksDB.newIterator(columnFamilyHandleList.get(2))) {
for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
if (firstKey == null) firstKey = rocksIterator.key();
lastKey = rocksIterator.key();
queuesRead++;
final String currentKey = new String(rocksIterator.key(), StandardCharsets.UTF_8);
final QueueWithinCrawl qk = QueueWithinCrawl.parseAndDeNormalise(currentKey);
QueueMetadata queueMD =
(QueueMetadata) getQueues().computeIfAbsent(qk, s -> new QueueMetadata());
rocksIterator.value(bb);
int active = bb.getInt();
int completed = bb.getInt();
bb.clear();
queueMD.setActiveCount(active);
queueMD.setCompletedCount(completed);
}
}

if (queuesRead != 0) {
// empty the table so that we don't read it again
// if there has been a nasty crash
rocksDB.deleteRange(columnFamilyHandleList.get(2), firstKey, lastKey);
}

long end = System.currentTimeMillis();
LOG.info("readQueueInfos read stats for {} queues in {} msec", queuesRead, (end - start));

return queuesRead;
}

@Override
public void deleteCrawl(
crawlercommons.urlfrontier.Urlfrontier.DeleteCrawlMessage crawlID,
Expand Down