diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/QueueMetadata.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/QueueMetadata.java
index 3d599a4..2df8c56 100644
--- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/QueueMetadata.java
+++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/QueueMetadata.java
@@ -127,6 +127,16 @@ public void incrementActive() {
         active.incrementAndGet();
     }
 
+    /** Replaces the active count with {@code value}, e.g. when reloading persisted stats. */
+    public void setActiveCount(int value) {
+        active.set(value);
+    }
+
+    /** Replaces the completed count with {@code value}, e.g. when reloading persisted stats. */
+    public void setCompletedCount(int value) {
+        completed.set(value);
+    }
+
     public void incrementCompleted() {
         completed.incrementAndGet();
     }
diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java
index a2a3a28..8bcb9fa 100644
--- a/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java
+++ b/service/src/main/java/crawlercommons/urlfrontier/service/rocksdb/RocksDBService.java
@@ -27,6 +27,7 @@
 import io.grpc.stub.StreamObserver;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -40,6 +41,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.rocksdb.BlockBasedTableConfig;
@@ -122,7 +124,8 @@ public RocksDBService(final Map configuration) {
             final List cfDescriptors =
                     Arrays.asList(
                             new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
-                            new ColumnFamilyDescriptor("queues".getBytes(), cfOpts));
+                            new ColumnFamilyDescriptor("queues".getBytes(), cfOpts),
+                            new ColumnFamilyDescriptor("queueInfos".getBytes(), cfOpts));
 
             long start = System.currentTimeMillis();
 
@@ -161,9 +164,13 @@ public RocksDBService(final Map configuration) {
 
             LOG.info("RocksDB loaded in {} msec", end - start);
 
-            LOG.info("Scanning tables to rebuild queues... (can take a long time)");
-
-            recoveryQscan(checkOnRecovery);
+            // full re-check
+            if (checkOnRecovery) {
+                LOG.info("Scanning tables to rebuild queues... (can take a long time)");
+                recoveryQscan(true);
+            } else {
+                recovery();
+            }
 
             long end2 = System.currentTimeMillis();
 
@@ -171,7 +178,27 @@ public RocksDBService(final Map configuration) {
         }
     }
 
-    /** Resurrects the queues from the tables and optionally does sanity checks * */
+    /**
+     * Restores the queues at startup when no full re-check is requested: uses the stats read by
+     * readQueueInfos() if any are found, and falls back to recoveryQscan(false) otherwise.
+     */
+    private void recovery() {
+        int read = 0;
+        try {
+            read = readQueueInfos();
+        } catch (RocksDBException e) {
+            LOG.error("readQueueInfos", e);
+            // some queues may already hold restored counts: drop them so that the
+            // rebuild below starts from scratch
+            getQueues().clear();
+        }
+        // nothing found? rebuild
+        if (read == 0) {
+            recoveryQscan(false);
+        }
+    }
+
+    /** Resurrects the queues from the URL tables and optionally does sanity checks * */
     private void recoveryQscan(boolean check) {
 
         LOG.info("Recovering queues from existing RocksDB");
@@ -513,6 +540,14 @@ public void close() throws IOException {
 
         super.close();
 
+        // persisting the counts for the queues
+        try {
+            writeQueueInfos();
+        } catch (Exception e) {
+            LOG.error("writeQueueInfos ", e);
+            rocksDB.destroyColumnFamilyHandle(columnFamilyHandleList.get(2));
+        }
+
         for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
             columnFamilyHandle.close();
         }
@@ -531,6 +566,86 @@ public void close() throws IOException {
         }
     }
 
+    /**
+     * When shutting down an instance - write the stats about the queues into a table for faster
+     * reloading later. Each queue key (UTF-8 bytes of its string form) is mapped to 8
+     * bytes: the active count followed by the completed count.
+     *
+     * @throws RocksDBException if a write to the table fails
+     */
+    private void writeQueueInfos() throws RocksDBException {
+        long start = System.currentTimeMillis();
+        int queuesWritten = 0;
+        ByteBuffer bb = ByteBuffer.allocate(8);
+        for (Entry<?, ?> entry : getQueues().entrySet()) {
+            queuesWritten++;
+            // same cast as in readQueueInfos: the queues of this service are QueueMetadata
+            QueueMetadata queue = (QueueMetadata) entry.getValue();
+            bb.putInt(queue.countActive());
+            bb.putInt(queue.getCountCompleted());
+            rocksDB.put(
+                    columnFamilyHandleList.get(2),
+                    // explicit charset: readQueueInfos decodes the key as UTF-8
+                    entry.getKey().toString().getBytes(StandardCharsets.UTF_8),
+                    bb.array());
+            bb.clear();
+        }
+        long end = System.currentTimeMillis();
+        LOG.info(
+                "writeQueueInfos stored stats for {} queues in {} msec",
+                queuesWritten,
+                (end - start));
+    }
+
+    /**
+     * Loads the stats stored by writeQueueInfos() into the queues, then empties the table so
+     * that the same stats are not read again after an unclean stop.
+     *
+     * @return the number of queues read from the table
+     * @throws RocksDBException if emptying the table fails
+     */
+    private int readQueueInfos() throws RocksDBException {
+        long start = System.currentTimeMillis();
+        int queuesRead = 0;
+        byte[] firstKey = null;
+        byte[] lastKey = null;
+        try (final RocksIterator rocksIterator =
+                rocksDB.newIterator(columnFamilyHandleList.get(2))) {
+            for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
+                final byte[] key = rocksIterator.key();
+                if (firstKey == null) {
+                    firstKey = key;
+                }
+                lastKey = key;
+                queuesRead++;
+                final String currentKey = new String(key, StandardCharsets.UTF_8);
+                final QueueWithinCrawl qk = QueueWithinCrawl.parseAndDeNormalise(currentKey);
+                QueueMetadata queueMD =
+                        (QueueMetadata) getQueues().computeIfAbsent(qk, s -> new QueueMetadata());
+                // read through the byte[] accessor: older rocksdbjni releases only
+                // accept direct buffers in value(ByteBuffer) - NOTE(review): confirm for the version in use
+                final ByteBuffer bb = ByteBuffer.wrap(rocksIterator.value());
+                int active = bb.getInt();
+                int completed = bb.getInt();
+                queueMD.setActiveCount(active);
+                queueMD.setCompletedCount(completed);
+            }
+        }
+
+        if (queuesRead != 0) {
+            // empty the table so that we don't read it again
+            // if there has been a nasty crash
+            rocksDB.deleteRange(columnFamilyHandleList.get(2), firstKey, lastKey);
+            // the end key of deleteRange is exclusive: remove the last entry explicitly
+            rocksDB.delete(columnFamilyHandleList.get(2), lastKey);
+        }
+
+        long end = System.currentTimeMillis();
+        LOG.info("readQueueInfos read stats for {} queues in {} msec", queuesRead, (end - start));
+
+        return queuesRead;
+    }
+
     @Override
     public void deleteCrawl(
             crawlercommons.urlfrontier.Urlfrontier.DeleteCrawlMessage crawlID,