Skip to content

Commit

Permalink
Fixed some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc D'Mello committed Nov 2, 2024
1 parent 707d214 commit 3044503
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2451,6 +2451,7 @@ public void rollback() throws IOException {
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
if (shouldClose(true)) {
indexWriterRAMManager.removeWriter();
rollbackInternal();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand All @@ -31,7 +30,6 @@ public class IndexWriterRAMManager {
private final LinkedIdToWriter idToWriter = new LinkedIdToWriter();
private final AtomicInteger idGenerator = new AtomicInteger();
private double ramBufferSizeMB;
private final AtomicLong totalRamTracker;

/**
* Default constructor
Expand All @@ -44,7 +42,6 @@ public class IndexWriterRAMManager {
throw new IllegalArgumentException("ramBufferSize should be > 0.0 MB when enabled");
}
this.ramBufferSizeMB = ramBufferSizeMB;
this.totalRamTracker = new AtomicLong(0);
}

/** Set the buffer size for this manager */
Expand Down Expand Up @@ -79,11 +76,13 @@ private void removeWriter(int id) {
private void flushIfNecessary(
FlushPolicy flushPolicy, PerWriterIndexWriterRAMManager perWriterRAMManager)
throws IOException {
flushPolicy.flushWriter(this, perWriterRAMManager);
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
flushPolicy.flushWriter(this, perWriterRAMManager);
}
}

private long updateAndGetCurrentBytesUsed(int id) {
return totalRamTracker.addAndGet(idToWriter.updateRAMAndGetDifference(id));
return idToWriter.getTotalRamTracker(id);
}

/**
Expand Down Expand Up @@ -116,72 +115,84 @@ private static class LinkedIdToWriter {
private final Map<Integer, IndexWriterNode> idToWriterNode = new HashMap<>();
private IndexWriterNode first;
private IndexWriterNode last;
private long totalRamTracker;

private final ReentrantLock lock = new ReentrantLock();

// for round-robin flushing
private int lastIdFlushed = -1;

void addWriter(IndexWriter writer, int id) {
IndexWriterNode node = new IndexWriterNode(writer, id);
lock.lock();
if (idToWriterNode.isEmpty()) {
first = node;
synchronized (lock) {
IndexWriterNode node = new IndexWriterNode(writer, id);
if (idToWriterNode.isEmpty()) {
first = node;
last = node;
}
node.next = first;
last.next = node;
node.prev = last;
last = node;
idToWriterNode.put(id, node);
}
node.next = first;
last.next = node;
node.prev = last;
last = node;
idToWriterNode.put(id, node);
lock.unlock();
}

void removeWriter(int id) {
lock.lock();
if (idToWriterNode.containsKey(id) == false) {
throw new IllegalArgumentException(
"Writer " + id + " has not been registered or has been removed already");
}
IndexWriterNode nodeToRemove = idToWriterNode.remove(id);
if (idToWriterNode.isEmpty()) {
first = null;
last = null;
return;
}
nodeToRemove.prev.next = nodeToRemove.next;
nodeToRemove.next.prev = nodeToRemove.prev;
if (nodeToRemove == first) {
first = nodeToRemove.next;
synchronized (lock) {
if (idToWriterNode.containsKey(id)) {
IndexWriterNode nodeToRemove = idToWriterNode.remove(id);
totalRamTracker -= nodeToRemove.ram;
if (idToWriterNode.isEmpty()) {
first = null;
last = null;
lastIdFlushed = -1;
return;
}
if (id == lastIdFlushed) {
lastIdFlushed = nodeToRemove.prev.id;
}
nodeToRemove.prev.next = nodeToRemove.next;
nodeToRemove.next.prev = nodeToRemove.prev;
if (nodeToRemove == first) {
first = nodeToRemove.next;
}
if (nodeToRemove == last) {
last = nodeToRemove.prev;
}
}
}
if (nodeToRemove == last) {
last = nodeToRemove.prev;
}
lock.unlock();
}

void flushRoundRobin() throws IOException {
lock.lock();
if (idToWriterNode.isEmpty()) {
throw new IllegalCallerException("No registered writers");
}
int idToFlush;
if (lastIdFlushed == -1) {
idToFlush = first.id;
} else {
idToFlush = idToWriterNode.get(lastIdFlushed).next.id;
synchronized (lock) {
if (idToWriterNode.isEmpty()) {
return;
}
int idToFlush;
if (lastIdFlushed == -1) {
idToFlush = first.id;
} else {
idToFlush = idToWriterNode.get(lastIdFlushed).next.id;
}
idToWriterNode.get(idToFlush).writer.flushNextBuffer();
lastIdFlushed = idToFlush;
}
idToWriterNode.get(idToFlush).writer.flushNextBuffer();
lastIdFlushed = idToFlush;
lock.unlock();
}

long updateRAMAndGetDifference(int id) {
lock.lock();
long oldRAMBytesUsed = idToWriterNode.get(id).ram;
long newRAMBytesUsed = idToWriterNode.get(id).writer.ramBytesUsed();
lock.unlock();
return newRAMBytesUsed - oldRAMBytesUsed;
long getTotalRamTracker(int id) {
synchronized (lock) {
if (idToWriterNode.isEmpty()) {
return 0;
}
if (idToWriterNode.containsKey(id) == false) {
return totalRamTracker;
}
long oldRAMBytesUsed = idToWriterNode.get(id).ram;
long newRAMBytesUsed = idToWriterNode.get(id).writer.ramBytesUsed();
idToWriterNode.get(id).ram = newRAMBytesUsed;
totalRamTracker += newRAMBytesUsed - oldRAMBytesUsed;
return totalRamTracker;
}
}

private static class IndexWriterNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void testDefaults() throws Exception {
getters.add("getUseCompoundFile");
getters.add("isCheckPendingFlushOnUpdate");
getters.add("getSoftDeletesField");
getters.add("getIndexWriterRAMManager");

for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {
Expand Down

0 comments on commit 3044503

Please sign in to comment.