Skip to content

Commit

Permalink
Doc count field changes in star tree
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Aug 16, 2024
1 parent b316279 commit c647863
Show file tree
Hide file tree
Showing 12 changed files with 405 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public void testValidCompositeIndex() {
);
assertEquals(expectedTimeUnits, dateDim.getIntervals());
assertEquals("numeric_dv", starTreeFieldType.getDimensions().get(1).getField());
assertEquals(2, starTreeFieldType.getMetrics().size());
assertEquals("numeric_dv", starTreeFieldType.getMetrics().get(0).getField());
List<MetricStat> expectedMetrics = Arrays.asList(
MetricStat.AVG,
Expand All @@ -273,6 +274,10 @@ public void testValidCompositeIndex() {
MetricStat.MIN
);
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());

assertEquals("_doc_count", starTreeFieldType.getMetrics().get(1).getField());
assertEquals(List.of(MetricStat.DOC_COUNT), starTreeFieldType.getMetrics().get(1).getMetrics());

assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
assertEquals(
StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.SortedNumericDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
Expand Down Expand Up @@ -75,6 +76,10 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
/**
 * Hands the numeric field to the wrapped consumer and then, on the flush path only,
 * attempts to create composite indices when the segment has composite fields.
 *
 * @param field          the numeric field being written
 * @param valuesProducer producer supplying the doc values of {@code field}
 * @throws IOException if the delegate or the composite index creation fails
 */
@Override
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
    delegate.addNumericField(field, valuesProducer);

    // No merge state has been recorded, so this call belongs to the flush flow
    final boolean flushFlow = mergeState.get() == null;
    if (flushFlow && segmentHasCompositeFields) {
        createCompositeIndicesIfPossible(valuesProducer, field);
    }
}

