Skip to content

Commit 734dae6

Browse files
authored
Fix partial reduce bug in ip_prefix (#89734)
This stops us from applying the `min_doc_count` operation on partial reduction. If we run it on partial reduction, we filter out results that might still have more docs arrive. Closes #89686
1 parent c97e5fd commit 734dae6

File tree

5 files changed

+205
-9
lines changed

5 files changed

+205
-9
lines changed

docs/changelog/89734.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 89734
2+
summary: Fix partial reduce bug in `ip_prefix`
3+
area: Aggregations
4+
type: bug
5+
issues:
6+
- 89686

server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
251251
final IteratorAndCurrent<Bucket> top = pq.top();
252252
if (top.current().key.equals(value) == false) {
253253
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
254-
if (reduced.getDocCount() >= minDocCount) {
254+
if (false == reduceContext.isFinalReduce() || reduced.getDocCount() >= minDocCount) {
255255
reducedBuckets.add(reduced);
256256
}
257257
currentBuckets.clear();
@@ -272,7 +272,7 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
272272

273273
if (currentBuckets.isEmpty() == false) {
274274
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
275-
if (reduced.getDocCount() >= minDocCount) {
275+
if (false == reduceContext.isFinalReduce() || reduced.getDocCount() >= minDocCount) {
276276
reducedBuckets.add(reduced);
277277
}
278278
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.aggregations.bucket.prefix;
10+
11+
import org.apache.lucene.document.InetAddressPoint;
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.common.network.InetAddresses;
14+
import org.elasticsearch.search.DocValueFormat;
15+
import org.elasticsearch.search.aggregations.InternalAggregations;
16+
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
17+
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
18+
import org.elasticsearch.test.MapMatcher;
19+
20+
import java.net.Inet6Address;
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.TreeMap;
28+
import java.util.TreeSet;
29+
30+
import static java.util.stream.Collectors.toMap;
31+
import static org.elasticsearch.test.MapMatcher.assertMap;
32+
import static org.elasticsearch.test.MapMatcher.matchesMap;
33+
import static org.hamcrest.Matchers.equalTo;
34+
35+
public class InternalIpPrefixTests extends InternalMultiBucketAggregationTestCase<InternalIpPrefix> {
36+
/**
 * Builds a random {@link InternalIpPrefix} by delegating to the five-argument
 * overload with a freshly drawn prefix length and {@code min_doc_count}.
 */
@Override
37+
protected InternalIpPrefix createTestInstance(String name, Map<String, Object> metadata, InternalAggregations aggregations) {
38+
return createTestInstance(name, metadata, aggregations, randomPrefixLength(), randomMinDocCount());
39+
}
40+
41+
/**
 * Returns {@code null}: this aggregation has no parsed counterpart for the
 * deprecated high level rest client.
 */
@Override
42+
protected Class<? extends ParsedMultiBucketAggregation<?>> implementationClass() {
43+
// Deprecated high level rest client not supported
44+
return null;
45+
}
46+
47+
/**
 * Draws a random prefix length, in bits, between 1 and the bit width of an
 * encoded address ({@code InetAddressPoint.BYTES * 8}).
 */
private int randomPrefixLength() {
48+
return between(1, InetAddressPoint.BYTES * 8);
49+
}
50+
51+
/**
 * Draws a random {@code min_doc_count}: either exactly 1, or a random value
 * from 1 up to {@code Long.MAX_VALUE / (maxNumberOfBuckets() + 1)}.
 */
private long randomMinDocCount() {
52+
// NOTE(review): the upper bound presumably keeps sums over many buckets from overflowing - confirm.
return randomBoolean() ? 1 : randomLongBetween(1, Long.MAX_VALUE / (maxNumberOfBuckets() + 1));
53+
}
54+
55+
/**
 * Builds a random {@link InternalIpPrefix} with a fixed prefix length and
 * {@code min_doc_count}, so that several instances can be reduced together.
 * Bucket keys are distinct encoded IP addresses held in a sorted set; each
 * bucket gets a random doc count and the supplied sub-aggregations.
 *
 * @param name           name of the aggregation
 * @param metadata       metadata attached to the aggregation
 * @param aggregations   sub-aggregations placed on every bucket
 * @param prefixLength   prefix length passed to every bucket
 * @param minDocCount    {@code min_doc_count} passed to the aggregation
 */
private InternalIpPrefix createTestInstance(
56+
String name,
57+
Map<String, Object> metadata,
58+
InternalAggregations aggregations,
59+
int prefixLength,
60+
long minDocCount
61+
) {
62+
boolean keyed = randomBoolean();
63+
boolean appendPrefixLength = randomBoolean();
64+
// IPv4 addresses are only generated when the prefix length fits in 32 bits.
boolean canBeV4 = prefixLength <= 32;
65+
66+
int bucketsCount = between(1, maxNumberOfBuckets());
67+
// TreeSet keeps the keys unique and in sorted order.
Set<BytesRef> keys = new TreeSet<>();
68+
while (keys.size() < bucketsCount) {
69+
boolean v4 = canBeV4 && randomBoolean();
70+
byte[] ip = InetAddressPoint.encode(randomIp(v4));
71+
// NOTE(review): the +96 presumably skips the IPv4-mapped prefix of the 16-byte encoding - confirm.
byte[] mask = mask(v4 ? prefixLength + 96 : prefixLength);
72+
byte[] subnet = new byte[InetAddressPoint.BYTES];
73+
for (int i = 0; i < InetAddressPoint.BYTES; i++) {
74+
subnet[i] = (byte) (ip[i] & mask[i]);
75+
}
76+
// NOTE(review): `subnet` is computed above but never used; the unmasked `ip` is added as the key.
// Confirm whether `subnet` was intended. Beware that masked keys have few distinct values for
// short prefixes, so this loop might never reach `bucketsCount` if that change is made.
keys.add(new BytesRef(ip));
77+
}
78+
List<InternalIpPrefix.Bucket> buckets = new ArrayList<>(keys.size());
79+
for (Iterator<BytesRef> itr = keys.iterator(); itr.hasNext();) {
80+
BytesRef key = itr.next();
81+
// The v6 flag is derived from what the encoded key decodes to.
boolean v6 = InetAddressPoint.decode(key.bytes) instanceof Inet6Address;
82+
buckets.add(
83+
new InternalIpPrefix.Bucket(
84+
DocValueFormat.IP,
85+
key,
86+
keyed,
87+
v6,
88+
prefixLength,
89+
appendPrefixLength,
90+
randomLongBetween(0, Long.MAX_VALUE),
91+
aggregations
92+
)
93+
);
94+
}
95+
96+
return new InternalIpPrefix(name, DocValueFormat.IP, keyed, minDocCount, buckets, metadata);
97+
}
98+
99+
/**
 * Builds a bit mask of {@code InetAddressPoint.BYTES} bytes whose first
 * {@code prefixLength} bits are set, filling each byte from its most
 * significant bit downwards.
 *
 * @param prefixLength number of leading bits to set
 * @return the mask; bits past {@code prefixLength} are left as zero
 */
private byte[] mask(int prefixLength) {
100+
byte[] mask = new byte[InetAddressPoint.BYTES];
101+
// m is the index of the byte being filled, b the bit to set next within it.
int m = 0;
102+
int b = 0x80;
103+
for (int i = 0; i < prefixLength; i++) {
104+
mask[m] |= b;
105+
b = b >> 1;
106+
// All eight bits of this byte are set: move to the top bit of the next byte.
if (b == 0) {
107+
m++;
108+
b = 0x80;
109+
}
110+
}
111+
return mask;
112+
}
113+
114+
/**
 * Creates {@code size} random aggregations to reduce together. All of them
 * share the same name, metadata, sub-aggregations, prefix length and
 * {@code min_doc_count}; only the randomly generated buckets differ.
 */
@Override
115+
protected BuilderAndToReduce<InternalIpPrefix> randomResultsToReduce(String name, int size) {
116+
Map<String, Object> metadata = createTestMetadata();
117+
InternalAggregations aggregations = createSubAggregations();
118+
// Drawn once so every input uses the same prefix length and min_doc_count.
int prefixLength = randomPrefixLength();
119+
long minDocCount = randomMinDocCount();
120+
List<InternalIpPrefix> inputs = new ArrayList<>(size);
121+
for (int i = 0; i < size; i++) {
122+
InternalIpPrefix t = createTestInstance(name, metadata, aggregations, prefixLength, minDocCount);
123+
inputs.add(t);
124+
}
125+
return new BuilderAndToReduce<>(mockBuilder(inputs), inputs);
126+
}
127+
128+
/**
 * Verifies a reduced aggregation against its inputs. The reduced result must
 * carry the first input's {@code keyed}, {@code format} and
 * {@code minDocCount}. Its buckets must be exactly the keys whose doc counts,
 * summed over all inputs, reach that {@code minDocCount}, with those sums as
 * their doc counts.
 */
@Override
129+
protected void assertReduced(InternalIpPrefix reduced, List<InternalIpPrefix> inputs) {
130+
InternalIpPrefix leader = inputs.get(0);
131+
assertThat(reduced.keyed, equalTo(leader.keyed));
132+
assertThat(reduced.format, equalTo(leader.format));
133+
assertThat(reduced.minDocCount, equalTo(leader.minDocCount));
134+
// Sum the doc counts of every input bucket, grouped by bucket key.
Map<BytesRef, Long> expectedCounts = new HashMap<>();
135+
for (InternalIpPrefix i : inputs) {
136+
for (InternalIpPrefix.Bucket b : i.getBuckets()) {
137+
assertThat(b.getFormat(), equalTo(DocValueFormat.IP));
138+
long acc = expectedCounts.getOrDefault(b.getKey(), 0L);
139+
acc += b.getDocCount();
140+
expectedCounts.put(b.getKey(), acc);
141+
}
142+
}
143+
MapMatcher countsMatches = matchesMap();
144+
for (Map.Entry<BytesRef, Long> e : expectedCounts.entrySet()) {
145+
// NOTE(review): filtering by min_doc_count here presumably relies on the base test
// running a final reduction - confirm against the caller.
if (e.getValue() >= leader.minDocCount) {
146+
countsMatches = countsMatches.entry(DocValueFormat.IP.format(e.getKey()), e.getValue());
147+
}
148+
}
149+
// Compare the reduced buckets, as formatted key -> doc count, with the expected entries.
assertMap(
150+
new TreeMap<>(reduced.getBuckets().stream().collect(toMap(b -> b.getKeyAsString(), b -> b.getDocCount()))),
151+
countsMatches
152+
);
153+
}
154+
155+
/**
 * Checks that a partial reduction does not apply {@code min_doc_count}. Two
 * buckets with doc counts 1 and 2 sit in an aggregation whose
 * {@code min_doc_count} is 100, and both must still be present, with their
 * doc counts intact, after a partial reduce.
 */
public void testPartialReduceNoMinDocCount() {
156+
InternalIpPrefix.Bucket b1 = new InternalIpPrefix.Bucket(
157+
DocValueFormat.IP,
158+
new BytesRef(InetAddressPoint.encode(InetAddresses.forString("192.168.0.1"))),
159+
false,
160+
false,
161+
1,
162+
false,
163+
// doc count of the first bucket
1,
164+
InternalAggregations.EMPTY
165+
);
166+
InternalIpPrefix.Bucket b2 = new InternalIpPrefix.Bucket(
167+
DocValueFormat.IP,
168+
new BytesRef(InetAddressPoint.encode(InetAddresses.forString("200.0.0.1"))),
169+
false,
170+
false,
171+
1,
172+
false,
173+
// doc count of the second bucket
2,
174+
InternalAggregations.EMPTY
175+
);
176+
// min_doc_count of 100 is well above either bucket's doc count.
InternalIpPrefix t = new InternalIpPrefix("test", DocValueFormat.IP, false, 100, List.of(b1, b2), null);
177+
InternalIpPrefix reduced = (InternalIpPrefix) t.reduce(List.of(t), emptyReduceContextBuilder().forPartialReduction());
178+
// Both buckets survive the partial reduce, in their original order.
assertThat(reduced.getBuckets().get(0).getDocCount(), equalTo(1L));
179+
assertThat(reduced.getBuckets().get(1).getDocCount(), equalTo(2L));
180+
}
181+
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorTests.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.stream.Collectors;
4242

4343
import static java.util.Collections.singleton;
44+
import static org.hamcrest.Matchers.equalTo;
4445

4546
public class IpPrefixAggregatorTests extends AggregatorTestCase {
4647

@@ -928,7 +929,6 @@ public void testIpv6AppendPrefixLength() throws IOException {
928929
}
929930

930931
public void testMinDocCount() throws IOException {
931-
// GIVEN
932932
final int prefixLength = 16;
933933
final String field = "ipv4";
934934
int minDocCount = 2;
@@ -949,22 +949,19 @@ public void testMinDocCount() throws IOException {
949949
new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime())
950950
);
951951

952-
// WHEN
953952
testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
954953
for (TestIpDataHolder ipDataHolder : ipAddresses) {
955954
iw.addDocument(
956955
singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
957956
);
958957
}
959-
}, agg -> {
960-
final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
958+
}, (InternalIpPrefix ipPrefix) -> {
961959
final Set<String> expectedSubnets = Set.of("192.168.0.0");
962960
final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
963961
.stream()
964962
.map(InternalIpPrefix.Bucket::getKeyAsString)
965963
.collect(Collectors.toUnmodifiableSet());
966964

967-
// THEN
968965
ipPrefix.getBuckets().forEach(bucket -> {
969966
assertFalse(bucket.isIpv6());
970967
assertFalse(bucket.appendPrefixLength());
@@ -978,9 +975,9 @@ public void testMinDocCount() throws IOException {
978975
assertTrue(
979976
ipPrefix.getBuckets().stream().map(InternalIpPrefix.Bucket::getDocCount).allMatch(docCount -> docCount >= minDocCount)
980977
);
981-
assertEquals(
978+
assertThat(
982979
ipPrefix.getBuckets().stream().sorted(IP_ADDRESS_KEY_COMPARATOR).map(InternalIpPrefix.Bucket::getDocCount).toList(),
983-
List.of(4L)
980+
equalTo(List.of(4L))
984981
);
985982
}, fieldType);
986983
}

test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ protected final T createTestInstance(String name, Map<String, Object> metadata)
9999

100100
protected abstract T createTestInstance(String name, Map<String, Object> metadata, InternalAggregations aggregations);
101101

102+
/**
103+
* The parsed version used by the deprecated high level rest client or
104+
* {@code null} if the deprecated high level rest client isn't supported
105+
* by this agg.
106+
*/
102107
protected abstract Class<? extends ParsedMultiBucketAggregation<?>> implementationClass();
103108

104109
@Override
@@ -120,6 +125,13 @@ public void testIterators() throws IOException {
120125
assertMultiBucketsAggregations(aggregation, parseAndAssert(aggregation, false, false), true);
121126
}
122127

128+
/**
 * Skips the parsing round-trip, via a test assumption, when
 * {@link #implementationClass()} returns {@code null}; otherwise defers to
 * the superclass implementation.
 */
@Override
129+
protected <P extends ParsedAggregation> P parseAndAssert(InternalAggregation aggregation, boolean shuffled, boolean addRandomFields)
130+
throws IOException {
131+
assumeFalse("deprecated high level rest client not supported", implementationClass() == null);
132+
return super.parseAndAssert(aggregation, shuffled, addRandomFields);
133+
}
134+
123135
@Override
124136
protected void assertSampled(T sampled, T reduced, SamplingContext samplingContext) {
125137
assertBucketCountsScaled(sampled.getBuckets(), reduced.getBuckets(), samplingContext);

0 commit comments

Comments
 (0)