Commit

Add unit test for the tracking memory in the fetch phase
andreidan committed Feb 23, 2025
1 parent 41b74cb commit 6e60c6e
Showing 2 changed files with 78 additions and 2 deletions.
File 1 of 2:

@@ -197,8 +197,7 @@ protected SearchHit nextDoc(int doc) throws IOException {
 
         BytesReference sourceRef = hit.hit().getSourceRef();
         if (sourceRef != null) {
-            int sourceLength = sourceRef.length();
-            this.accumulatedBytesInLeaf += sourceLength;
+            this.accumulatedBytesInLeaf += sourceRef.length();
         }
         success = true;
         return hit.hit();
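For context, the accumulatedBytesInLeaf counter updated above is what drives the periodic memory-breaker checks exercised by the new test below. The following minimal sketch illustrates that accumulate-and-check pattern; it is not the Elasticsearch implementation, and the MemoryCheckingIterator class, its onDocument/onLeafDone hooks, and the LongConsumer stand-in for the breaker's addEstimateBytesAndMaybeBreak call are illustrative assumptions (the 1 MiB check interval comes from the test's comment):

// A minimal sketch of the accumulate-and-check pattern, not the actual implementation.
import java.util.function.LongConsumer;

class MemoryCheckingIterator {
    private static final long MEMORY_CHECK_INTERVAL = 1024 * 1024; // check the breaker every 1 MiB of source

    private final LongConsumer breaker; // stand-in for CircuitBreaker#addEstimateBytesAndMaybeBreak
    private long accumulatedBytesInLeaf; // same role as the field in the hunk above
    private long lastCheckedBytes;
    int breakerChecks; // exposed so a driver can count the checks

    MemoryCheckingIterator(LongConsumer breaker) {
        this.breaker = breaker;
    }

    // called once per fetched document with the length of its source
    void onDocument(int sourceLength) {
        accumulatedBytesInLeaf += sourceLength;
        if (accumulatedBytesInLeaf - lastCheckedBytes >= MEMORY_CHECK_INTERVAL) {
            checkBreaker();
        }
    }

    // called when the segment (leaf) has been fully processed
    void onLeafDone() {
        checkBreaker(); // final check for the remainder that never reached the interval
    }

    private void checkBreaker() {
        breaker.accept(accumulatedBytesInLeaf - lastCheckedBytes); // the real breaker may throw CircuitBreakingException here
        lastCheckedBytes = accumulatedBytesInLeaf;
        breakerChecks++;
    }
}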
File 2 of 2:

@@ -9,6 +9,9 @@
 package org.elasticsearch.action.search;
 
 import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.Query;
@@ -20,16 +23,19 @@
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -55,6 +61,7 @@
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.search.profile.ProfileResult;
 import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
 import org.elasticsearch.search.profile.SearchProfileShardResult;
@@ -72,10 +79,12 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.stream.IntStream;
 
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
 public class FetchSearchPhaseTests extends ESTestCase {
@@ -820,6 +829,57 @@ public void testFetchTimeoutNoPartialResults() throws IOException {
         }
     }
 
+    public void testFetchPhaseChecksMemoryBreaker() throws IOException {
+        Directory dir = newDirectory();
+        RandomIndexWriter w = new RandomIndexWriter(random(), dir);
+
+        // We index 100 documents, each with a 48,000-byte field in its source, so the fetch phase should check the memory
+        // breaker 5 times: once every 22 documents (22 * 48,000 = 1,056,000 bytes > 1MiB), i.e. after documents 22, 44, 66,
+        // and 88, and then a final time when it finishes processing the single segment.
+
+        String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }";
+        for (int i = 0; i < 100; i++) {
+            Document document = new Document();
+            document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+            document.add(new StoredField("_source", new BytesRef(body)));
+            w.addDocument(document);
+        }
+        w.forceMerge(1);
+        IndexReader r = w.getReader();
+        w.close();
+        ContextIndexSearcher contextIndexSearcher = createSearcher(r);
+        AtomicInteger breakerCalledCount = new AtomicInteger(0);
+        NoopCircuitBreaker breakingCircuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST) {
+            @Override
+            public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
+                breakerCalledCount.incrementAndGet();
+            }
+        };
+        try (SearchContext searchContext = createSearchContext(contextIndexSearcher, true, breakingCircuitBreaker)) {
+            FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() {
+                @Override
+                public void setNextReader(LeafReaderContext readerContext) throws IOException {
+
+                }
+
+                @Override
+                public void process(FetchSubPhase.HitContext hitContext) throws IOException {
+                    Source source = hitContext.source();
+                    hitContext.hit().sourceRef(source.internalSourceRef());
+                }
+
+                @Override
+                public StoredFieldsSpec storedFieldsSpec() {
+                    return StoredFieldsSpec.NEEDS_SOURCE;
+                }
+            }));
+            fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null);
+            assertThat(breakerCalledCount.get(), is(5));
+        } finally {
+            r.close();
+            dir.close();
+        }
+    }
+
     private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException {
         return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() {
             @Override
@@ -857,6 +917,14 @@ public StoredFieldsSpec storedFieldsSpec() {
     }
 
     private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) {
+        return createSearchContext(contextIndexSearcher, allowPartialResults, null);
+    }
+
+    private static SearchContext createSearchContext(
+        ContextIndexSearcher contextIndexSearcher,
+        boolean allowPartialResults,
+        @Nullable CircuitBreaker circuitBreaker
+    ) {
         IndexSettings indexSettings = new IndexSettings(
             IndexMetadata.builder("index")
                 .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()))
@@ -929,6 +997,15 @@ public FetchSearchResult fetchResult() {
             public ShardSearchRequest request() {
                 return request;
             }
+
+            @Override
+            public CircuitBreaker circuitBreaker() {
+                if (circuitBreaker != null) {
+                    return circuitBreaker;
+                } else {
+                    return super.circuitBreaker();
+                }
+            }
         };
         searchContext.addReleasable(searchContext.fetchResult()::decRef);
         searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));
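To sanity-check the count asserted by the new test (assertThat(breakerCalledCount.get(), is(5))), here is a hypothetical driver for the MemoryCheckingIterator sketch earlier on this page, mirroring the test's setup of 100 documents with 48,000-byte sources. Each run of 22 documents crosses the 1 MiB threshold (22 * 48,000 = 1,056,000 > 1,048,576), giving checks after documents 22, 44, 66, and 88, and the end-of-segment call adds the fifth.

// Hypothetical driver; assumes the MemoryCheckingIterator sketch above, not anything in this commit.
public class MemoryCheckingIteratorDemo {
    public static void main(String[] args) {
        MemoryCheckingIterator it = new MemoryCheckingIterator(bytes -> {
            // a real implementation would call CircuitBreaker#addEstimateBytesAndMaybeBreak here
        });
        for (int doc = 0; doc < 100; doc++) {
            it.onDocument(48_000); // roughly the source size used by the test's body string
        }
        it.onLeafDone();
        System.out.println(it.breakerChecks); // prints 5, matching the test's assertion
    }
}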