Skip to content

Commit

Permalink
[Star tree] Star tree merge changes (opensearch-project#14652)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie authored and wangdongyu.danny committed Aug 22, 2024
1 parent 61c5022 commit 980cf20
Show file tree
Hide file tree
Showing 32 changed files with 3,281 additions and 1,435 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -74,15 +74,13 @@ public void close() throws IOException {
}

@Override
public List<String> getCompositeIndexFields() {
public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
throw new UnsupportedOperationException();

return new ArrayList<>();
}

@Override
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
throws IOException {
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.SortedNumericDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -40,8 +49,10 @@ public class Composite99DocValuesWriter extends DocValuesConsumer {
AtomicReference<MergeState> mergeState = new AtomicReference<>();
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
private final Set<String> compositeFieldSet;
private final Set<String> segmentFieldSet;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class);

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

Expand All @@ -50,6 +61,12 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
this.mapperService = mapperService;
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
compositeFieldSet = new HashSet<>();
segmentFieldSet = new HashSet<>();
for (FieldInfo fi : segmentWriteState.fieldInfos) {
if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())) {
segmentFieldSet.add(fi.name);
}
}
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
compositeFieldSet.addAll(type.fields());
}
Expand Down Expand Up @@ -95,23 +112,91 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
fieldProducerMap.put(field.name, valuesProducer);
compositeFieldSet.remove(field.name);
}
segmentFieldSet.remove(field.name);
if (segmentFieldSet.isEmpty()) {
Set<String> compositeFieldSetCopy = new HashSet<>(compositeFieldSet);
for (String compositeField : compositeFieldSetCopy) {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
return DocValues.emptySortedNumeric();
}
});
compositeFieldSet.remove(compositeField);
}
}
// we have all the required fields to build composite fields
if (compositeFieldSet.isEmpty()) {
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService)) {
starTreesBuilder.build();
if (mappedType.getCompositeIndexType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) {
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) {
starTreesBuilder.build(fieldProducerMap);
}
}
}
}

}

/**
 * Merges doc values for the given merge state and then merges the composite fields.
 * <p>
 * The merge state is recorded in {@code this.mergeState} only if no merge state has been
 * recorded yet ({@code compareAndSet(null, ...)}); the regular doc values merge is delegated
 * to the superclass before composite fields are merged.
 *
 * @param mergeState merge state
 * @throws IOException propagated from the superclass merge or the composite field merge
 */
@Override
public void merge(MergeState mergeState) throws IOException {
this.mergeState.compareAndSet(null, mergeState);
super.merge(mergeState);
// composite fields (star tree) are merged after the regular doc values merge
mergeCompositeFields(mergeState);
}

/**
 * Merges composite fields from multiple segments.
 * <p>
 * Currently this only delegates to {@code mergeStarTreeFields}.
 *
 * @param mergeState merge state
 * @throws IOException propagated from the star tree merge
 */
private void mergeCompositeFields(MergeState mergeState) throws IOException {
mergeStarTreeFields(mergeState);
}

/**
 * Merges star tree data fields from multiple segments.
 * <p>
 * Every doc values producer in the merge state that is a {@link CompositeIndexReader} is asked
 * for its composite index fields; the {@link StarTreeValues} of each star tree field are grouped
 * by field name and the grouped map is passed to {@code StarTreesBuilder.buildDuringMerge}.
 * Producers that are {@code null} or not composite index readers are skipped.
 * <p>
 * NOTE(review): the configuration check compares every star tree field against the first one
 * encountered, regardless of field name. This presumes a single star tree field per index;
 * confirm before supporting multiple star tree fields.
 *
 * @param mergeState merge state
 * @throws IOException propagated from reading composite index values or from the builder
 * @throws IllegalArgumentException if the star tree field configuration differs between the merged values
 */
