Skip to content

Commit

Permalink
Multi-tenant index writer initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc D'Mello committed Oct 23, 2024
1 parent 1faf33a commit 2c6ed50
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 1 deletion.
35 changes: 34 additions & 1 deletion lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ public void onTicketBacklog() {
}
};

private final IndexWriterRAMManager.PerWriterIndexWriterRAMManager indexWriterRAMManager;

/**
* Expert: returns a readonly reader, covering all committed as well as un-committed changes to
* the index. This provides "near real-time" searching, in that changes made during an IndexWriter
Expand Down Expand Up @@ -939,11 +941,14 @@ protected final void ensureOpen() throws AlreadyClosedException {
* @param d the index directory. The index is either created or appended according <code>
* conf.getOpenMode()</code>.
* @param conf the configuration settings according to which IndexWriter should be initialized.
* @param indexWriterRAMManager the RAM manager used for multi-tenant RAM management, or {@code null} if this writer should not take part in shared RAM management
* @throws IOException if the directory cannot be read/written to, or if it does not exist and
* <code>conf.getOpenMode()</code> is <code>OpenMode.APPEND</code> or if there is any other
* low-level IO error
*/
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
public IndexWriter(
Directory d, IndexWriterConfig conf, IndexWriterRAMManager indexWriterRAMManager)
throws IOException {
enableTestPoints = isEnableTestPoints();
conf.setIndexWriter(this); // prevent reuse by other instances
config = conf;
Expand Down Expand Up @@ -1211,6 +1216,27 @@ public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
writeLock = null;
}
}

if (indexWriterRAMManager != null) {
this.indexWriterRAMManager =
new IndexWriterRAMManager.PerWriterIndexWriterRAMManager(this, indexWriterRAMManager);
} else {
this.indexWriterRAMManager = null;
}
}

/**
* Constructs a new IndexWriter that does not take part in multi-tenant RAM management. This is a
* convenience overload that delegates to the three-argument constructor, passing {@code null} as
* the RAM manager.
*
* @param d the index directory. The index is either created or appended according <code>
* conf.getOpenMode()</code>.
* @param conf the configuration settings according to which IndexWriter should be initialized.
* @throws IOException if the directory cannot be read/written to, or if it does not exist and
* <code>conf.getOpenMode()</code> is <code>OpenMode.APPEND</code> or if there is any other
* low-level IO error
*/
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
// A null RAM manager means no per-writer RAM manager is created for this writer.
this(d, conf, null);
}

/** Confirms that the incoming index sort (if any) matches the existing index sort (if any). */
Expand Down Expand Up @@ -1340,6 +1366,10 @@ private void shutdown() throws IOException {
}
rollbackInternal(); // if we got that far lets rollback and close
}

if (indexWriterRAMManager != null) {
indexWriterRAMManager.removeWriter();
}
}

/**
Expand Down Expand Up @@ -6012,6 +6042,9 @@ private long maybeProcessEvents(long seqNo) throws IOException {
seqNo = -seqNo;
processEvents(true);
}
if (indexWriterRAMManager != null) {
indexWriterRAMManager.flushIfNecessary();
}
return seqNo;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * For managing multiple instances of {@link IndexWriter} sharing the same buffer (configured by
 * {@link IndexWriterConfig#setRAMBufferSizeMB}).
 *
 * <p>Writers register themselves through a {@link PerWriterIndexWriterRAMManager} handle. Each
 * call to {@link PerWriterIndexWriterRAMManager#flushIfNecessary()} sums the RAM reported by all
 * registered writers and, once that sum reaches the configured limit, asks one of them to flush
 * its next buffer.
 */
public class IndexWriterRAMManager {

  /** Source of the shared RAM limit, read on every {@link #flushIfNecessary(int)} call. */
  private final IndexWriterConfig config;

  /** Currently registered writers, keyed by the id handed out at registration time. */
  private final Map<Integer, IndexWriter> idToWriter = new ConcurrentHashMap<>();

  /** Generates the per-writer ids; the first id handed out is 1. */
  private final AtomicInteger idGenerator = new AtomicInteger();

  /**
   * Default constructor
   *
   * @param config the index writer config containing the max RAM buffer size
   */
  public IndexWriterRAMManager(IndexWriterConfig config) {
    this.config = config;
  }

