Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-tenant RAM buffers for IndexWriter #13951

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexUpgrader;
Expand Down Expand Up @@ -131,7 +132,7 @@ public void testUpgradeOldSingleSegmentIndexWithAdditions() throws Exception {
// add dummy segments (which are all in current
// version) to single segment index
MergePolicy mp = random().nextBoolean() ? newLogMergePolicy() : newTieredMergePolicy();
IndexWriterConfig iwc = new IndexWriterConfig(null).setMergePolicy(mp);
IndexWriterConfig iwc = new IndexWriterConfig((Analyzer) null).setMergePolicy(mp);
IndexWriter w = new IndexWriter(directory, iwc);
w.addIndexes(ramDir);
try (w) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.nio.file.Path;
import java.util.Properties;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.benchmark.BenchmarkTestCase;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.utils.Config;
Expand Down Expand Up @@ -45,7 +46,7 @@ public static void beforeClassAddIndexesTaskTest() throws Exception {
inputDir = testDir.resolve("input");
Directory tmpDir = newFSDirectory(inputDir);
try {
IndexWriter writer = new IndexWriter(tmpDir, new IndexWriterConfig(null));
IndexWriter writer = new IndexWriter(tmpDir, new IndexWriterConfig((Analyzer) null));
for (int i = 0; i < 10; i++) {
writer.addDocument(new Document());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.lucene.index;

import java.io.IOException;

/**
* Default {@link FlushPolicy} implementation that flushes new segments based on RAM used and
* document count depending on the IndexWriter's {@link IndexWriterConfig}. It also applies pending
Expand Down Expand Up @@ -52,6 +54,18 @@ && flushOnDocCount()
}
}

@Override
public void flushRamManager(IndexWriter writer) throws IOException {
  // Only consult the shared RAM manager when auto-flush is enabled and the
  // buffer is actually shared between more than one writer.
  IndexWriterRAMManager ramManager = writer.getConfig().indexWriterRAMManager;
  boolean flushingEnabled =
      ramManager.getRamBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
  if (flushingEnabled == false || ramManager.getWriterCount() <= 1) {
    return;
  }
  // Refresh this writer's RAM accounting, then compare the shared total against the limit.
  long totalBytes = ramManager.updateAndGetCurrentBytesUsed(writer.ramManagerId);
  if (totalBytes > ramManager.getRamBufferSizeMB() * 1024 * 1024) {
    ramManager.flushRoundRobin();
  }
}

private void flushDeletes(DocumentsWriterFlushControl control) {
control.setApplyAllDeletes();
if (infoStream.isEnabled("FP")) {
Expand Down
8 changes: 8 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;

import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;

Expand Down Expand Up @@ -57,6 +58,13 @@ abstract class FlushPolicy {
public abstract void onChange(
DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread);

/**
 * Triggers a flush through the shared RAM manager if the policy decides one is needed. NOTE:
 * this does not necessarily flush the passed-in writer — with the default round-robin policy,
 * in most cases some other writer sharing the same RAM buffer is flushed instead.
 */
public abstract void flushRamManager(IndexWriter writer) throws IOException;

/** Called by DocumentsWriter to initialize the FlushPolicy */
protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) {
this.indexWriterConfig = indexWriterConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.CommandLineUtil;
Expand Down Expand Up @@ -135,7 +136,7 @@ static IndexUpgrader parseArgs(String[] args) throws IOException {
* {@code matchVersion}. The tool refuses to upgrade indexes with multiple commit points.
*/
public IndexUpgrader(Directory dir) {
// The (Analyzer) cast disambiguates between the IndexWriterConfig(Analyzer) and
// IndexWriterConfig(IndexWriterRAMManager) overloads; no analyzer is needed for upgrading.
this(dir, new IndexWriterConfig((Analyzer) null), false);
}

/**
Expand All @@ -145,7 +146,7 @@ public IndexUpgrader(Directory dir) {
* be sent to this stream.
*/
public IndexUpgrader(Directory dir, InfoStream infoStream, boolean deletePriorCommits) {
// The (Analyzer) cast disambiguates between the Analyzer and IndexWriterRAMManager overloads.
this(dir, new IndexWriterConfig((Analyzer) null), deletePriorCommits);
if (null != infoStream) {
this.iwc.setInfoStream(infoStream);
}
}
Expand Down
7 changes: 7 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ public void onTicketBacklog() {
}
};

/** The id that is associated with this writer for {@link IndexWriterRAMManager} */
public final int ramManagerId;

/**
* Expert: returns a readonly reader, covering all committed as well as un-committed changes to
* the index. This provides "near real-time" searching, in that changes made during an IndexWriter
Expand Down Expand Up @@ -1211,6 +1214,7 @@ public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
writeLock = null;
}
}
this.ramManagerId = config.indexWriterRAMManager.registerWriter(this);
}

/** Confirms that the incoming index sort (if any) matches the existing index sort (if any). */
Expand Down Expand Up @@ -1365,6 +1369,7 @@ private void shutdown() throws IOException {
*/
@Override
public void close() throws IOException {
config.indexWriterRAMManager.removeWriter(ramManagerId);
if (config.getCommitOnClose()) {
shutdown();
} else {
Expand Down Expand Up @@ -2445,6 +2450,7 @@ public void rollback() throws IOException {
// Ensure that only one thread actually gets to do the
// closing, and make sure no commit is also in progress:
if (shouldClose(true)) {
config.indexWriterRAMManager.removeWriter(ramManagerId);
rollbackInternal();
}
}
Expand Down Expand Up @@ -6012,6 +6018,7 @@ private long maybeProcessEvents(long seqNo) throws IOException {
seqNo = -seqNo;
processEvents(true);
}
config.flushPolicy.flushRamManager(this);
return seqNo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,21 @@ public IndexWriterConfig() {
* problem you should switch to {@link LogByteSizeMergePolicy} or {@link LogDocMergePolicy}.
*/
public IndexWriterConfig(Analyzer analyzer) {
super(analyzer);
this(analyzer, new IndexWriterRAMManager(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for not making this change in the default constructor? We could avoid making changes to all the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reason I had to make all the changes in the tests was because I added this constructor:

public IndexWriterConfig(IndexWriterRAMManager indexWriterRAMManager) {

And then in the tests there were a bunch of new IndexWriterConfig(null) calls which became ambiguous. I think that this constructor is potentially useful which is why I took the hit and changed all those tests, but I can remove it to avoid all those test changes?

}

/**
 * Creates a new config with the provided {@link IndexWriterRAMManager} and a {@link
 * StandardAnalyzer}. If you want to share a RAM buffer between multiple {@link IndexWriter}
 * instances, you must pass the same manager to each config, as {@link IndexWriterConfig}
 * maintains a 1:1 relationship with its {@link IndexWriter}.
 */
public IndexWriterConfig(IndexWriterRAMManager indexWriterRAMManager) {
this(new StandardAnalyzer(), indexWriterRAMManager);
}

/**
 * Creates a new config with the provided {@link Analyzer} and {@link IndexWriterRAMManager}.
 * Sharing one manager across several configs lets the associated writers share a RAM buffer.
 */
public IndexWriterConfig(Analyzer analyzer, IndexWriterRAMManager indexWriterRAMManager) {
super(analyzer, indexWriterRAMManager);
}

/**
Expand Down Expand Up @@ -393,6 +407,11 @@ public double getRAMBufferSizeMB() {
return super.getRAMBufferSizeMB();
}

/** Returns the {@link IndexWriterRAMManager} shared by writers created from this config. */
@Override
public IndexWriterRAMManager getIndexWriterRAMManager() {
return super.getIndexWriterRAMManager();
}

/**
* Information about merges, deletes and a message when maxFieldLength is reached will be printed
* to this. Must not be null, but {@link InfoStream#NO_OUTPUT} may be used to suppress output.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
* For managing multiple instances of {@link IndexWriter} sharing the same buffer (configured by
* {@link IndexWriterConfig#setRAMBufferSizeMB})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be the other way around in my opinion, the RAM buffer size should be on IndexWriterRAMManager, and setting a ramBufferSizeMB on IndexWriterConfig would internally create a new IndexWriterRAMManager under the hood that is shared with no other IndexWriter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm probably missing something here - so I get what you're saying about having the IndexWriterRAMManager in the IndexWriterConfig, but what would be the point of creating an IndexWriterRAMManager for a single IndexWriter? Wouldn't DocumentsWriterFlushControl be sufficient for this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but what would be the point of creating an IndexWriterRAMManager for a single IndexWriter

I think the idea is to be able to create IndexWriters that don't share their RAM buffer limit with other writers. Maybe we could just set IndexWriterRAMManager to null, if ramBufferSizeMB is explicitly specified in the IW config (instead of creating a new ram manager).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I took a slightly different approach, will publish a new PR soon. Maybe we can discuss it there but pretty much I just do what @jpountz suggested and move the ramBufferSizeMB to be a value held by the IndexWriterRAMManager. I think we can then discuss if we should just disable calling writer flushes in the manager if there is only a single writer.

*/
public class IndexWriterRAMManager {
private final LinkedIdToWriter idToWriter = new LinkedIdToWriter();
private final AtomicInteger idGenerator = new AtomicInteger();
private double ramBufferSizeMB;

/**
 * Creates a manager with the given shared RAM buffer size.
 *
 * @param ramBufferSizeMB the RAM buffer size to use between all registered {@link IndexWriter}
 *     instances, or {@link IndexWriterConfig#DISABLE_AUTO_FLUSH} to disable auto flushing
 * @throws IllegalArgumentException if the size is enabled but not positive
 */
public IndexWriterRAMManager(double ramBufferSizeMB) {
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0) {
throw new IllegalArgumentException("ramBufferSize should be > 0.0 MB when enabled");
}
this.ramBufferSizeMB = ramBufferSizeMB;
}

/**
 * Set the buffer size for this manager.
 *
 * @param ramBufferSizeMB the new shared buffer size in MB, or {@link
 *     IndexWriterConfig#DISABLE_AUTO_FLUSH} to disable auto flushing
 * @throws IllegalArgumentException if the size is enabled but not positive
 */
public void setRamBufferSizeMB(double ramBufferSizeMB) {
  // Mirror the constructor's validation so the setter cannot put the manager into a state the
  // constructor would have rejected.
  if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0) {
    throw new IllegalArgumentException("ramBufferSize should be > 0.0 MB when enabled");
  }
  this.ramBufferSizeMB = ramBufferSizeMB;
}

/**
 * Get the buffer size assigned to this manager.
 *
 * <p>NOTE(review): {@code ramBufferSizeMB} is a plain non-volatile field mutated by {@link
 * #setRamBufferSizeMB}, so readers on other threads may observe a stale value — confirm this is
 * acceptable.
 */
public double getRamBufferSizeMB() {
return ramBufferSizeMB;
}

/**
 * Calls {@link IndexWriter#flushNextBuffer()} in a round-robin fashion, starting from the first
 * writer added that has not been removed yet. Subsequent calls flush the next writer in line,
 * eventually looping back to the beginning.
 *
 * @return the id of the writer that was flushed, or -1 if no writers are registered (exposed for
 *     testing)
 */
public int flushRoundRobin() throws IOException {
return idToWriter.flushRoundRobin();
}

/** Gets the number of writers currently registered with this RAM manager. */
public int getWriterCount() {
return idToWriter.size();
}

/**
 * Registers a writer and returns the associated id.
 *
 * @param writer the writer to track
 * @return a unique id for the writer, taken from a monotonically increasing counter
 */
protected int registerWriter(IndexWriter writer) {
int id = idGenerator.incrementAndGet();
idToWriter.addWriter(writer, id);
return id;
}

/** Removes a writer given the writer's id; protected for testing. */
protected void removeWriter(int id) {
idToWriter.removeWriter(id);
}

/**
 * Calls {@link IndexWriter#ramBytesUsed()} for the writer id passed in, folds that value into
 * the shared running total, and returns the total. If the id is unknown the total is returned
 * unchanged; if no writers are registered, returns 0.
 */
public long updateAndGetCurrentBytesUsed(int id) {
return idToWriter.getTotalRamTracker(id);
}

private static class LinkedIdToWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a java Queue implementation instead of the custom linked-list logic? You could round-robin on elements by removing, processing and add them back to the queue.

I suppose this queue size would be small, so array deque and linked lists are both fine? We can also get some thread safe implementations out of the box.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so I looked into using both Queue and LinkedHashMap to do this. The issue I found was that there was still complexity in maintaining the last id that was flushed and the total ram used, so the only function that really became less complex was the addWriter function. I think given that this probably won't simplify the overall function too much, I'm inclined the keep the implementation the same way it is right now, but if you disagree feel free to let me know and I can take a second look at it.

// Map from writer id to its node in the circular doubly-linked list below.
private final Map<Integer, IndexWriterNode> idToWriterNode = new HashMap<>();
// Head and tail of a circular doubly-linked list of registered writers (defines flush order).
private IndexWriterNode first;
private IndexWriterNode last;
// Running sum of the last-sampled ramBytesUsed() of every registered writer.
private long totalRamTracker;

// NOTE(review): every method below synchronizes on this ReentrantLock's monitor rather than
// calling lock()/unlock(); a plain Object lock would serve the same purpose — confirm intent.
private final ReentrantLock lock = new ReentrantLock();

// for round-robin flushing; -1 means no writer has been flushed yet
private int lastIdFlushed = -1;

// Appends the writer at the tail of the circular list and indexes it by id.
void addWriter(IndexWriter writer, int id) {
synchronized (lock) {
IndexWriterNode node = new IndexWriterNode(writer, id);
if (idToWriterNode.isEmpty()) {
// First node: seed head/tail so the generic splice below makes it self-referential.
first = node;
last = node;
}
// Splice the node in between the current tail and head, keeping the list circular.
node.next = first;
last.next = node;
node.prev = last;
last = node;
first.prev = node;
idToWriterNode.put(id, node);
}
}

// Unlinks the writer with the given id from the circular list; no-op for unknown ids.
void removeWriter(int id) {
synchronized (lock) {
if (idToWriterNode.containsKey(id)) {
IndexWriterNode nodeToRemove = idToWriterNode.remove(id);
// Drop this writer's last-sampled RAM from the shared total.
totalRamTracker -= nodeToRemove.ram;
if (idToWriterNode.isEmpty()) {
// Last writer removed: reset the list and the round-robin cursor.
first = null;
last = null;
lastIdFlushed = -1;
return;
}
if (id == lastIdFlushed) {
// Move the round-robin cursor to the predecessor so the next flush still targets the
// removed node's successor.
lastIdFlushed = nodeToRemove.prev.id;
}
// Unlink from the circular list, fixing up head/tail if the removed node held either role.
nodeToRemove.prev.next = nodeToRemove.next;
nodeToRemove.next.prev = nodeToRemove.prev;
if (nodeToRemove == first) {
first = nodeToRemove.next;
}
if (nodeToRemove == last) {
last = nodeToRemove.prev;
}
}
}
}

// Returns the writer id that we attempted to flush, or -1 if no writers are registered
// (for testing purposes)
int flushRoundRobin() throws IOException {
synchronized (lock) {
if (idToWriterNode.isEmpty()) {
return -1;
}
int idToFlush;
if (lastIdFlushed == -1) {
// Nothing flushed yet: start with the oldest registered writer.
idToFlush = first.id;
} else {
idToFlush = idToWriterNode.get(lastIdFlushed).next.id;
}
// NOTE(review): flushNextBuffer() performs I/O while the lock is held, blocking concurrent
// register/remove/RAM-update calls for the duration — confirm this is acceptable.
idToWriterNode.get(idToFlush).writer.flushNextBuffer();
lastIdFlushed = idToFlush;
return idToFlush;
}
}

// Re-samples ramBytesUsed() for the writer with the given id, applies the delta to the shared
// running total, and returns the total. Returns 0 when no writers are registered, and the
// unchanged total when the id is unknown.
long getTotalRamTracker(int id) {
  synchronized (lock) {
    if (idToWriterNode.isEmpty()) {
      return 0;
    }
    // Single map lookup instead of containsKey + three get(id) calls.
    IndexWriterNode node = idToWriterNode.get(id);
    if (node == null) {
      return totalRamTracker;
    }
    long oldRAMBytesUsed = node.ram;
    long newRAMBytesUsed = node.writer.ramBytesUsed();
    node.ram = newRAMBytesUsed;
    totalRamTracker += newRAMBytesUsed - oldRAMBytesUsed;
    return totalRamTracker;
  }
}

// Number of currently registered writers.
int size() {
synchronized (lock) {
return idToWriterNode.size();
}
}

/** Node in the circular doubly-linked list of registered writers. */
private static class IndexWriterNode {
IndexWriter writer;
// id assigned by IndexWriterRAMManager when the writer was registered
int id;
// last-sampled ramBytesUsed() for this writer
long ram;
IndexWriterNode next;
IndexWriterNode prev;

IndexWriterNode(IndexWriter writer, int id) {
this.writer = writer;
this.id = id;
// Sample the writer's current RAM usage at registration time.
this.ram = writer.ramBytesUsed();
}
}
}
}
Loading
Loading