LUCENE-10599: Improve LogMergePolicy's handling of maxMergeSize. #935

Merged
merged 4 commits into from
Jun 9, 2022
Changes from 3 commits
4 changes: 3 additions & 1 deletion lucene/CHANGES.txt
@@ -44,7 +44,9 @@ Optimizations

 Bug Fixes
 ---------------------
-(No changes)

+* LUCENE-10599: LogMergePolicy is more likely to keep merging segments until
+  they reach the maximum merge size. (Adrien Grand)
+
 Other
 ---------------------

40 changes: 25 additions & 15 deletions lucene/core/src/java/org/apache/lucene/index/LogMergePolicy.java
@@ -568,23 +568,41 @@ public MergeSpecification findMerges(
       // Finally, record all merges that are viable at this level:

Contributor

I'm actually confused by the level quantization process above:

  1. From L527, it determines the max level, and then a level bottom based on that.
  2. Then it finds the right boundary by searching backwards for the first qualifying segment in L556-

This seems to assume the levels are sorted, but I can't find the sorting anywhere. Otherwise, how could it guarantee that the level decided in the range [start,end) won't contain segments that have a lower level than levelBottom?

Sorry, the question is not really related to the change itself.

Contributor Author

> how could it guarantee that the level decided in the range [start,end) won't contain segments that have a lower level than levelBottom?

LogMergePolicy doesn't try to provide this guarantee. It's actually important that it doesn't, because otherwise it could end up with lots of unmerged segments. For instance, imagine that you have 9 segments (S1..S9) on level 10, then one segment (S10) on level 9, one more segment (S11) on level 10, and then potentially other segments.
If LogMergePolicy refused to merge segments that are on a lower level, it could never merge segments S1..S10 together. This is because LogMergePolicy only merges adjacent segments, and both neighbors of S10 are on a higher level, so S10 can only ever be merged with segments on a higher level.

This is a downside of LogMergePolicy compared to TieredMergePolicy: because it only merges adjacent segments, it sometimes has to return merges where not all segments are on the same level.
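
To make the S1..S11 example concrete, here is a minimal, self-contained sketch of the backward search for the right boundary of a level. It is not the LogMergePolicy code: `LevelWindowSketch`, `levelBottom`, `findUpto`, `belowBottom`, and `LEVEL_SPAN` are hypothetical names, levels are passed in directly as floats rather than derived from segment sizes, and any level floor is ignored. It only illustrates that the resulting window may contain a segment below the level bottom.

```java
import java.util.ArrayList;
import java.util.List;

public class LevelWindowSketch {
  // Assumed width of a level, chosen for illustration only.
  static final float LEVEL_SPAN = 0.75f;

  /** Bottom of the level that contains the highest-level segment at or after start. */
  static float levelBottom(float[] levels, int start) {
    float maxLevel = levels[start];
    for (int i = start + 1; i < levels.length; i++) {
      maxLevel = Math.max(maxLevel, levels[i]);
    }
    return maxLevel - LEVEL_SPAN;
  }

  /** Searches backwards for the rightmost segment whose level is at or above the bottom. */
  static int findUpto(float[] levels, int start) {
    float bottom = levelBottom(levels, start);
    int upto = levels.length - 1;
    while (upto >= start && levels[upto] < bottom) {
      upto--;
    }
    return upto;
  }

  /** Indices in [start, upto] whose level is below the bottom; they are still in the window. */
  static List<Integer> belowBottom(float[] levels, int start) {
    float bottom = levelBottom(levels, start);
    int upto = findUpto(levels, start);
    List<Integer> result = new ArrayList<>();
    for (int i = start; i <= upto; i++) {
      if (levels[i] < bottom) {
        result.add(i);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // S1..S9 on level 10, S10 on level 9, S11 on level 10, then two much smaller segments.
    float[] levels = {10, 10, 10, 10, 10, 10, 10, 10, 10, 9, 10, 5, 5};
    System.out.println("upto = " + findUpto(levels, 0));
    System.out.println("below bottom inside window = " + belowBottom(levels, 0));
  }
}
```

With these inputs the search stops at index 10 (S11), so the window covers S1..S11 even though index 9 (S10) is below the level bottom. Only the right boundary is checked against the bottom, which is why no sorting is needed.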

Contributor

Got it, thanks!

       int end = start + mergeFactor;
       while (end <= 1 + upto) {
-        boolean anyTooLarge = false;
         boolean anyMerging = false;
+        long mergeSize = 0;
+        long mergeDocs = 0;
         for (int i = start; i < end; i++) {
           final SegmentInfoAndLevel segLevel = levels.get(i);
           final SegmentCommitInfo info = segLevel.info;
-          anyTooLarge |=
-              (size(info, mergeContext) >= maxMergeSize
-                  || sizeDocs(info, mergeContext) >= maxMergeDocs);
           if (mergingSegments.contains(info)) {
             anyMerging = true;
             break;
           }
+          long segmentSize = size(info, mergeContext);
+          long segmentDocs = sizeDocs(info, mergeContext);
+          if (mergeSize + segmentSize > maxMergeSize || mergeDocs + segmentDocs > maxMergeDocs) {
+            // This merge is full, stop adding more segments to it
+            if (i == start) {
+              // This segment alone is too large, return a singleton merge
+              if (verbose(mergeContext)) {
+                message(
+                    " " + i + " is larger than the max merge size/docs; ignoring", mergeContext);
+              }
+              end = i + 1;
+            } else {
+              // Previous segments are under the max merge size, return them
+              end = i;
+            }
+            break;
+          }
+          mergeSize += segmentSize;
+          mergeDocs += segmentDocs;
         }

-        if (anyMerging) {
-          // skip
-        } else if (!anyTooLarge) {
+        if (anyMerging || end - start <= 1) {
+          // skip: there is an ongoing merge at the current level or the computed merge has a single
+          // segment and this merge policy doesn't do singleton merges
+        } else {

Contributor

So basically, for a level that meets the merge factor, we merge every slice of segments that does not exceed the specified max size?

Contributor Author

Correct.
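
The packing behaviour confirmed here can be sketched in isolation. The following is a simplified simulation, not the actual LogMergePolicy code: `PackingSketch` and `findSlices` are hypothetical names, all segments are assumed to already be on the same level, only document counts are considered (no byte sizes), and in-progress merges are ignored.

```java
import java.util.ArrayList;
import java.util.List;

public class PackingSketch {
  /**
   * Returns [start, end) slices of adjacent segments to merge. Each slice holds at most
   * mergeFactor segments and at most maxMergeDocs documents; single-segment slices are skipped.
   */
  static List<int[]> findSlices(long[] docs, int mergeFactor, long maxMergeDocs) {
    List<int[]> merges = new ArrayList<>();
    int upto = docs.length - 1;
    int start = 0;
    int end = start + mergeFactor;
    while (end <= 1 + upto) {
      long mergeDocs = 0;
      for (int i = start; i < end; i++) {
        if (mergeDocs + docs[i] > maxMergeDocs) {
          // The slice is full: a lone oversized segment becomes a singleton slice,
          // otherwise the slice stops just before the segment that would overflow it.
          end = (i == start) ? i + 1 : i;
          break;
        }
        mergeDocs += docs[i];
      }
      if (end - start > 1) {
        merges.add(new int[] {start, end});
      }
      start = end;
      end = start + mergeFactor;
    }
    return merges;
  }

  public static void main(String[] args) {
    long[] tenSmall = {3_000, 3_000, 3_000, 3_000, 3_000, 3_000, 3_000, 3_000, 3_000, 3_000};
    for (int[] slice : findSlices(tenSmall, 10, 10_000)) {
      System.out.println("merge [" + slice[0] + ", " + slice[1] + ")");
    }
    long[] oneLarge = {11_000, 2_000, 2_000, 2_000, 2_000, 2_000, 2_000, 2_000, 2_000, 2_000, 2_000};
    for (int[] slice : findSlices(oneLarge, 10, 10_000)) {
      System.out.println("merge [" + slice[0] + ", " + slice[1] + ")");
    }
  }
}
```

The two inputs mirror the segment sizes used in testPackLargeSegments and testIgnoreLargeSegments: the first yields a single slice [0, 3) of 9,000 documents, and the second skips the 11,000-document segment and yields [1, 6) with 10,000 documents.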

           if (spec == null) {
             spec = new MergeSpecification();
           }
@@ -604,14 +622,6 @@ public MergeSpecification findMerges(
                 mergeContext);
           }
           spec.add(new OneMerge(mergeInfos));
-        } else if (verbose(mergeContext)) {
-          message(
-              " "
-                  + start
-                  + " to "
-                  + end
-                  + ": contains segment over maxMergeSize or maxMergeDocs; skipping",
-              mergeContext);
         }

         start = end;

@@ -51,7 +51,7 @@ protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws
   protected void assertMerge(MergePolicy policy, MergeSpecification merge) throws IOException {
     LogMergePolicy lmp = (LogMergePolicy) policy;
     for (OneMerge oneMerge : merge.merges) {
-      assertEquals(lmp.getMergeFactor(), oneMerge.segments.size());
+      assertTrue(oneMerge.segments.size() <= lmp.getMergeFactor());
     }
   }

@@ -187,4 +187,58 @@ public void testRejectUnbalancedMerges() throws IOException {
     assertEquals(100, segmentInfos.info(0).info.maxDoc());
     assertEquals(10, segmentInfos.info(1).info.maxDoc());
   }
+
+  public void testPackLargeSegments() throws IOException {
+    LogDocMergePolicy mergePolicy = new LogDocMergePolicy();
+    IOStats stats = new IOStats();
+    mergePolicy.setMaxMergeDocs(10_000);
+    AtomicLong segNameGenerator = new AtomicLong();
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
+    // 10 segments below the max segment size, but larger than maxMergeSize/mergeFactor
+    for (int i = 0; i < 10; ++i) {
+      segmentInfos.add(
+          makeSegmentCommitInfo(
+              "_" + segNameGenerator.getAndIncrement(), 3_000, 0, 0, IndexWriter.SOURCE_MERGE));
+    }
+    MergeSpecification spec =
+        mergePolicy.findMerges(MergeTrigger.EXPLICIT, segmentInfos, mergeContext);
+    assertNotNull(spec);
+    for (OneMerge oneMerge : spec.merges) {
+      segmentInfos =
+          applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
+    }
+    // LogMP packed 3 3k segments together
+    assertEquals(9_000, segmentInfos.info(0).info.maxDoc());
+  }
+
+  public void testIgnoreLargeSegments() throws IOException {
+    LogDocMergePolicy mergePolicy = new LogDocMergePolicy();
+    IOStats stats = new IOStats();
+    mergePolicy.setMaxMergeDocs(10_000);
+    AtomicLong segNameGenerator = new AtomicLong();
+    MergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
+    // 1 segment that reached the maximum segment size
+    segmentInfos.add(
+        makeSegmentCommitInfo(
+            "_" + segNameGenerator.getAndIncrement(), 11_000, 0, 0, IndexWriter.SOURCE_MERGE));
+    // and 10 segments below the max segment size, but within the same level
+    for (int i = 0; i < 10; ++i) {
+      segmentInfos.add(
+          makeSegmentCommitInfo(
+              "_" + segNameGenerator.getAndIncrement(), 2_000, 0, 0, IndexWriter.SOURCE_MERGE));
+    }
+    // LogMergePolicy used to have a bug that would make it exclude the first mergeFactor segments
+    // from merging if any of them was above the maximum merged size
+    MergeSpecification spec =
+        mergePolicy.findMerges(MergeTrigger.EXPLICIT, segmentInfos, mergeContext);
+    assertNotNull(spec);
+    for (OneMerge oneMerge : spec.merges) {
+      segmentInfos =
+          applyMerge(segmentInfos, oneMerge, "_" + segNameGenerator.getAndIncrement(), stats);
+    }
+    assertEquals(11_000, segmentInfos.info(0).info.maxDoc());
+    assertEquals(10_000, segmentInfos.info(1).info.maxDoc());
+  }
 }