2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
@@ -333,6 +333,8 @@ New Features

Improvements
---------------------
* GITHUB#13475: Re-enable intra-merge parallelism except for terms, norms, and doc values.
Related to GITHUB#13478. (Ben Trent)

* GITHUB#13548: Refactor and javadoc update for KNN vector writer classes. (Patrick Zhai)

53 changes: 52 additions & 1 deletion lucene/core/src/java/org/apache/lucene/index/MergeState.java
@@ -41,7 +41,7 @@
*
* @lucene.experimental
*/
-public class MergeState {
+public class MergeState implements Cloneable {

/** Maps document IDs from old segments to document IDs in the new segment */
public final DocMap[] docMaps;
@@ -302,4 +302,55 @@ public MergeState(
this.intraMergeTaskExecutor = intraMergeTaskExecutor;
this.needsIndexSort = needsIndexSort;
}

@Override
public MergeState clone() {
StoredFieldsReader[] storedFieldsReaders = this.storedFieldsReaders.clone();
TermVectorsReader[] termVectorsReaders = this.termVectorsReaders.clone();
NormsProducer[] normsProducers = this.normsProducers.clone();
DocValuesProducer[] docValuesProducers = this.docValuesProducers.clone();
FieldsProducer[] fieldsProducers = this.fieldsProducers.clone();
PointsReader[] pointsReaders = this.pointsReaders.clone();
KnnVectorsReader[] knnVectorsReaders = this.knnVectorsReaders.clone();
for (int i = 0; i < storedFieldsReaders.length; ++i) {
if (storedFieldsReaders[i] != null) {
storedFieldsReaders[i] = storedFieldsReaders[i].getMergeInstance();
}
if (termVectorsReaders[i] != null) {
termVectorsReaders[i] = termVectorsReaders[i].getMergeInstance();
}
if (normsProducers[i] != null) {
normsProducers[i] = normsProducers[i].getMergeInstance();
}
if (docValuesProducers[i] != null) {
docValuesProducers[i] = docValuesProducers[i].getMergeInstance();
}
if (fieldsProducers[i] != null) {
fieldsProducers[i] = fieldsProducers[i].getMergeInstance();
}
if (pointsReaders[i] != null) {
pointsReaders[i] = pointsReaders[i].getMergeInstance();
}
if (knnVectorsReaders[i] != null) {
knnVectorsReaders[i] = knnVectorsReaders[i].getMergeInstance();
}
}
return new MergeState(
docMaps,
segmentInfo,
mergeFieldInfos,
storedFieldsReaders,
termVectorsReaders,
normsProducers,
docValuesProducers,
fieldInfos,
liveDocs,
fieldsProducers,
pointsReaders,
knnVectorsReaders,
maxDocs,
infoStream,
intraMergeTaskExecutor,
needsIndexSort);
}
}
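
Note on the new clone(): it shares the immutable merge inputs (doc maps, field infos, live docs) but replaces every codec reader with its getMergeInstance(), so each merging thread works against thread-confined reader state. A minimal sketch of that pattern follows; the names are hypothetical and not part of the Lucene API:

/**
 * Illustrative sketch (not Lucene code) of the clone-per-thread pattern used
 * by MergeState.clone(): immutable data is shared, stateful readers replaced.
 */
class SharedMergeData implements Cloneable {

  /** Hypothetical reader whose internal scratch buffer is not thread-safe. */
  static class Reader {
    final byte[] scratch = new byte[16];

    Reader mergeInstance() {
      return new Reader(); // fresh scratch space for the calling thread
    }
  }

  final int[] docMap; // immutable mapping: safe to share between clones
  final Reader[] readers; // stateful: must be swapped out per clone

  SharedMergeData(int[] docMap, Reader[] readers) {
    this.docMap = docMap;
    this.readers = readers;
  }

  @Override
  public SharedMergeData clone() {
    Reader[] copies = readers.clone();
    for (int i = 0; i < copies.length; i++) {
      if (copies[i] != null) {
        copies[i] = copies[i].mergeInstance();
      }
    }
    // docMap is shared as-is; only the stateful readers are replaced
    return new SharedMergeData(docMap, copies);
  }
}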
41 changes: 25 additions & 16 deletions lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ final class SegmentMerger {

final MergeState mergeState;
private final FieldInfos.Builder fieldInfosBuilder;
final Thread mergeStateCreationThread;

// note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!!
SegmentMerger(
@@ -68,6 +69,7 @@ final class SegmentMerger {
"IOContext.context should be MERGE; got: " + context.context());
}
mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor);
mergeStateCreationThread = Thread.currentThread();
directory = dir;
this.codec = segmentInfo.getCodec();
this.context = context;
@@ -99,6 +101,16 @@ boolean shouldMerge() {
return mergeState.segmentInfo.maxDoc() > 0;
}

private MergeState mergeState() {
MergeState mergeState = this.mergeState;
if (Thread.currentThread() != mergeStateCreationThread) {
// Most merges, e.g. small merges, run in the same thread, so save the cost of pulling a clone
// in that case.
mergeState = mergeState.clone();
}
return mergeState;
}
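
The guard above keeps the common case cheap: most merges never leave the thread that created the MergeState, so only cross-thread access (an intra-merge subtask) pays for a clone. The same check in isolation, again as a sketch reusing the hypothetical SharedMergeData above:

/** Sketch of the creation-thread fast path mirrored by mergeState() above. */
final class MergeDataGuard {
  private final Thread creationThread = Thread.currentThread();
  private final SharedMergeData original;

  MergeDataGuard(SharedMergeData original) {
    this.original = original;
  }

  SharedMergeData get() {
    // Fast path: small merges run entirely on the creating thread.
    if (Thread.currentThread() == creationThread) {
      return original;
    }
    // Any other thread gets its own clone with per-thread reader instances.
    return original.clone();
  }
}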

/**
* Merges the readers into the directory passed to the constructor
*
@@ -137,25 +149,15 @@ MergeState merge() throws IOException {

TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
List<Callable<Void>> mergingTasks = new ArrayList<>();
-    mergingTasks.add(
-        () -> {
-          if (mergeState.mergeFieldInfos.hasNorms()) {
-            mergeWithLogging(
-                this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
-          }
+    if (mergeState.mergeFieldInfos.hasNorms()) {
+      mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
+    }

-          mergeWithLogging(
-              this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
-          return null;
-        });
+    mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);

     if (mergeState.mergeFieldInfos.hasDocValues()) {
-      mergingTasks.add(
-          () -> {
-            mergeWithLogging(
-                this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
-            return null;
-          });
+      mergeWithLogging(
+          this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged);
     }

if (mergeState.mergeFieldInfos.hasPointValues()) {
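
With norms, postings, and doc values now merged inline on the merge thread, only the independent parts (points, knn vectors, and the like) remain candidates for mergingTasks. The resulting sequential-then-parallel shape is sketched below with a plain ExecutorService where Lucene uses its internal TaskExecutor wrapper; names are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Sketch (not Lucene code) of the shape merge() has after this change. */
final class MergeFanOutSketch {
  static void merge(ExecutorService intraMergeExecutor) throws Exception {
    // Sequential phase: order-sensitive formats run on the merge thread.
    // mergeNorms(); mergeTerms(); mergeDocValues();

    // Parallel phase: independent formats fan out as Callable<Void> tasks.
    List<Callable<Void>> mergingTasks = new ArrayList<>();
    mergingTasks.add(() -> { /* merge points */ return null; });
    mergingTasks.add(() -> { /* merge knn vectors */ return null; });
    for (Future<Void> task : intraMergeExecutor.invokeAll(mergingTasks)) {
      task.get(); // join; rethrows any subtask failure as ExecutionException
    }
  }
}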
@@ -201,27 +203,31 @@ private void mergeFieldInfos(

private void mergeDocValues(
SegmentWriteState segmentWriteState, SegmentReadState segmentReadState) throws IOException {
MergeState mergeState = mergeState();
try (DocValuesConsumer consumer = codec.docValuesFormat().fieldsConsumer(segmentWriteState)) {
consumer.merge(mergeState);
}
}

private void mergePoints(SegmentWriteState segmentWriteState, SegmentReadState segmentReadState)
throws IOException {
MergeState mergeState = mergeState();
try (PointsWriter writer = codec.pointsFormat().fieldsWriter(segmentWriteState)) {
writer.merge(mergeState);
}
}

private void mergeNorms(SegmentWriteState segmentWriteState, SegmentReadState segmentReadState)
throws IOException {
MergeState mergeState = mergeState();
try (NormsConsumer consumer = codec.normsFormat().normsConsumer(segmentWriteState)) {
consumer.merge(mergeState);
}
}

private void mergeTerms(SegmentWriteState segmentWriteState, SegmentReadState segmentReadState)
throws IOException {
MergeState mergeState = mergeState();
try (NormsProducer norms =
mergeState.mergeFieldInfos.hasNorms()
? codec.normsFormat().normsProducer(segmentReadState)
@@ -256,6 +262,7 @@ public void mergeFieldInfos() {
* @throws IOException if there is a low-level IO error
*/
private int mergeFields() throws IOException {
MergeState mergeState = mergeState();
try (StoredFieldsWriter fieldsWriter =
codec.storedFieldsFormat().fieldsWriter(directory, mergeState.segmentInfo, context)) {
return fieldsWriter.merge(mergeState);
@@ -268,6 +275,7 @@ private int mergeFields() throws IOException {
* @throws IOException if there is a low-level IO error
*/
private int mergeTermVectors() throws IOException {
MergeState mergeState = mergeState();
try (TermVectorsWriter termVectorsWriter =
codec.termVectorsFormat().vectorsWriter(directory, mergeState.segmentInfo, context)) {
int numMerged = termVectorsWriter.merge(mergeState);
@@ -278,6 +286,7 @@ private int mergeTermVectors() throws IOException {

private void mergeVectorValues(
SegmentWriteState segmentWriteState, SegmentReadState segmentReadState) throws IOException {
MergeState mergeState = mergeState();
try (KnnVectorsWriter writer = codec.knnVectorsFormat().fieldsWriter(segmentWriteState)) {
writer.merge(mergeState);
}
@@ -308,6 +308,7 @@ public void testBackgroundForceMerge() throws IOException {
dir.close();
}

@AwaitsFix(bugUrl = "https://github.com/apache/lucene/issues/13478")
public void testMergePerField() throws IOException {
IndexWriterConfig config = new IndexWriterConfig();
ConcurrentMergeScheduler mergeScheduler =
@@ -94,6 +94,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -943,10 +944,10 @@ public static IndexWriterConfig newIndexWriterConfig(Random r, Analyzer a) {
} else if (rarely(r)) {
ConcurrentMergeScheduler cms;
     if (r.nextBoolean()) {
-      cms = new ConcurrentMergeScheduler();
+      cms = new TestConcurrentMergeScheduler();
     } else {
       cms =
-          new ConcurrentMergeScheduler() {
+          new TestConcurrentMergeScheduler() {
@Override
protected synchronized boolean maybeStall(MergeSource mergeSource) {
return true;
@@ -965,7 +966,8 @@ protected synchronized boolean maybeStall(MergeSource mergeSource) {
} else {
// Always use consistent settings, else CMS's dynamic (SSD or not)
// defaults can change, hurting reproducibility:
-      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+      ConcurrentMergeScheduler cms =
+          randomBoolean() ? new TestConcurrentMergeScheduler() : new ConcurrentMergeScheduler();

// Only 1 thread can run at once (should maybe help reproducibility),
// with up to 3 pending merges before segment-producing threads are
@@ -3292,4 +3294,17 @@ protected static KnnVectorsFormat randomVectorFormat(VectorEncoding vectorEncodi
.toList();
return RandomPicks.randomFrom(random(), availableFormats);
}

/**
* A test merge scheduler that always returns the intra-merge executor, so that code path is
* exercised by tests.
*/
static class TestConcurrentMergeScheduler extends ConcurrentMergeScheduler {
@Override
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
assert intraMergeExecutor != null : "intraMergeExecutor is not initialized";
// Always return the intra-merge executor to ensure we test that code path
return intraMergeExecutor;
}
}
}
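
For reference, a test typically hands such a scheduler to the writer through its config. A minimal hedged sketch using public Lucene APIs follows; the stock ConcurrentMergeScheduler stands in for the test-only subclass above, and the thread settings mirror the reproducibility comment in newIndexWriterConfig:

import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class SchedulerWiringSketch {
  public static void main(String[] args) throws Exception {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setMaxMergesAndThreads(3, 1); // up to 3 queued merges, 1 merge thread
    IndexWriterConfig config = new IndexWriterConfig().setMergeScheduler(cms);
    try (Directory dir = new ByteBuffersDirectory();
        IndexWriter writer = new IndexWriter(dir, config)) {
      // index documents here; background merges use the configured scheduler
      writer.commit();
    }
  }
}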