@Override
Expand Down Expand Up @@ -116,12 +121,21 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
if (segmentFieldSet.isEmpty()) {
Set<String> compositeFieldSetCopy = new HashSet<>(compositeFieldSet);
for (String compositeField : compositeFieldSetCopy) {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
return DocValues.emptySortedNumeric();
}
});
if (compositeField.equals("_doc_count")) {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public NumericDocValues getNumeric(FieldInfo field) {
return DocValues.emptyNumeric();
}
});
} else {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
return DocValues.emptySortedNumeric();
}
});
}
compositeFieldSet.remove(compositeField);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@ public enum MetricStat {
AVG("avg"),
SUM("sum"),
MIN("min"),
MAX("max");
MAX("max"),
DOC_COUNT("doc_count", true);

private final String typeName;

// System field stats cannot be used as input for user metric types
private final boolean isSystemFieldStat;

/**
 * Creates a stat that is not a system field stat.
 * Delegates to the two-argument constructor with {@code isSystemFieldStat = false}.
 *
 * @param typeName the type name of this stat
 */
MetricStat(String typeName) {
this(typeName, false);
}

/**
 * Creates a stat with an explicit system-field flag.
 *
 * @param typeName          the type name of this stat
 * @param isSystemFieldStat whether this stat is a system field stat
 */
MetricStat(String typeName, boolean isSystemFieldStat) {
this.typeName = typeName;
this.isSystemFieldStat = isSystemFieldStat;
}

public String getTypeName() {
Expand All @@ -35,7 +44,8 @@ public String getTypeName() {

public static MetricStat fromTypeName(String typeName) {
for (MetricStat metric : MetricStat.values()) {
if (metric.getTypeName().equalsIgnoreCase(typeName)) {
// prevent system field stats from being entered as user input
if (metric.getTypeName().equalsIgnoreCase(typeName) && metric.isSystemFieldStat == false) {
return metric;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;
Expand Down Expand Up @@ -78,7 +79,7 @@ public static void validate(MapperService mapperService, CompositeIndexSettings
String.format(Locale.ROOT, "unknown metric field [%s] as part of star tree field", metric.getField())
);
}
if (ft.isAggregatable() == false) {
if (ft.isAggregatable() == false && ft instanceof DocCountFieldMapper.DocCountFieldType == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
class CountValueAggregator implements ValueAggregator<Long> {

public static final long DEFAULT_INITIAL_VALUE = 1L;
private final StarTreeNumericType starTreeNumericType;
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;

public CountValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
}
public CountValueAggregator(StarTreeNumericType starTreeNumericType) {}

@Override
public StarTreeNumericType getAggregatedValueType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;

/**
 * Value aggregator for the '_doc_count' field.
 * <p>
 * Counts are carried as {@link Long} values. A {@code null} count is replaced by
 * {@link #getIdentityMetricValue()} and two counts are merged by adding them.
 *
 * @opensearch.experimental
 */
public class DocCountAggregator implements ValueAggregator<Long> {

    private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;

    /**
     * @param starTreeNumericType accepted for signature compatibility; not used by this aggregator
     */
    public DocCountAggregator(StarTreeNumericType starTreeNumericType) {}

    /** Returns the numeric type of the aggregated value, which is always LONG here. */
    @Override
    public StarTreeNumericType getAggregatedValueType() {
        return VALUE_AGGREGATOR_TYPE;
    }

    /** Returns the segment value as the starting count, or the identity value when it is absent. */
    @Override
    public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
        return orIdentity(segmentDocValue);
    }

    /** Adds a segment doc value (possibly {@code null}) to an existing, non-null aggregate. */
    @Override
    public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
        assert value != null;
        return mergeAggregatedValues(value, segmentDocValue);
    }

    /** Adds two aggregated counts, substituting the identity value for any {@code null} operand. */
    @Override
    public Long mergeAggregatedValues(Long value, Long aggregatedValue) {
        final Long left = orIdentity(value);
        final Long right = orIdentity(aggregatedValue);
        return left + right;
    }

    /** The aggregated value is already a Long, so it is returned unchanged. */
    @Override
    public Long toStarTreeNumericTypeValue(Long value) {
        return value;
    }

    /** The value used in place of a missing count. */
    @Override
    public Long getIdentityMetricValue() {
        return 1L;
    }

    /** Returns {@code candidate} unless it is {@code null}, in which case the identity value is returned. */
    private Long orIdentity(Long candidate) {
        return candidate == null ? getIdentityMetricValue() : candidate;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public static ValueAggregator getValueAggregator(MetricStat aggregationType, Sta
return new MinValueAggregator(starTreeNumericType);
case MAX:
return new MaxValueAggregator(starTreeNumericType);
case DOC_COUNT:
return new DocCountAggregator(starTreeNumericType);
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexOptions;
Expand Down Expand Up @@ -117,6 +118,16 @@ protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState sta
public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService mapperService) {
List<MetricAggregatorInfo> metricAggregatorInfos = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
if (metric.getField().equals("_doc_count")) {
MetricAggregatorInfo metricAggregatorInfo = new MetricAggregatorInfo(
MetricStat.DOC_COUNT,
metric.getField(),
starTreeField.getName(),
IndexNumericFieldData.NumericType.LONG
);
metricAggregatorInfos.add(metricAggregatorInfo);
continue;
}
for (MetricStat metricStat : metric.getMetrics()) {
IndexNumericFieldData.NumericType numericType;
Mapper fieldMapper = mapperService.documentMapper().mappers().getMapper(metric.getField());
Expand Down Expand Up @@ -470,20 +481,40 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
SequentialDocValuesIterator metricReader = null;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField());
}

SequentialDocValuesIterator metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
if (metricStat.equals(MetricStat.DOC_COUNT)) {
metricReader = getDocCountMetricReader(fieldProducerMap, metricFieldInfo);
} else {
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
}
metricReaders.add(metricReader);
}
}
return metricReaders;
}

/**
 * Builds the metric reader used for the doc count metric.
 * <p>
 * If {@code fieldProducerMap} holds a producer under the field's name, that producer's
 * numeric doc values are wrapped with {@code DocValues.singleton(...)} and passed to a
 * {@link SequentialDocValuesIterator}. Otherwise the iterator is backed by
 * {@code DocValues.emptySortedNumeric()}.
 *
 * @param fieldProducerMap doc values producers keyed by field name
 * @param metricFieldInfo  field info of the metric field to read
 * @return an iterator over the field's doc values, or over empty values when no producer is registered
 * @throws IOException propagated from the producer's {@code getNumeric} call
 */
private static SequentialDocValuesIterator getDocCountMetricReader(
Map<String, DocValuesProducer> fieldProducerMap,
FieldInfo metricFieldInfo
) throws IOException {
SequentialDocValuesIterator metricReader;
// _doc_count is a numeric field, so its NumericDocValues are wrapped via DocValues.singleton
// before being handed to the iterator
if (fieldProducerMap.containsKey(metricFieldInfo.name)) {
metricReader = new SequentialDocValuesIterator(
DocValues.singleton(fieldProducerMap.get(metricFieldInfo.name).getNumeric(metricFieldInfo))
);
} else {
// no producer registered for this field: fall back to empty doc values
metricReader = new SequentialDocValuesIterator(DocValues.emptySortedNumeric());

Check warning on line 513 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java#L513

Added line #L513 was not covered by tests
}
return metricReader;
}

/**
* Builds the star tree using Star-Tree Document
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
Expand Down Expand Up @@ -137,6 +139,12 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
}
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Map.Entry<String, DocIdSetIterator> metricDocValuesEntry : starTreeValues.getMetricDocValuesIteratorMap().entrySet()) {
if (metricDocValuesEntry.getValue() instanceof NumericDocValues) {
metricReaders.add(
new SequentialDocValuesIterator(DocValues.singleton((NumericDocValues) metricDocValuesEntry.getValue()))

Check warning on line 144 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java#L143-L144

Added lines #L143 - L144 were not covered by tests
);
continue;

Check warning on line 146 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java#L146

Added line #L146 was not covered by tests
}
metricReaders.add(new SequentialDocValuesIterator(metricDocValuesEntry.getValue()));
}
int currentDocId = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
for (Object metric : metricsList) {
Map<String, Object> metricMap = (Map<String, Object>) metric;
String name = (String) XContentMapValues.extractValue(CompositeDataCubeFieldType.NAME, metricMap);
if (name.equals("_doc_count")) {
continue;
}
metricMap.remove(CompositeDataCubeFieldType.NAME);
if (objbuilder == null || objbuilder.mappersBuilders == null) {
metrics.add(getMetric(name, metricMap, context));
Expand All @@ -249,7 +252,8 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
} else {
throw new MapperParsingException(String.format(Locale.ROOT, "unable to parse metrics for star tree field [%s]", this.name));
}

Metric docCountMetric = new Metric("_doc_count", List.of(MetricStat.DOC_COUNT));
metrics.add(docCountMetric);
return metrics;
}

Expand Down
Loading

0 comments on commit c647863

Please sign in to comment.