private void mergeStarTreeFields(MergeState mergeState) throws IOException {
    Map<String, List<StarTreeValues>> starTreeSubsPerField = new HashMap<>();
    StarTreeField starTreeField = null;
    for (DocValuesProducer producer : mergeState.docValuesProducers) {
        // instanceof is false for null, so segments without a producer are skipped here as well
        if ((producer instanceof CompositeIndexReader) == false) {
            continue;
        }
        CompositeIndexReader reader = (CompositeIndexReader) producer;

        for (CompositeIndexFieldInfo fieldInfo : reader.getCompositeIndexFields()) {
            if (CompositeMappedFieldType.CompositeFieldType.STAR_TREE.equals(fieldInfo.getType()) == false) {
                continue;
            }
            CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo);
            if ((compositeIndexValues instanceof StarTreeValues) == false) {
                continue;
            }
            StarTreeValues starTreeValues = (StarTreeValues) compositeIndexValues;
            if (starTreeField == null) {
                starTreeField = starTreeValues.getStarTreeField();
            } else if (starTreeField.equals(starTreeValues.getStarTreeField()) == false) {
                // star tree configuration must be the same across segments
                throw new IllegalArgumentException(
                    "star tree field configuration must match the configuration of the field being merged"
                );
            }
            // The per-field list must be mutable: defaulting to Collections.emptyList() and then
            // calling add() throws UnsupportedOperationException on the first value of a field.
            starTreeSubsPerField.computeIfAbsent(fieldInfo.getField(), k -> new ArrayList<>()).add(starTreeValues);
        }
    }
    try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) {
        starTreesBuilder.buildDuringMerge(starTreeSubsPerField);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

/**
 * Describes a composite index field by its name and its composite field type.
 * Both values are fixed at construction time.
 *
 * @opensearch.experimental
 */
@ExperimentalApi
public class CompositeIndexFieldInfo {
    private final String field;
    private final CompositeMappedFieldType.CompositeFieldType type;

    /**
     * Creates the field info.
     *
     * @param field name of the composite index field
     * @param type  composite field type of the field
     */
    public CompositeIndexFieldInfo(final String field, final CompositeMappedFieldType.CompositeFieldType type) {
        this.type = type;
        this.field = field;
    }

    /**
     * @return the field name given at construction
     */
    public String getField() {
        return this.field;
    }

    /**
     * @return the composite field type given at construction
     */
    public CompositeMappedFieldType.CompositeFieldType getType() {
        return this.type;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;
Expand All @@ -25,10 +24,10 @@ public interface CompositeIndexReader {
* Get list of composite index fields from the segment
*
*/
List<String> getCompositeIndexFields();
List<CompositeIndexFieldInfo> getCompositeIndexFields();

/**
* Get composite index values based on the field name and the field type
*/
CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType) throws IOException;
CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo fieldInfo) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.index.codec.composite.datacube.startree;

import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.CompositeIndexValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode;

import java.util.List;
import java.util.Map;

/**
* Concrete class that holds the star tree associated values from the segment
Expand All @@ -20,16 +23,48 @@
*/
@ExperimentalApi
public class StarTreeValues implements CompositeIndexValues {
private final List<String> dimensionsOrder;
private final StarTreeField starTreeField;
private final StarTreeNode root;
private final Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap;
private final Map<String, DocIdSetIterator> metricDocValuesIteratorMap;
private final Map<String, String> attributes;

// TODO : come up with full set of values such as dimensions and metrics doc values + star tree
public StarTreeValues(List<String> dimensionsOrder) {
super();
this.dimensionsOrder = List.copyOf(dimensionsOrder);
/**
 * Creates star tree values from the given components. All arguments are stored by reference;
 * no defensive copies are made.
 *
 * @param starTreeField star tree field these values belong to
 * @param root root node of the star tree
 * @param dimensionDocValuesIteratorMap doc id set iterators for dimensions, keyed by string
 *        (presumably the dimension name — confirm against the producer of this map)
 * @param metricDocValuesIteratorMap doc id set iterators for metrics, keyed by string
 *        (presumably the metric name — confirm against the producer of this map)
 * @param attributes string key/value attributes associated with the star tree
 */
public StarTreeValues(
StarTreeField starTreeField,
StarTreeNode root,
Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap,
Map<String, DocIdSetIterator> metricDocValuesIteratorMap,
Map<String, String> attributes
) {
this.starTreeField = starTreeField;
this.root = root;
this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap;
this.metricDocValuesIteratorMap = metricDocValuesIteratorMap;
this.attributes = attributes;
}

/**
 * @return this instance, which is itself the composite index values
 */
@Override
public CompositeIndexValues getValues() {
return this;
}

/**
 * @return the star tree field passed to the constructor
 */
public StarTreeField getStarTreeField() {
return starTreeField;
}

/**
 * @return the root star tree node passed to the constructor
 */
public StarTreeNode getRoot() {
return root;
}

/**
 * @return the dimension iterator map passed to the constructor (the same instance, not a copy)
 */
public Map<String, DocIdSetIterator> getDimensionDocValuesIteratorMap() {
return dimensionDocValuesIteratorMap;
}

/**
 * @return the metric iterator map passed to the constructor (the same instance, not a copy)
 */
public Map<String, DocIdSetIterator> getMetricDocValuesIteratorMap() {
return metricDocValuesIteratorMap;
}

/**
 * @return the attributes map passed to the constructor (the same instance, not a copy)
 */
public Map<String, String> getAttributes() {
return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
public class CountValueAggregator implements ValueAggregator<Long> {
public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
public static final long DEFAULT_INITIAL_VALUE = 1L;
private StarTreeNumericType starTreeNumericType;

/**
 * Creates a count aggregator and stores the given numeric type on the instance.
 * NOTE(review): the methods visible in this diff do not read the stored type — confirm
 * whether it is used elsewhere in the class.
 *
 * @param starTreeNumericType numeric type to associate with this aggregator
 */
public CountValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
}

@Override
public MetricStat getAggregationType() {
Expand All @@ -30,12 +35,12 @@ public StarTreeNumericType getAggregatedValueType() {
}

@Override
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
return DEFAULT_INITIAL_VALUE;
}

@Override
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
return value + 1;
}

Expand All @@ -60,7 +65,7 @@ public Long toLongValue(Long value) {
}

@Override
public Long toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
public Long toStarTreeNumericTypeValue(Long value) {
return value;
}
}
Loading

0 comments on commit 980cf20

Please sign in to comment.