Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Star tree] Doc count field support in star tree #15282

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -264,6 +264,7 @@ public void testValidCompositeIndex() {
);
assertEquals(expectedTimeUnits, dateDim.getIntervals());
assertEquals("numeric_dv", starTreeFieldType.getDimensions().get(1).getField());
assertEquals(2, starTreeFieldType.getMetrics().size());
assertEquals("numeric_dv", starTreeFieldType.getMetrics().get(0).getField());
List<MetricStat> expectedMetrics = Arrays.asList(
MetricStat.AVG,
@@ -273,6 +274,10 @@ public void testValidCompositeIndex() {
MetricStat.MIN
);
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());

assertEquals("_doc_count", starTreeFieldType.getMetrics().get(1).getField());
assertEquals(List.of(MetricStat.DOC_COUNT), starTreeFieldType.getMetrics().get(1).getMetrics());

assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
assertEquals(
StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP,
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.SortedNumericDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
@@ -75,6 +76,10 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
@Override
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addNumericField(field, valuesProducer);
// Perform this only during flush flow
if (mergeState.get() == null && segmentHasCompositeFields) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
createCompositeIndicesIfPossible(valuesProducer, field);
}
}

@Override
@@ -116,12 +121,21 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
if (segmentFieldSet.isEmpty()) {
Set<String> compositeFieldSetCopy = new HashSet<>(compositeFieldSet);
for (String compositeField : compositeFieldSetCopy) {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
return DocValues.emptySortedNumeric();
}
});
if (compositeField.equals("_doc_count")) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public NumericDocValues getNumeric(FieldInfo field) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
return DocValues.emptyNumeric();
}
});
} else {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
return DocValues.emptySortedNumeric();
}
});
}
compositeFieldSet.remove(compositeField);
}
}
Original file line number Diff line number Diff line change
@@ -21,12 +21,21 @@ public enum MetricStat {
AVG("avg"),
SUM("sum"),
MIN("min"),
MAX("max");
MAX("max"),
DOC_COUNT("doc_count", true);

private final String typeName;

// System field stats cannot be used as input for user metric types
private final boolean isSystemFieldStat;

MetricStat(String typeName) {
this(typeName, false);
}

MetricStat(String typeName, boolean isSystemFieldStat) {
this.typeName = typeName;
this.isSystemFieldStat = isSystemFieldStat;
}

public String getTypeName() {
@@ -35,7 +44,8 @@ public String getTypeName() {

public static MetricStat fromTypeName(String typeName) {
for (MetricStat metric : MetricStat.values()) {
if (metric.getTypeName().equalsIgnoreCase(typeName)) {
// prevent system fields to be entered as user input
if (metric.getTypeName().equalsIgnoreCase(typeName) && metric.isSystemFieldStat == false) {
return metric;
}
}
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;
@@ -78,7 +79,7 @@ public static void validate(MapperService mapperService, CompositeIndexSettings
String.format(Locale.ROOT, "unknown metric field [%s] as part of star tree field", metric.getField())
);
}
if (ft.isAggregatable() == false) {
if (ft.isAggregatable() == false && ft instanceof DocCountFieldMapper.DocCountFieldType == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
Original file line number Diff line number Diff line change
@@ -17,12 +17,9 @@
class CountValueAggregator implements ValueAggregator<Long> {

public static final long DEFAULT_INITIAL_VALUE = 1L;
private final StarTreeNumericType starTreeNumericType;
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;

public CountValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
}
public CountValueAggregator(StarTreeNumericType starTreeNumericType) {}
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved

@Override
public StarTreeNumericType getAggregatedValueType() {
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;

/**
* Aggregator to handle '_doc_count' field
*
* @opensearch.experimental
*/
public class DocCountAggregator implements ValueAggregator<Long> {

private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;

public DocCountAggregator(StarTreeNumericType starTreeNumericType) {}
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved

@Override
public StarTreeNumericType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

@Override
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
if (segmentDocValue == null) {
return getIdentityMetricValue();
}
return segmentDocValue;
}

@Override
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
assert value != null;
return mergeAggregatedValues(value, segmentDocValue);
}

@Override
public Long mergeAggregatedValues(Long value, Long aggregatedValue) {
if (value == null) {
value = getIdentityMetricValue();
}
if (aggregatedValue == null) {
aggregatedValue = getIdentityMetricValue();
}
return value + aggregatedValue;
}

@Override
public Long toStarTreeNumericTypeValue(Long value) {
return value;
}

@Override
public Long getIdentityMetricValue() {
return 1L;
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@ public static ValueAggregator getValueAggregator(MetricStat aggregationType, Sta
return new MinValueAggregator(starTreeNumericType);
case MAX:
return new MaxValueAggregator(starTreeNumericType);
case DOC_COUNT:
return new DocCountAggregator(starTreeNumericType);
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexOptions;
@@ -117,6 +118,16 @@
public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService mapperService) {
List<MetricAggregatorInfo> metricAggregatorInfos = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
if (metric.getField().equals("_doc_count")) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
MetricAggregatorInfo metricAggregatorInfo = new MetricAggregatorInfo(
MetricStat.DOC_COUNT,
metric.getField(),
starTreeField.getName(),
IndexNumericFieldData.NumericType.LONG
);
metricAggregatorInfos.add(metricAggregatorInfo);
continue;
}
for (MetricStat metricStat : metric.getMetrics()) {
IndexNumericFieldData.NumericType numericType;
Mapper fieldMapper = mapperService.documentMapper().mappers().getMapper(metric.getField());
@@ -470,20 +481,40 @@
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
SequentialDocValuesIterator metricReader = null;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField());
}

SequentialDocValuesIterator metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
if (metricStat.equals(MetricStat.DOC_COUNT)) {
metricReader = getDocCountMetricReader(fieldProducerMap, metricFieldInfo);
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
} else {
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
}
metricReaders.add(metricReader);
}
}
return metricReaders;
}

private static SequentialDocValuesIterator getDocCountMetricReader(
Map<String, DocValuesProducer> fieldProducerMap,
FieldInfo metricFieldInfo
) throws IOException {
SequentialDocValuesIterator metricReader;
// _doc_count is numeric field , so we need to get sortedNumericDocValues
if (fieldProducerMap.containsKey(metricFieldInfo.name)) {
metricReader = new SequentialDocValuesIterator(
DocValues.singleton(fieldProducerMap.get(metricFieldInfo.name).getNumeric(metricFieldInfo))
);
} else {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
metricReader = new SequentialDocValuesIterator(DocValues.emptySortedNumeric());

Check warning on line 513 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java#L513

Added line #L513 was not covered by tests
}
return metricReader;
}

/**
* Builds the star tree using Star-Tree Document
*
Original file line number Diff line number Diff line change
@@ -10,6 +10,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
@@ -137,6 +139,12 @@
}
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Map.Entry<String, DocIdSetIterator> metricDocValuesEntry : starTreeValues.getMetricDocValuesIteratorMap().entrySet()) {
if (metricDocValuesEntry.getValue() instanceof NumericDocValues) {
metricReaders.add(
new SequentialDocValuesIterator(DocValues.singleton((NumericDocValues) metricDocValuesEntry.getValue()))

Check warning on line 144 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java#L143-L144

Added lines #L143 - L144 were not covered by tests
);
continue;

Check warning on line 146 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java#L146

Added line #L146 was not covered by tests
}
metricReaders.add(new SequentialDocValuesIterator(metricDocValuesEntry.getValue()));
}
int currentDocId = 0;
Original file line number Diff line number Diff line change
@@ -225,6 +225,9 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
for (Object metric : metricsList) {
Map<String, Object> metricMap = (Map<String, Object>) metric;
String name = (String) XContentMapValues.extractValue(CompositeDataCubeFieldType.NAME, metricMap);
if (name.equals("_doc_count")) {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
metricMap.remove(CompositeDataCubeFieldType.NAME);
if (objbuilder == null || objbuilder.mappersBuilders == null) {
metrics.add(getMetric(name, metricMap, context));
@@ -249,7 +252,8 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
} else {
throw new MapperParsingException(String.format(Locale.ROOT, "unable to parse metrics for star tree field [%s]", this.name));
}

Metric docCountMetric = new Metric("_doc_count", List.of(MetricStat.DOC_COUNT));
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
metrics.add(docCountMetric);
return metrics;
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ public void testValidStarTree() throws IOException {
Set<CompositeMappedFieldType> compositeFieldTypes = mapperService.getCompositeFieldTypes();
for (CompositeMappedFieldType type : compositeFieldTypes) {
StarTreeMapper.StarTreeFieldType starTreeFieldType = (StarTreeMapper.StarTreeFieldType) type;
assertEquals(2, starTreeFieldType.getDimensions().size());
assertEquals("@timestamp", starTreeFieldType.getDimensions().get(0).getField());
assertTrue(starTreeFieldType.getDimensions().get(0) instanceof DateDimension);
DateDimension dateDim = (DateDimension) starTreeFieldType.getDimensions().get(0);
@@ -65,9 +66,15 @@ public void testValidStarTree() throws IOException {
);
assertEquals(expectedTimeUnits, dateDim.getIntervals());
assertEquals("status", starTreeFieldType.getDimensions().get(1).getField());
assertEquals(2, starTreeFieldType.getMetrics().size());
assertEquals("size", starTreeFieldType.getMetrics().get(0).getField());
List<MetricStat> expectedMetrics = Arrays.asList(MetricStat.SUM, MetricStat.AVG);
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());

Metric metric = starTreeFieldType.getMetrics().get(1);
assertEquals("_doc_count", metric.getField());
assertEquals(List.of(MetricStat.DOC_COUNT), metric.getMetrics());

assertEquals(100, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
assertEquals(StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP, starTreeFieldType.getStarTreeConfig().getBuildMode());
assertEquals(
@@ -91,6 +98,7 @@ public void testValidStarTreeDefaults() throws IOException {
);
assertEquals(expectedTimeUnits, dateDim.getIntervals());
assertEquals("status", starTreeFieldType.getDimensions().get(1).getField());
assertEquals(3, starTreeFieldType.getMetrics().size());
assertEquals("status", starTreeFieldType.getMetrics().get(0).getField());
List<MetricStat> expectedMetrics = Arrays.asList(
MetricStat.AVG,
@@ -100,6 +108,13 @@ public void testValidStarTreeDefaults() throws IOException {
MetricStat.MIN
);
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());

assertEquals("metric_field", starTreeFieldType.getMetrics().get(1).getField());
expectedMetrics = Arrays.asList(MetricStat.AVG, MetricStat.COUNT, MetricStat.SUM, MetricStat.MAX, MetricStat.MIN);
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(1).getMetrics());
Metric metric = starTreeFieldType.getMetrics().get(2);
assertEquals("_doc_count", metric.getField());
assertEquals(List.of(MetricStat.DOC_COUNT), metric.getMetrics());
assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
assertEquals(StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP, starTreeFieldType.getStarTreeConfig().getBuildMode());
assertEquals(Collections.emptySet(), starTreeFieldType.getStarTreeConfig().getSkipStarNodeCreationInDims());
@@ -136,7 +151,7 @@ public void testNoMetrics() {
public void testInvalidParam() {
MapperParsingException ex = expectThrows(
MapperParsingException.class,
() -> createMapperService(getInvalidMapping(false, false, false, false, true))
() -> createMapperService(getInvalidMapping(false, false, false, false, true, false))
);
assertEquals(
"Failed to parse mapping [_doc]: Star tree mapping definition has unsupported parameters: [invalid : {invalid=invalid}]",
@@ -182,6 +197,14 @@ public void testInvalidMetricType() {
);
}

public void testInvalidMetricTypeWithDocCount() {
MapperParsingException ex = expectThrows(
MapperParsingException.class,
() -> createMapperService(getInvalidMapping(false, false, false, false, false, true))
);
assertEquals("Failed to parse mapping [_doc]: Invalid metric stat: _doc_count", ex.getMessage());
}

public void testInvalidDimType() {
MapperParsingException ex = expectThrows(
MapperParsingException.class,
@@ -546,7 +569,8 @@ private XContentBuilder getInvalidMapping(
boolean invalidSkipDims,
boolean invalidDimType,
boolean invalidMetricType,
boolean invalidParam
boolean invalidParam,
boolean invalidDocCountMetricType
) throws IOException {
return topMapping(b -> {
b.startObject("composite");
@@ -583,6 +607,12 @@ private XContentBuilder getInvalidMapping(
b.endObject();
b.startObject();
b.field("name", "metric_field");
if (invalidDocCountMetricType) {
b.startArray("stats");
b.value("_doc_count");
b.value("avg");
b.endArray();
}
b.endObject();
b.endArray();
b.endObject();
@@ -681,7 +711,7 @@ private XContentBuilder getInvalidMappingWithDv(

private XContentBuilder getInvalidMapping(boolean singleDim, boolean invalidSkipDims, boolean invalidDimType, boolean invalidMetricType)
throws IOException {
return getInvalidMapping(singleDim, invalidSkipDims, invalidDimType, invalidMetricType, false);
return getInvalidMapping(singleDim, invalidSkipDims, invalidDimType, invalidMetricType, false, false);
}

protected boolean supportsOrIgnoresBoost() {