Skip to content

Commit

Permalink
Removed PerWriterIndexRamManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc D'Mello committed Nov 27, 2024
1 parent 8402115 commit 92aa344
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ && flushOnDocCount()
}

@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {
long totalBytes = perWriterRamManager.getTotalBufferBytesUsed();
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
ramManager.flushRoundRobin();
public void flushRamManager(IndexWriter writer) throws IOException {
IndexWriterRAMManager ramManager = writer.getConfig().indexWriterRAMManager;
if (ramManager.getRamBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
long totalBytes = ramManager.updateAndGetCurrentBytesUsed(writer.ramManagerId);
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
ramManager.flushRoundRobin();
}
}
}

Expand Down
12 changes: 4 additions & 8 deletions lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,11 @@ public abstract void onChange(
DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread);

/**
* Chooses which writer should be flushed. Default implementation chooses the writer with most RAM
* usage
*
* @param ramManager the {@link IndexWriterRAMManager} being used to actually flush the writers
* Flushed a writer according to the FlushPolicy. NOTE: this doesn't necessarily mean the passed
* in writer will be flushed, and in most cases, this will actually be the case as the default
* policy is a round-robin policy
*/
public abstract void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException;
public abstract void flushRamManager(IndexWriter writer) throws IOException;

/** Called by DocumentsWriter to initialize the FlushPolicy */
protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) {
Expand Down
13 changes: 6 additions & 7 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ public void onTicketBacklog() {
}
};

private final IndexWriterRAMManager.PerWriterIndexWriterRAMManager indexWriterRAMManager;
/** The id that is associated with this writer for {@link IndexWriterRAMManager} */
public final int ramManagerId;

/**
* Expert: returns a readonly reader, covering all committed as well as un-committed changes to
Expand Down Expand Up @@ -1213,9 +1214,7 @@ public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
writeLock = null;
}
}
this.indexWriterRAMManager =
new IndexWriterRAMManager.PerWriterIndexWriterRAMManager(
this, config.getIndexWriterRAMManager());
this.ramManagerId = config.indexWriterRAMManager.registerWriter(this);
}

/** Confirms that the incoming index sort (if any) matches the existing index sort (if any). */
Expand Down Expand Up @@ -1370,7 +1369,7 @@ private void shutdown() throws IOException {
*/
@Override
public void close() throws IOException {
indexWriterRAMManager.removeWriter();
config.indexWriterRAMManager.removeWriter(ramManagerId);
if (config.getCommitOnClose()) {
shutdown();
} else {
Expand Down Expand Up @@ -2451,7 +2450,7 @@ public void rollback() throws IOException {
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
if (shouldClose(true)) {
indexWriterRAMManager.removeWriter();
config.indexWriterRAMManager.removeWriter(ramManagerId);
rollbackInternal();
}
}
Expand Down Expand Up @@ -6019,7 +6018,7 @@ private long maybeProcessEvents(long seqNo) throws IOException {
seqNo = -seqNo;
processEvents(true);
}
indexWriterRAMManager.flushIfNecessary(config.flushPolicy);
config.flushPolicy.flushRamManager(this);
return seqNo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public int flushRoundRobin() throws IOException {
return idToWriter.flushRoundRobin();
}

/** Registers a writer and returns the associated ID, protected for testing */
/** Registers a writer can returns the associated ID */
protected int registerWriter(IndexWriter writer) {
int id = idGenerator.incrementAndGet();
idToWriter.addWriter(writer, id);
Expand All @@ -75,42 +75,12 @@ protected void removeWriter(int id) {
idToWriter.removeWriter(id);
}

private void flushIfNecessary(
FlushPolicy flushPolicy, PerWriterIndexWriterRAMManager perWriterRAMManager)
throws IOException {
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
flushPolicy.flushWriter(this, perWriterRAMManager);
}
}

private long updateAndGetCurrentBytesUsed(int id) {
return idToWriter.getTotalRamTracker(id);
}

/**
* For use in {@link IndexWriter}, manages communication with the {@link IndexWriterRAMManager}
* Will call {@link IndexWriter#ramBytesUsed()} for the writer id passed in, and then updates the
* total ram using that value and returns it
*/
public static class PerWriterIndexWriterRAMManager {

private final int id;
private final IndexWriterRAMManager manager;

PerWriterIndexWriterRAMManager(IndexWriter writer, IndexWriterRAMManager manager) {
id = manager.registerWriter(writer);
this.manager = manager;
}

void removeWriter() {
manager.removeWriter(id);
}

void flushIfNecessary(FlushPolicy flushPolicy) throws IOException {
manager.flushIfNecessary(flushPolicy, this);
}

long getTotalBufferBytesUsed() {
return manager.updateAndGetCurrentBytesUsed(id);
}
public long updateAndGetCurrentBytesUsed(int id) {
return idToWriter.getTotalRamTracker(id);
}

private static class LinkedIdToWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3218,10 +3218,7 @@ public void onChange(
}

@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {}
public void flushRamManager(IndexWriter writer) throws IOException {}
});
try (IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,9 @@ private static class TestFlushPolicy extends FlushPolicy {
public void onChange(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {}

@Override
public void flushWriter(
IndexWriterRAMManager ramManager,
IndexWriterRAMManager.PerWriterIndexWriterRAMManager perWriterRamManager)
throws IOException {
long totalBytes = perWriterRamManager.getTotalBufferBytesUsed();
public void flushRamManager(IndexWriter writer) throws IOException {
IndexWriterRAMManager ramManager = writer.getConfig().indexWriterRAMManager;
long totalBytes = ramManager.updateAndGetCurrentBytesUsed(writer.ramManagerId);
if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
int flushedId = ramManager.flushRoundRobin();
flushedWriters.add(flushedId);
Expand Down Expand Up @@ -420,7 +418,7 @@ public int flushRoundRobin() throws IOException {
}

@Override
protected int registerWriter(IndexWriter writer) {
public int registerWriter(IndexWriter writer) {
int id = super.registerWriter(writer);
events.add(new TestEventAndId(TestEvent.ADD, id));
return id;
Expand Down

0 comments on commit 92aa344

Please sign in to comment.