  /**
   * Registers a writer with this manager.
   *
   * @param writer the writer whose RAM usage should count against the shared limit
   * @return the id under which the writer was registered
   */
  private int registerWriter(IndexWriter writer) {
    int id = idGenerator.incrementAndGet();
    idToWriter.put(id, writer);
    return id;
  }

  /**
   * Removes a previously registered writer.
   *
   * @param id the id returned by {@link #registerWriter(IndexWriter)}
   * @throws IllegalArgumentException if no writer is registered under {@code id}
   */
  private void removeWriter(int id) {
    // A single remove() is atomic on the concurrent map: of two racing removals of the same id,
    // exactly one sees the mapping and the other reports the error. A separate containsKey()
    // check followed by remove() would let both pass.
    if (idToWriter.remove(id) == null) {
      throw new IllegalArgumentException(
          "Writer " + id + " has not been registered or has been removed already");
    }
  }

  /**
   * Flushes one registered writer if the combined RAM usage of all registered writers has reached
   * the configured limit.
   *
   * @param id the id of the calling writer
   * @throws IllegalArgumentException if no writer is registered under {@code id}
   * @throws IOException if flushing the chosen writer fails
   */
  private void flushIfNecessary(int id) throws IOException {
    // Look the caller up once so the existence check and the value used below cannot disagree
    // when the writer is removed concurrently.
    IndexWriter callingWriter = idToWriter.get(id);
    if (callingWriter == null) {
      throw new IllegalArgumentException(
          "Writer " + id + " has not been registered or has been removed already");
    }
    long totalRam = 0L;
    for (IndexWriter writer : idToWriter.values()) {
      totalRam += writer.ramBytesUsed();
    }
    if (totalRam >= config.getRAMBufferSizeMB() * 1024 * 1024) {
      IndexWriter writerToFlush = chooseWriterToFlush(idToWriter.values(), callingWriter);
      // No candidate is returned when no writer reports any buffered RAM, which can still pass
      // the threshold check above if the configured limit is zero or negative. There is nothing
      // to flush in that case.
      if (writerToFlush != null) {
        writerToFlush.flushNextBuffer();
      }
    }
  }

  /**
   * Chooses which writer should be flushed: the writer that currently reports the most RAM usage.
   *
   * <p>Note that this method is {@code static} and therefore cannot be overridden by subclasses.
   *
   * @param writers list of writers registered with this {@link IndexWriterRAMManager}
   * @param callingWriter the writer calling {@link IndexWriterRAMManager#flushIfNecessary}; not
   *     consulted by this implementation
   * @return the IndexWriter to flush, or {@code null} if no writer reports more than zero bytes
   */
  protected static IndexWriter chooseWriterToFlush(
      Collection<IndexWriter> writers, IndexWriter callingWriter) {
    IndexWriter highestBufferWriter = null;
    long highestRam = 0L;
    for (IndexWriter writer : writers) {
      long writerRamBytes = writer.ramBytesUsed();
      // Strict comparison: writers with zero usage are never selected, and on a tie the writer
      // encountered first is kept.
      if (writerRamBytes > highestRam) {
        highestRam = writerRamBytes;
        highestBufferWriter = writer;
      }
    }
    return highestBufferWriter;
  }

  /**
   * For use in {@link IndexWriter}, manages communication with the {@link IndexWriterRAMManager}
   */
  public static class PerWriterIndexWriterRAMManager {

    /** Id under which the owning writer is registered with {@link #manager}. */
    private final int id;

    private final IndexWriterRAMManager manager;

    /**
     * Registers {@code writer} with {@code manager} and remembers the resulting id.
     *
     * @param writer the writer this handle belongs to
     * @param manager the shared manager to register with
     */
    PerWriterIndexWriterRAMManager(IndexWriter writer, IndexWriterRAMManager manager) {
      id = manager.registerWriter(writer);
      this.manager = manager;
    }

    /**
     * Unregisters the owning writer from the shared manager.
     *
     * @throws IllegalArgumentException if the writer has already been removed
     */
    void removeWriter() {
      manager.removeWriter(id);
    }

    /**
     * Asks the shared manager to flush a writer if the combined RAM limit has been reached.
     *
     * @throws IllegalArgumentException if the owning writer is no longer registered
     * @throws IOException if flushing fails
     */
    void flushIfNecessary() throws IOException {
      manager.flushIfNecessary(id);
    }
  }
}

0 comments on commit 2c6ed50

Please sign in to comment.