Skip to content

Commit

Permalink
addressing comments and fixes and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Jul 19, 2024
1 parent 6c80d66 commit aa04916
Show file tree
Hide file tree
Showing 17 changed files with 1,993 additions and 1,343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -63,10 +62,8 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
compositeFieldSet = new HashSet<>();
segmentFieldSet = new HashSet<>();
Iterator<FieldInfo> fieldInfoIterator = segmentWriteState.fieldInfos.iterator();
while (fieldInfoIterator.hasNext()) {
FieldInfo fi = fieldInfoIterator.next();
if (fi.getDocValuesType().equals(DocValuesType.SORTED_NUMERIC)) {
for (FieldInfo fi : segmentWriteState.fieldInfos) {
if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())) {
segmentFieldSet.add(fi.name);
}
}
Expand Down Expand Up @@ -117,8 +114,8 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
}
segmentFieldSet.remove(field.name);
if (segmentFieldSet.isEmpty()) {
while (compositeFieldSet.iterator().hasNext()) {
String compositeField = compositeFieldSet.iterator().next();
Set<String> compositeFieldSetCopy = new HashSet<>(compositeFieldSet);
for (String compositeField : compositeFieldSetCopy) {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
Expand Down Expand Up @@ -162,7 +159,6 @@ private void mergeCompositeFields(MergeState mergeState) throws IOException {
*/
private void mergeStarTreeFields(MergeState mergeState) throws IOException {
Map<String, List<StarTreeValues>> starTreeSubsPerField = new HashMap<>();
Map<String, StarTreeField> starTreeFieldMap = new HashMap<>();
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
CompositeIndexReader reader = null;
if (mergeState.docValuesProducers[i] == null) {
Expand All @@ -176,30 +172,30 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException {

List<CompositeIndexFieldInfo> compositeFieldInfo = reader.getCompositeIndexFields();
for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo) {
StarTreeField starTreeField = null;
if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) {
CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo);
if (compositeIndexValues instanceof StarTreeValues) {
StarTreeValues starTreeValues = (StarTreeValues) compositeIndexValues;
List<StarTreeValues> fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList());

if (!starTreeFieldMap.containsKey(fieldInfo.getField())) {
starTreeFieldMap.put(fieldInfo.getField(), ((StarTreeValues) compositeIndexValues).getStarTreeField());
if (starTreeField == null) {
starTreeField = starTreeValues.getStarTreeField();
}
// assert star tree configuration is same across segments
else {
if (starTreeFieldMap.get(fieldInfo.getField())
.equals(((StarTreeValues) compositeIndexValues).getStarTreeField()) == false) {
if (starTreeField.equals(starTreeValues.getStarTreeField()) == false) {
throw new IllegalArgumentException(
"star tree field configuration must match the configuration of the field being merged"
);
}
}
fieldsList.add((StarTreeValues) compositeIndexValues);
fieldsList.add(starTreeValues);
starTreeSubsPerField.put(fieldInfo.getField(), fieldsList);
}
}
}
}
final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService);
starTreesBuilder.buildDuringMerge(starTreeFieldMap, starTreeSubsPerField);
starTreesBuilder.buildDuringMerge(starTreeSubsPerField);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
public class CountValueAggregator implements ValueAggregator<Long> {
public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
public static final long DEFAULT_INITIAL_VALUE = 1L;
private StarTreeNumericType starTreeNumericType;

/**
 * Creates a count aggregator and records the star-tree numeric type it was created with.
 *
 * @param starTreeNumericType numeric type to store on this aggregator instance
 */
public CountValueAggregator(final StarTreeNumericType starTreeNumericType) {
    this.starTreeNumericType = starTreeNumericType;
}

@Override
public MetricStat getAggregationType() {
Expand All @@ -30,12 +35,12 @@ public StarTreeNumericType getAggregatedValueType() {
}

@Override
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
return DEFAULT_INITIAL_VALUE;
}

@Override
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
return value + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {
*/
public MetricAggregatorInfo(MetricStat metricStat, String field, String starFieldName, IndexNumericFieldData.NumericType numericType) {
this.metricStat = metricStat;
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat);
this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType);
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat, this.starTreeNumericType);
this.field = field;
this.starFieldName = starFieldName;
this.metric = toFieldName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public class SumValueAggregator implements ValueAggregator<Double> {
private double compensation = 0;
private CompensatedSum kahanSummation = new CompensatedSum(0, 0);

private StarTreeNumericType starTreeNumericType;

/**
 * Creates a sum aggregator and records the star-tree numeric type it was created with.
 *
 * @param starTreeNumericType numeric type to store on this aggregator instance
 */
public SumValueAggregator(final StarTreeNumericType starTreeNumericType) {
    this.starTreeNumericType = starTreeNumericType;
}

@Override
public MetricStat getAggregationType() {
return MetricStat.SUM;
Expand All @@ -35,7 +41,7 @@ public StarTreeNumericType getAggregatedValueType() {
}

@Override
public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
kahanSummation.reset(0, 0);
kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
compensation = kahanSummation.delta();
Expand All @@ -44,7 +50,7 @@ public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue,
}

@Override
public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue) {
assert kahanSummation.value() == value;
kahanSummation.reset(sum, compensation);
kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
Expand Down Expand Up @@ -89,6 +95,9 @@ public Long toLongValue(Double value) {
@Override
public Double toStarTreeNumericTypeValue(Long value) {
try {
if (value == null) {
return 0.0;
}
return VALUE_AGGREGATOR_TYPE.getDoubleValue(value);
} catch (Exception e) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public interface ValueAggregator<A> {
/**
* Returns the initial aggregated value.
*/
A getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType);
A getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue);

/**
* Applies a segment doc value to the current aggregated value.
*/
A mergeAggregatedValueAndSegmentValue(A value, Long segmentDocValue, StarTreeNumericType starTreeNumericType);
A mergeAggregatedValueAndSegmentValue(A value, Long segmentDocValue);

/**
* Applies an aggregated value to the current aggregated value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ private ValueAggregatorFactory() {}
/**
* Returns a new instance of value aggregator for the given aggregation type.
*
* @param aggregationType Aggregation type
* @param aggregationType Aggregation type
* @param starTreeNumericType Numeric type associated with the star tree field (as specified in the index mapping)
* @return Value aggregator
*/
public static ValueAggregator getValueAggregator(MetricStat aggregationType) {
public static ValueAggregator getValueAggregator(MetricStat aggregationType, StarTreeNumericType starTreeNumericType) {
switch (aggregationType) {
// other metric types (min, max, avg) will be supported in the future
case SUM:
return new SumValueAggregator();
return new SumValueAggregator(starTreeNumericType);
case COUNT:
return new CountValueAggregator();
return new CountValueAggregator(starTreeNumericType);
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand Down Expand Up @@ -291,10 +292,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
if (isMerge) {
metrics[i] = metricValueAggregator.getInitialAggregatedValue(segmentDocument.metrics[i]);
} else {
metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue(
getLong(segmentDocument.metrics[i]),
starTreeNumericType
);
metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue(getLong(segmentDocument.metrics[i]));
}

} catch (Exception e) {
Expand All @@ -310,14 +308,13 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
StarTreeNumericType starTreeNumericType = metricAggregatorInfos.get(i).getAggregatedValueType();
if (isMerge) {
aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValues(
aggregatedSegmentDocument.metrics[i],
segmentDocument.metrics[i]
segmentDocument.metrics[i],
aggregatedSegmentDocument.metrics[i]
);
} else {
aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValueAndSegmentValue(
aggregatedSegmentDocument.metrics[i],
getLong(segmentDocument.metrics[i]),
starTreeNumericType
getLong(segmentDocument.metrics[i])
);
}
} catch (Exception e) {
Expand Down Expand Up @@ -715,4 +712,5 @@ public void close() throws IOException {

}

abstract Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValues) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
) throws IOException {
StarTreeDocument[] starTreeDocuments = new StarTreeDocument[totalSegmentDocs];
for (int currentDocId = 0; currentDocId < totalSegmentDocs; currentDocId++) {
// TODO : fast exit if all dimensions are null ( indicating all iterators are exhausted )
// TODO : if all the dimensions are null, then we can skip adding the document
// TODO : if we come out of this loop with the very first document dimensions coming with doc iterators exhausted,
// then we need to return empty star tree
// TODO : we can save empty iterator for dimensions which are not part of segment
starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId, dimensionReaders, metricReaders);
}
Expand All @@ -103,8 +99,9 @@ public void build(List<StarTreeValues> starTreeValuesSubs) throws IOException {
* @param starTreeValuesSubs StarTreeValues from multiple segments
* @return iterator of star tree documents
*/
@Override
Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSubs) throws IOException {
return sortAndAggregateStarTreeDocuments(mergeStarTreeValues(starTreeValuesSubs), true);
return sortAndAggregateStarTreeDocuments(reduceStarTreeDocumentsInSubs(starTreeValuesSubs), true);
}

/**
Expand All @@ -113,13 +110,11 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
* @param starTreeValuesSubs StarTreeValues from multiple segments
* @return array of star tree documents
*/
StarTreeDocument[] mergeStarTreeValues(List<StarTreeValues> starTreeValuesSubs) throws IOException {
StarTreeDocument[] reduceStarTreeDocumentsInSubs(List<StarTreeValues> starTreeValuesSubs) throws IOException {
List<StarTreeDocument> starTreeDocuments = new ArrayList<>();
for (StarTreeValues starTreeValues : starTreeValuesSubs) {
List<Dimension> dimensionsSplitOrder = starTreeValues.getStarTreeField().getDimensionsOrder();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[starTreeValues.getStarTreeField()
.getDimensionsOrder()
.size()];
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()];

for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
Expand All @@ -136,23 +131,15 @@ StarTreeDocument[] mergeStarTreeValues(List<StarTreeValues> starTreeValuesSubs)
int numSegmentDocs = Integer.parseInt(
starTreeValues.getAttributes().getOrDefault(NUM_SEGMENT_DOCS, String.valueOf(DocIdSetIterator.NO_MORE_DOCS))
);
while (!endOfDoc) {
Long[] dims = new Long[starTreeValues.getStarTreeField().getDimensionsOrder().size()];
while (currentDocId < numSegmentDocs) {
Long[] dims = new Long[dimensionsSplitOrder.size()];
int i = 0;
int endOfDocCounter = 0;
for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) {
int doc = dimensionDocValueIterator.nextDoc(currentDocId);
dimensionDocValueIterator.nextDoc(currentDocId);
Long val = dimensionDocValueIterator.value(currentDocId);
if (doc == DocIdSetIterator.NO_MORE_DOCS) {
endOfDocCounter++;
}
dims[i] = val;
i++;
}
// we've exhausted all dimension readers
if (endOfDocCounter == dims.length) {
break;
}
i = 0;
Object[] metrics = new Object[metricReaders.size()];
for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) {
Expand All @@ -168,7 +155,6 @@ StarTreeDocument[] mergeStarTreeValues(List<StarTreeValues> starTreeValuesSubs)
StarTreeDocument starTreeDocument = new StarTreeDocument(dims, metrics);
starTreeDocuments.add(starTreeDocument);
currentDocId++;
if (currentDocId == numSegmentDocs) break;
}
}
StarTreeDocument[] starTreeDocumentsArr = new StarTreeDocument[starTreeDocuments.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOExce
logger.debug("Starting building {} star-trees with star-tree fields", numStarTrees);

// Build all star-trees
for (int i = 0; i < numStarTrees; i++) {
StarTreeField starTreeField = starTreeFields.get(i);
for (StarTreeField starTreeField : starTreeFields) {
try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, state, mapperService)) {
starTreeBuilder.build(fieldProducerMap);
}
Expand All @@ -91,18 +90,18 @@ public void close() throws IOException {
/**
* Merges star tree fields from multiple segments
*
* @param starTreeFieldMap StarTreeField configuration per field
* @param starTreeValuesSubsPerField starTreeValuesSubs per field
*/
public void buildDuringMerge(
final Map<String, StarTreeField> starTreeFieldMap,
final Map<String, List<StarTreeValues>> starTreeValuesSubsPerField
) throws IOException {
public void buildDuringMerge(final Map<String, List<StarTreeValues>> starTreeValuesSubsPerField) throws IOException {
logger.debug("Starting merge of {} star-trees with star-tree fields", starTreeValuesSubsPerField.size());
long startTime = System.currentTimeMillis();
for (Map.Entry<String, List<StarTreeValues>> entry : starTreeValuesSubsPerField.entrySet()) {
List<StarTreeValues> starTreeValuesList = entry.getValue();
StarTreeField starTreeField = starTreeFieldMap.get(entry.getKey());
if (starTreeValuesList.isEmpty()) {
logger.debug("StarTreeValues is empty for all segments for field : {}", entry.getKey());
continue;
}
StarTreeField starTreeField = starTreeValuesList.get(0).getStarTreeField();
StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService);
builder.build(starTreeValuesList);
builder.close();
Expand Down
Loading

0 comments on commit aa04916

Please sign in to comment.