Create weights lazily in filter and filters aggregation #26983

Merged 1 commit on Oct 12, 2017
FilterAggregator.java
@@ -35,16 +35,17 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Aggregate all docs that match a filter.
  */
 public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator {
 
-    private final Weight filter;
+    private final Supplier<Weight> filter;
 
     public FilterAggregator(String name,
-            Weight filter,
+            Supplier<Weight> filter,
             AggregatorFactories factories,
             SearchContext context,
             Aggregator parent, List<PipelineAggregator> pipelineAggregators,
@@ -57,7 +58,7 @@ public FilterAggregator(String name,
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
             final LeafBucketCollector sub) throws IOException {
         // no need to provide deleted docs to the filter
-        final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.scorerSupplier(ctx));
+        final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.get().scorerSupplier(ctx));
         return new LeafBucketCollectorBase(sub, null) {
             @Override
             public void collect(int doc, long bucket) throws IOException {
FilterAggregatorFactory.java
@@ -23,6 +23,7 @@
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Weight;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.aggregations.AggregationInitializationException;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
@@ -35,20 +36,40 @@
 
 public class FilterAggregatorFactory extends AggregatorFactory<FilterAggregatorFactory> {
 
-    final Weight weight;
+    private Weight weight;
+    private Query filter;
 
     public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, SearchContext context,
             AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
         super(name, context, parent, subFactoriesBuilder, metaData);
-        IndexSearcher contextSearcher = context.searcher();
-        Query filter = filterBuilder.toFilter(context.getQueryShardContext());
-        weight = contextSearcher.createNormalizedWeight(filter, false);
+        filter = filterBuilder.toFilter(context.getQueryShardContext());
     }
 
+    /**
+     * Returns the {@link Weight} for this filter aggregation, creating it if
+     * necessary. This is done lazily so that the {@link Weight} is only created
+     * if the aggregation collects documents, reducing the overhead of the
+     * aggregation in the case where no documents are collected.
+     *
+     * Note that as aggregations are initialised and executed in a serial manner,
+     * no concurrency considerations are necessary here.
+     */
+    public Weight getWeight() {
+        if (weight == null) {
+            IndexSearcher contextSearcher = context.searcher();
+            try {
+                weight = contextSearcher.createNormalizedWeight(filter, false);
+            } catch (IOException e) {
+                throw new AggregationInitializationException("Failed to initialise filter", e);
+            }
+        }
+        return weight;
+    }
+
     @Override
     public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
             Map<String, Object> metaData) throws IOException {
-        return new FilterAggregator(name, weight, factories, context, parent, pipelineAggregators, metaData);
+        return new FilterAggregator(name, () -> this.getWeight(), factories, context, parent, pipelineAggregators, metaData);
     }
 
 }
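
For illustration, a minimal standalone sketch of the lazy-initialization pattern this hunk applies. The stand-in types and the LazyWeightFactory name are hypothetical, not part of the PR; in the real code, building the Weight corresponds to IndexSearcher.createNormalizedWeight:

import java.util.function.Supplier;

// Hypothetical stand-ins for Lucene's Query and Weight.
class Query {}
class Weight {
    final Query query;
    Weight(Query query) { this.query = query; }
}

class LazyWeightFactory {
    private final Query filter; // parsed eagerly; cheap to hold on to
    private Weight weight;      // built on first use; the expensive part

    LazyWeightFactory(Query filter) {
        this.filter = filter;
    }

    // Mirrors getWeight() above: the Weight is created only when a
    // collector actually asks for it. Aggregations run serially, so
    // the unsynchronized null check is safe.
    Weight getWeight() {
        if (weight == null) {
            weight = new Weight(filter); // stands in for createNormalizedWeight
        }
        return weight;
    }

    // What the aggregator receives instead of a ready-made Weight.
    Supplier<Weight> asSupplier() {
        return this::getWeight;
    }
}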
FiltersAggregator.java
@@ -45,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 public class FiltersAggregator extends BucketsAggregator {
 
@@ -115,13 +116,13 @@ public boolean equals(Object obj) {
     }
 
     private final String[] keys;
-    private Weight[] filters;
+    private Supplier<Weight[]> filters;
     private final boolean keyed;
     private final boolean showOtherBucket;
    private final String otherBucketKey;
     private final int totalNumKeys;
 
-    public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Weight[] filters, boolean keyed,
+    public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Supplier<Weight[]> filters, boolean keyed,
             String otherBucketKey, SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
             Map<String, Object> metaData) throws IOException {
         super(name, factories, context, parent, pipelineAggregators, metaData);
@@ -141,6 +142,7 @@ public FiltersAggregator(String name, AggregatorFactories factories, String[] keys,
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
             final LeafBucketCollector sub) throws IOException {
         // no need to provide deleted docs to the filter
+        Weight[] filters = this.filters.get();
         final Bits[] bits = new Bits[filters.length];
         for (int i = 0; i < filters.length; ++i) {
             bits[i] = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filters[i].scorerSupplier(ctx));
@@ -164,7 +166,7 @@ public void collect(int doc, long bucket) throws IOException {
 
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
-        List<InternalFilters.InternalBucket> buckets = new ArrayList<>(filters.length);
+        List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
         for (int i = 0; i < keys.length; i++) {
             long bucketOrd = bucketOrd(owningBucketOrdinal, i);
             InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], bucketDocCount(bucketOrd),
@@ -184,7 +186,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
     @Override
     public InternalAggregation buildEmptyAggregation() {
         InternalAggregations subAggs = buildEmptySubAggregations();
-        List<InternalFilters.InternalBucket> buckets = new ArrayList<>(filters.length);
+        List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
         for (int i = 0; i < keys.length; i++) {
             InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], 0, subAggs, keyed);
             buckets.add(bucket);
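
A likely reason the two ArrayList allocations above switch from filters.length to keys.length: filters is now a Supplier, and buildEmptyAggregation can run on a shard that never collects a document, so sizing by keys (always available) avoids dereferencing the supplier on that path. A small demonstration of the lazy behaviour, with hypothetical names:

import java.util.function.Supplier;

public class LazySupplierDemo {
    public static void main(String[] args) {
        final boolean[] created = { false };
        Supplier<String> weights = () -> {
            created[0] = true; // stands in for createNormalizedWeight
            return "weights";
        };
        // buildEmptyAggregation-style path: the supplier is never touched.
        System.out.println("created after empty path: " + created[0]);  // false
        // getLeafCollector-style path: first real use triggers creation.
        weights.get();
        System.out.println("created after collection: " + created[0]); // true
    }
}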
FiltersAggregatorFactory.java
@@ -22,6 +22,7 @@
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Weight;
+import org.elasticsearch.search.aggregations.AggregationInitializationException;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
@@ -36,7 +37,8 @@
 public class FiltersAggregatorFactory extends AggregatorFactory<FiltersAggregatorFactory> {
 
     private final String[] keys;
-    final Weight[] weights;
+    private final Query[] filters;
+    private Weight[] weights;
     private final boolean keyed;
     private final boolean otherBucket;
     private final String otherBucketKey;
@@ -48,21 +50,43 @@ public FiltersAggregatorFactory(String name, List<KeyedFilter> filters, boolean
         this.keyed = keyed;
         this.otherBucket = otherBucket;
         this.otherBucketKey = otherBucketKey;
-        IndexSearcher contextSearcher = context.searcher();
-        weights = new Weight[filters.size()];
         keys = new String[filters.size()];
+        this.filters = new Query[filters.size()];
         for (int i = 0; i < filters.size(); ++i) {
             KeyedFilter keyedFilter = filters.get(i);
             this.keys[i] = keyedFilter.key();
-            Query filter = keyedFilter.filter().toFilter(context.getQueryShardContext());
-            this.weights[i] = contextSearcher.createNormalizedWeight(filter, false);
+            this.filters[i] = keyedFilter.filter().toFilter(context.getQueryShardContext());
         }
     }
 
+    /**
+     * Returns the {@link Weight}s for this filters aggregation, creating them
+     * if necessary. This is done lazily so that the {@link Weight}s are only
+     * created if the aggregation collects documents, reducing the overhead of
+     * the aggregation in the case where no documents are collected.
+     *
+     * Note that as aggregations are initialised and executed in a serial manner,
+     * no concurrency considerations are necessary here.
+     */
+    public Weight[] getWeights() {
+        if (weights == null) {
+            try {
+                IndexSearcher contextSearcher = context.searcher();
+                weights = new Weight[filters.length];
+                for (int i = 0; i < filters.length; ++i) {
+                    this.weights[i] = contextSearcher.createNormalizedWeight(filters[i], false);
+                }
+            } catch (IOException e) {
+                throw new AggregationInitializationException("Failed to initialise filters for aggregation [" + name() + "]", e);
+            }
+        }
+        return weights;
+    }
+
     @Override
     public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
             Map<String, Object> metaData) throws IOException {
-        return new FiltersAggregator(name, factories, keys, weights, keyed, otherBucket ? otherBucketKey : null, context, parent,
+        return new FiltersAggregator(name, factories, keys, () -> getWeights(), keyed, otherBucket ? otherBucketKey : null, context, parent,
                 pipelineAggregators, metaData);
     }
 
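
The array variant follows the same shape as the single-filter case. A minimal hedged sketch (names and stand-in types are hypothetical, repeated here so the snippet is self-contained):

// Hypothetical stand-ins for Lucene's Query and Weight.
class Query {}
class Weight {
    final Query query;
    Weight(Query query) { this.query = query; }
}

class LazyWeightsFactory {
    private final Query[] filters; // parsed eagerly in the constructor
    private Weight[] weights;      // materialised on first access

    LazyWeightsFactory(Query[] filters) {
        this.filters = filters;
    }

    // Mirrors getWeights() above: the whole array is built on the first
    // call and cached for every subsequent leaf collector.
    Weight[] getWeights() {
        if (weights == null) {
            weights = new Weight[filters.length];
            for (int i = 0; i < filters.length; ++i) {
                weights[i] = new Weight(filters[i]); // stands in for createNormalizedWeight
            }
        }
        return weights;
    }
}

In the diff itself, createInternal hands the aggregator () -> getWeights(), so nothing is built until the first leaf collector asks for the weights.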
@@ -19,6 +19,7 @@
 package org.elasticsearch.search.aggregations.bucket.nested;
 
 import com.carrotsearch.hppc.LongArrayList;
+
 import org.apache.lucene.index.IndexReaderContext;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.ReaderUtil;
@@ -36,9 +36,6 @@
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
-import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
-import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory;
-import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 
@@ -121,7 +118,7 @@ public void testParsedAsFilter() throws IOException {
         AggregatorFactory<?> factory = createAggregatorFactory(builder, indexSearcher, fieldType);
         assertThat(factory, Matchers.instanceOf(FilterAggregatorFactory.class));
         FilterAggregatorFactory filterFactory = (FilterAggregatorFactory) factory;
-        Query parsedQuery = filterFactory.weight.getQuery();
+        Query parsedQuery = filterFactory.getWeight().getQuery();
         assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class));
         assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size());
         // means the bool query has been parsed as a filter, if it was a query minShouldMatch would
@@ -214,7 +214,7 @@ public void testParsedAsFilter() throws IOException {
         AggregatorFactory<?> factory = createAggregatorFactory(builder, indexSearcher, fieldType);
         assertThat(factory, Matchers.instanceOf(FiltersAggregatorFactory.class));
         FiltersAggregatorFactory filtersFactory = (FiltersAggregatorFactory) factory;
-        Query parsedQuery = filtersFactory.weights[0].getQuery();
+        Query parsedQuery = filtersFactory.getWeights()[0].getQuery();
         assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class));
         assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size());
         // means the bool query has been parsed as a filter, if it was a query minShouldMatch would