From 9250579d2b7559788ad712fa683145ee1bea55d8 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Thu, 18 Jul 2024 13:50:31 +0530 Subject: [PATCH] star tree file formats Signed-off-by: Sarthak Aggarwal --- .../Composite99DocValuesConsumer.java | 68 +++ .../lucene90/StarTree99DocValuesProducer.java | 161 +++++++ .../SortedNumericDocValuesWriterHelper.java | 53 +++ .../composite/Composite99DocValuesFormat.java | 30 ++ .../composite/Composite99DocValuesReader.java | 266 ++++++++++- .../composite/Composite99DocValuesWriter.java | 93 +++- .../datacube/startree/StarTreeValues.java | 9 +- .../CompositeIndexConstants.java | 26 ++ .../CompositeIndexMetadata.java | 89 ++++ .../datacube/MergeDimension.java | 56 +++ .../aggregators/CountValueAggregator.java | 5 + .../aggregators/MaxValueAggregator.java | 9 +- .../aggregators/MetricAggregatorInfo.java | 9 +- .../startree/aggregators/MetricEntry.java | 35 ++ .../aggregators/MinValueAggregator.java | 9 +- .../aggregators/SumValueAggregator.java | 5 + .../startree/aggregators/ValueAggregator.java | 5 + .../startree/builder/BaseStarTreeBuilder.java | 430 +++++++++++++----- .../builder/OnHeapStarTreeBuilder.java | 28 +- .../startree/builder/StarTreeBuilder.java | 20 +- .../startree/builder/StarTreesBuilder.java | 48 +- .../startree/meta/StarTreeMetadata.java | 228 ++++++++++ .../datacube/startree/meta/TreeMetadata.java | 112 +++++ .../datacube/startree/meta/package-info.java | 14 + .../node/FixedLengthStarTreeNode.java | 186 ++++++++ .../datacube/startree/node/StarTree.java | 62 +++ .../datacube/startree/node/StarTreeNode.java | 1 - .../datacube/startree/node/Tree.java | 23 + .../datacube/startree/node/package-info.java | 2 + .../utils/SequentialDocValuesIterator.java | 43 ++ .../startree/utils/StarTreeBuilderUtils.java | 76 ++++ .../startree/utils/StarTreeDataWriter.java | 137 ++++++ .../startree/utils/StarTreeHelper.java | 53 +++ .../startree/utils/StarTreeMetaWriter.java | 226 +++++++++ .../datacube/startree/utils/TreeNode.java | 4 +- .../index/compositeindex/package-info.java | 3 +- .../StarTreeDocValuesFormatTests.java | 38 ++ .../aggregators/MaxValueAggregatorTests.java | 2 +- .../aggregators/MinValueAggregatorTests.java | 2 +- .../builder/BaseStarTreeBuilderTests.java | 41 +- .../builder/OnHeapStarTreeBuilderTests.java | 141 ++++-- .../builder/StarTreesBuilderTests.java | 12 +- 42 files changed, 2649 insertions(+), 211 deletions(-) create mode 100644 server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java create mode 100644 server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java create mode 100644 server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/FixedLengthStarTreeNode.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/Tree.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataWriter.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaWriter.java diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java new file mode 100644 index 0000000000000..c0a22a0b32fd2 --- /dev/null +++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.codecs.lucene90; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.SegmentWriteState; + +import java.io.IOException; + +/** + * This class is an abstraction of the {@link DocValuesConsumer} for the Star Tree index structure. + * It is responsible to consume various types of document values (numeric, binary, sorted, sorted numeric, + * and sorted set) for fields in the Star Tree index. + * + * @opensearch.experimental + */ +public class Composite99DocValuesConsumer extends DocValuesConsumer { + + Lucene90DocValuesConsumer lucene90DocValuesConsumer; + + public Composite99DocValuesConsumer( + SegmentWriteState state, + String dataCodec, + String dataExtension, + String metaCodec, + String metaExtension + ) throws IOException { + lucene90DocValuesConsumer = new Lucene90DocValuesConsumer(state, dataCodec, dataExtension, metaCodec, metaExtension); + } + + @Override + public void close() throws IOException { + lucene90DocValuesConsumer.close(); + } + + @Override + public void addNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addBinaryField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedSetField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedSetField(fieldInfo, docValuesProducer); + } +} diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java new file mode 100644 index 0000000000000..a2a8c1b4c35fe --- /dev/null +++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.codecs.lucene90; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; + +/** + * This class is a custom abstraction of the {@link DocValuesProducer} for the Star Tree index structure. + * It is responsible for providing access to various types of document values (numeric, binary, sorted, sorted numeric, + * and sorted set) for fields in the Star Tree index. + * + * @opensearch.experimental + */ +public class StarTree99DocValuesProducer extends DocValuesProducer { + + Lucene90DocValuesProducer lucene90DocValuesProducer; + private final List dimensions; + private final List metrics; + private final FieldInfos fieldInfos; + + public StarTree99DocValuesProducer( + SegmentReadState state, + String dataCodec, + String dataExtension, + String metaCodec, + String metaExtension, + List dimensions, + List metricEntries, + String compositeFieldName + ) throws IOException { + this.dimensions = dimensions; + this.metrics = metricEntries; + + // populates the dummy list of field infos to fetch doc id set iterators for respective fields. + // the dummy field info is used to fetch the doc id set iterators for respective fields based on field name + this.fieldInfos = new FieldInfos(getFieldInfoList(compositeFieldName)); + SegmentReadState segmentReadState = new SegmentReadState(state.directory, state.segmentInfo, fieldInfos, state.context); + lucene90DocValuesProducer = new Lucene90DocValuesProducer(segmentReadState, dataCodec, dataExtension, metaCodec, metaExtension); + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getNumeric(field); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getBinary(field); + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getSorted(field); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getSortedNumeric(field); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getSortedSet(field); + } + + @Override + public void checkIntegrity() throws IOException { + this.lucene90DocValuesProducer.checkIntegrity(); + } + + // returns the doc id set iterator based on field name + public SortedNumericDocValues getSortedNumeric(String fieldName) throws IOException { + return this.lucene90DocValuesProducer.getSortedNumeric(fieldInfos.fieldInfo(fieldName)); + } + + @Override + public void close() throws IOException { + this.lucene90DocValuesProducer.close(); + } + + private FieldInfo[] getFieldInfoList(String compositeFieldName) { + FieldInfo[] fieldInfoList = new FieldInfo[this.dimensions.size() + metrics.size()]; + + // field number is not really used. We depend on unique field names to get the desired iterator + int fieldNumber = 0; + + for (FieldInfo dimension : this.dimensions) { + fieldInfoList[fieldNumber] = new FieldInfo( + fullFieldNameForStarTreeDimensionsDocValues(compositeFieldName, dimension.getName()), + fieldNumber, + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + fieldNumber++; + } + for (MetricEntry metric : metrics) { + fieldInfoList[fieldNumber] = new FieldInfo( + fullFieldNameForStarTreeMetricsDocValues(compositeFieldName, metric.getMetricName(), metric.getMetricStat().getTypeName()), + fieldNumber, + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + fieldNumber++; + } + + return fieldInfoList; + } + +} diff --git a/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java new file mode 100644 index 0000000000000..78a89d9179c25 --- /dev/null +++ b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.index; + +import org.apache.lucene.util.Counter; + +/** + * A helper class for writing sorted numeric doc values. + *

+ * This class provides a convenient way to add sorted numeric doc values to a field + * and retrieve the corresponding {@link SortedNumericDocValues} instance. + * + * @opensearch.experimental + */ +public class SortedNumericDocValuesWriterHelper { + + private final SortedNumericDocValuesWriter sortedNumericDocValuesWriter; + + /** + * Sole constructor. Constructs a new {@link SortedNumericDocValuesWriterHelper} instance. + * + * @param fieldInfo the field information for the field being written + * @param counter a counter for tracking memory usage + */ + public SortedNumericDocValuesWriterHelper(FieldInfo fieldInfo, Counter counter) { + sortedNumericDocValuesWriter = new SortedNumericDocValuesWriter(fieldInfo, counter); + } + + /** + * Adds a value to the sorted numeric doc values for the specified document. + * + * @param docID the document ID + * @param value the value to add + */ + public void addValue(int docID, long value) { + sortedNumericDocValuesWriter.addValue(docID, value); + } + + /** + * Returns the {@link SortedNumericDocValues} instance containing the sorted numeric doc values + * + * @return the {@link SortedNumericDocValues} instance + */ + public SortedNumericDocValues getDocValues() { + return sortedNumericDocValuesWriter.getDocValues(); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java index 216ed4f68f333..d4b0c0d1f4b80 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java @@ -37,6 +37,36 @@ public class Composite99DocValuesFormat extends DocValuesFormat { private final DocValuesFormat delegate; private final MapperService mapperService; + /** Data codec name for Composite Doc Values Format */ + public static final String DATA_CODEC_NAME = "Composite99FormatData"; + + /** Meta codec name for Composite Doc Values Format */ + public static final String META_CODEC_NAME = "Composite99FormatMeta"; + + /** Filename extension for the composite index data */ + public static final String DATA_EXTENSION = "cid"; + + /** Filename extension for the composite index meta */ + public static final String META_EXTENSION = "cim"; + + /** Data doc values codec name for Composite Doc Values Format */ + public static final String DATA_DOC_VALUES_CODEC = "Composite99DocValuesData"; + + /** Meta doc values codec name for Composite Doc Values Format */ + public static final String META_DOC_VALUES_CODEC = "Composite99DocValuesMetadata"; + + /** Filename extension for the composite index data doc values */ + public static final String DATA_DOC_VALUES_EXTENSION = "cidvd"; + + /** Filename extension for the composite index meta doc values */ + public static final String META_DOC_VALUES_EXTENSION = "cidvm"; + + /** Initial version for the Composite90DocValuesFormat */ + public static final int VERSION_START = 0; + + /** Current version for the Composite90DocValuesFormat */ + public static final int VERSION_CURRENT = VERSION_START; + // needed for SPI public Composite99DocValuesFormat() { this(new Lucene90DocValuesFormat(), null); diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java index df5008a7f294e..bd4fbaca3ac05 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java @@ -8,19 +8,50 @@ package org.opensearch.index.codec.composite; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.lucene90.StarTree99DocValuesProducer; import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.CompositeIndexMetadata; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.MergeDimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; +import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTree; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; +import org.opensearch.index.compositeindex.datacube.startree.node.Tree; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; /** * Reader for star tree index and star tree doc values from the segments @@ -29,11 +60,123 @@ */ @ExperimentalApi public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader { - private DocValuesProducer delegate; + private static final Logger logger = LogManager.getLogger(CompositeIndexMetadata.class); + + private final DocValuesProducer delegate; + private IndexInput dataIn; + private ChecksumIndexInput metaIn; + private final Map starTreeMap = new LinkedHashMap<>(); + private final Map compositeIndexMetadataMap = new LinkedHashMap<>(); + private final Map compositeDocValuesProducerMap = new LinkedHashMap<>(); + private final List compositeFieldInfos = new ArrayList<>(); + private final SegmentReadState readState; - public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException { + public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState readState) throws IOException { this.delegate = producer; - // TODO : read star tree files + this.readState = readState; + + String metaFileName = IndexFileNames.segmentFileName( + readState.segmentInfo.name, + readState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + + String dataFileName = IndexFileNames.segmentFileName( + readState.segmentInfo.name, + readState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + + boolean success = false; + try { + + // initialize meta input + dataIn = readState.directory.openInput(dataFileName, readState.context); + CodecUtil.checkIndexHeader( + dataIn, + Composite99DocValuesFormat.DATA_CODEC_NAME, + Composite99DocValuesFormat.VERSION_START, + Composite99DocValuesFormat.VERSION_CURRENT, + readState.segmentInfo.getId(), + readState.segmentSuffix + ); + + // initialize data input + metaIn = readState.directory.openChecksumInput(metaFileName, readState.context); + Throwable priorE = null; + try { + CodecUtil.checkIndexHeader( + metaIn, + Composite99DocValuesFormat.META_CODEC_NAME, + Composite99DocValuesFormat.VERSION_START, + Composite99DocValuesFormat.VERSION_CURRENT, + readState.segmentInfo.getId(), + readState.segmentSuffix + ); + + while (true) { + + // validate magic marker + long magicMarker = metaIn.readLong(); + if (magicMarker == -1) { + logger.info("EOF reached for composite index metadata"); + break; + } else if (magicMarker < 0) { + throw new CorruptIndexException("Unknown token encountered: " + magicMarker, metaIn); + } else if (MAGIC_MARKER != magicMarker) { + logger.error("Invalid composite field magic marker"); + throw new IOException("Invalid composite field magic marker"); + } + + // construct composite index metadata + CompositeIndexMetadata compositeIndexMetadata = new CompositeIndexMetadata(metaIn); + String compositeFieldName = compositeIndexMetadata.getCompositeFieldName(); + compositeFieldInfos.add( + new CompositeIndexFieldInfo(compositeFieldName, compositeIndexMetadata.getCompositeFieldType()) + ); + + switch (compositeIndexMetadata.getCompositeFieldType()) { + case STAR_TREE: + StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata(); + Tree starTree = new StarTree(dataIn, starTreeMetadata); + starTreeMap.put(compositeFieldName, starTree); + compositeIndexMetadataMap.put(compositeFieldName, compositeIndexMetadata); + + List dimensionFieldNumbers = starTreeMetadata.getDimensionFieldNumbers(); + List dimensions = new ArrayList<>(); + for (Integer fieldNumber : dimensionFieldNumbers) { + dimensions.add(readState.fieldInfos.fieldInfo(fieldNumber)); + } + + // initialize star-tree doc values producer + StarTree99DocValuesProducer starTreeDocValuesProducer = new StarTree99DocValuesProducer( + readState, + Composite99DocValuesFormat.DATA_DOC_VALUES_CODEC, + Composite99DocValuesFormat.DATA_DOC_VALUES_EXTENSION, + Composite99DocValuesFormat.META_DOC_VALUES_CODEC, + Composite99DocValuesFormat.META_DOC_VALUES_EXTENSION, + dimensions, + starTreeMetadata.getMetricEntries(), + compositeFieldName + ); + compositeDocValuesProducerMap.put(compositeFieldName, starTreeDocValuesProducer); + + break; + default: + throw new CorruptIndexException("Invalid composite field type found in the file", dataIn); + } + } + } catch (Throwable t) { + priorE = t; + } finally { + CodecUtil.checkFooter(metaIn, priorE); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); + } + } } @Override @@ -64,24 +207,129 @@ public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { @Override public void checkIntegrity() throws IOException { delegate.checkIntegrity(); - // Todo : check integrity of composite index related [star tree] files + CodecUtil.checksumEntireFile(dataIn); } @Override public void close() throws IOException { delegate.close(); - // Todo: close composite index related files [star tree] files + boolean success = false; + try { + IOUtils.close(metaIn, dataIn); + for (DocValuesProducer docValuesProducer : compositeDocValuesProducerMap.values()) { + IOUtils.close(docValuesProducer); + } + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(metaIn, dataIn); + } + starTreeMap.clear(); + compositeIndexMetadataMap.clear(); + compositeDocValuesProducerMap.clear(); + metaIn = null; + dataIn = null; + } } @Override public List getCompositeIndexFields() { - // todo : read from file formats and get the field names. - return new ArrayList<>(); + return compositeFieldInfos; } @Override public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException { - // TODO : read compositeIndexValues [starTreeValues] from star tree files - throw new UnsupportedOperationException(); + + switch (compositeIndexFieldInfo.getType()) { + case STAR_TREE: + // building star tree values + CompositeIndexMetadata compositeIndexMetadata = compositeIndexMetadataMap.get(compositeIndexFieldInfo.getField()); + StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata(); + + // build skip star node dimensions + Set skipStarNodeCreationInDimsFieldNumbers = starTreeMetadata.getSkipStarNodeCreationInDims(); + Set skipStarNodeCreationInDims = new HashSet<>(); + for (Integer fieldNumber : skipStarNodeCreationInDimsFieldNumbers) { + skipStarNodeCreationInDims.add(readState.fieldInfos.fieldInfo(fieldNumber).getName()); + } + + // build dimensions + List dimensionFieldNumbers = starTreeMetadata.getDimensionFieldNumbers(); + List dimensions = new ArrayList<>(); + List mergeDimensions = new ArrayList<>(); + for (Integer fieldNumber : dimensionFieldNumbers) { + dimensions.add(readState.fieldInfos.fieldInfo(fieldNumber).getName()); + mergeDimensions.add(new MergeDimension(readState.fieldInfos.fieldInfo(fieldNumber).name)); + } + + // build metrics + Map starTreeMetricMap = new ConcurrentHashMap<>(); + for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) { + String metricName = metricEntry.getMetricName(); + + Metric metric = starTreeMetricMap.computeIfAbsent(metricName, field -> new Metric(field, new ArrayList<>())); + metric.getMetrics().add(metricEntry.getMetricStat()); + } + List starTreeMetrics = new ArrayList<>(starTreeMetricMap.values()); + + // star-tree field + StarTreeField starTreeField = new StarTreeField( + compositeIndexMetadata.getCompositeFieldName(), + mergeDimensions, + starTreeMetrics, + new StarTreeFieldConfiguration( + starTreeMetadata.getMaxLeafDocs(), + skipStarNodeCreationInDims, + starTreeMetadata.getStarTreeBuildMode() + ) + ); + + // star-tree root node + StarTreeNode rootNode = starTreeMap.get(compositeIndexFieldInfo.getField()).getRoot(); + + // get doc id set iterators for metrics and dimensions + StarTree99DocValuesProducer starTree99DocValuesProducer = (StarTree99DocValuesProducer) compositeDocValuesProducerMap.get( + compositeIndexMetadata.getCompositeFieldName() + ); + Map dimensionsDocIdSetIteratorMap = new LinkedHashMap<>(); + Map metricsDocIdSetIteratorMap = new LinkedHashMap<>(); + + // get doc id set iterators for dimensions + for (String dimension : dimensions) { + dimensionsDocIdSetIteratorMap.put( + dimension, + starTree99DocValuesProducer.getSortedNumeric( + StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues(starTreeField.getName(), dimension) + ) + ); + } + + // get doc id set iterators for metrics + for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) { + String metricFullName = StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues( + starTreeField.getName(), + metricEntry.getMetricName(), + metricEntry.getMetricStat().getTypeName() + ); + metricsDocIdSetIteratorMap.put(metricFullName, starTree99DocValuesProducer.getSortedNumeric(metricFullName)); + } + + // create star-tree attributes map + Map starTreeAttributes = new HashMap<>(); + starTreeAttributes.put("segment_docs_count", String.valueOf(starTreeMetadata.getSegmentAggregatedDocCount())); + + // return star-tree values + return new StarTreeValues( + starTreeField, + rootNode, + dimensionsDocIdSetIteratorMap, + metricsDocIdSetIteratorMap, + starTreeAttributes + ); + + default: + throw new CorruptIndexException("Unsupported composite index field type: ", compositeIndexFieldInfo.getType().getName()); + } + } } diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java index ec97053bdff05..86eaf96c454af 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java @@ -10,20 +10,26 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.lucene90.Composite99DocValuesConsumer; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.MergeState; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder; import org.opensearch.index.mapper.CompositeMappedFieldType; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.StarTreeMapper; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -45,11 +51,16 @@ public class Composite99DocValuesWriter extends DocValuesConsumer { AtomicReference mergeState = new AtomicReference<>(); private final Set compositeMappedFieldTypes; private final Set compositeFieldSet; + private DocValuesConsumer composite99DocValuesConsumer; + + public IndexOutput dataOut; + public IndexOutput metaOut; private final Map fieldProducerMap = new HashMap<>(); private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class); - public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) { + public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) + throws IOException { this.delegate = delegate; this.state = segmentWriteState; @@ -59,6 +70,51 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState for (CompositeMappedFieldType type : compositeMappedFieldTypes) { compositeFieldSet.addAll(type.fields()); } + + boolean success = false; + try { + this.composite99DocValuesConsumer = new Composite99DocValuesConsumer( + segmentWriteState, + Composite99DocValuesFormat.DATA_DOC_VALUES_CODEC, + Composite99DocValuesFormat.DATA_DOC_VALUES_EXTENSION, + Composite99DocValuesFormat.META_DOC_VALUES_CODEC, + Composite99DocValuesFormat.META_DOC_VALUES_EXTENSION + ); + + String dataFileName = IndexFileNames.segmentFileName( + segmentWriteState.segmentInfo.name, + segmentWriteState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + dataOut = segmentWriteState.directory.createOutput(dataFileName, segmentWriteState.context); + CodecUtil.writeIndexHeader( + dataOut, + Composite99DocValuesFormat.DATA_CODEC_NAME, + Composite99DocValuesFormat.VERSION_CURRENT, + segmentWriteState.segmentInfo.getId(), + segmentWriteState.segmentSuffix + ); + + String metaFileName = IndexFileNames.segmentFileName( + segmentWriteState.segmentInfo.name, + segmentWriteState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + metaOut = segmentWriteState.directory.createOutput(metaFileName, segmentWriteState.context); + CodecUtil.writeIndexHeader( + metaOut, + Composite99DocValuesFormat.META_CODEC_NAME, + Composite99DocValuesFormat.VERSION_CURRENT, + segmentWriteState.segmentInfo.getId(), + segmentWriteState.segmentSuffix + ); + + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); + } + } } @Override @@ -93,6 +149,26 @@ public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) @Override public void close() throws IOException { delegate.close(); + boolean success = false; + try { + if (metaOut != null) { + metaOut.writeLong(-1); // write EOF marker + CodecUtil.writeFooter(metaOut); // write checksum + } + if (dataOut != null) { + CodecUtil.writeFooter(dataOut); // write checksum + } + + success = true; + } finally { + if (success) { + IOUtils.close(dataOut, metaOut, composite99DocValuesConsumer); + } else { + IOUtils.closeWhileHandlingException(dataOut, metaOut, composite99DocValuesConsumer); + } + metaOut = dataOut = null; + composite99DocValuesConsumer = null; + } } private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, FieldInfo field) throws IOException { @@ -104,9 +180,9 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, // we have all the required fields to build composite fields if (compositeFieldSet.isEmpty()) { for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) { - if (mappedType.getCompositeIndexType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { + if (mappedType instanceof StarTreeMapper.StarTreeFieldType) { try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) { - starTreesBuilder.build(fieldProducerMap); + starTreesBuilder.build(metaOut, dataOut, fieldProducerMap, composite99DocValuesConsumer); } } } @@ -122,6 +198,7 @@ public void merge(MergeState mergeState) throws IOException { /** * Merges composite fields from multiple segments + * * @param mergeState merge state */ private void mergeCompositeFields(MergeState mergeState) throws IOException { @@ -130,6 +207,7 @@ private void mergeCompositeFields(MergeState mergeState) throws IOException { /** * Merges star tree data fields from multiple segments + * * @param mergeState merge state */ private void mergeStarTreeFields(MergeState mergeState) throws IOException { @@ -151,8 +229,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo); if (compositeIndexValues instanceof StarTreeValues) { - List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList()); - + List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), new ArrayList<>()); if (!starTreeFieldMap.containsKey(fieldInfo.getField())) { starTreeFieldMap.put(fieldInfo.getField(), ((StarTreeValues) compositeIndexValues).getStarTreeField()); } @@ -168,7 +245,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { } } } - final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); - starTreesBuilder.buildDuringMerge(starTreeFieldMap, starTreeSubsPerField); + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); + starTreesBuilder.buildDuringMerge(metaOut, dataOut, starTreeFieldMap, starTreeSubsPerField, composite99DocValuesConsumer); } } diff --git a/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java b/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java index baed90273d311..8378a4063b7ca 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java @@ -27,17 +27,20 @@ public class StarTreeValues implements CompositeIndexValues { private final StarTreeNode root; private final Map dimensionDocValuesIteratorMap; private final Map metricDocValuesIteratorMap; + private final Map attributes; public StarTreeValues( StarTreeField starTreeField, StarTreeNode root, Map dimensionDocValuesIteratorMap, - Map metricDocValuesIteratorMap + Map metricDocValuesIteratorMap, + Map attributes ) { this.starTreeField = starTreeField; this.root = root; this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap; this.metricDocValuesIteratorMap = metricDocValuesIteratorMap; + this.attributes = attributes; } @Override @@ -60,4 +63,8 @@ public Map getDimensionDocValuesIteratorMap() { public Map getMetricDocValuesIteratorMap() { return metricDocValuesIteratorMap; } + + public Map getAttributes() { + return attributes; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java new file mode 100644 index 0000000000000..defb4a26de260 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex; + +/** + * This class contains constants used in the Composite Index implementation. + */ +public class CompositeIndexConstants { + + /** + * The magic marker value used for sanity checks in the Composite Index implementation. + */ + public static final long MAGIC_MARKER = 0xC0950513F1E1DL; // Composite Field + + /** + * The version of the Composite Index implementation. + */ + public static final int VERSION = 1; + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java new file mode 100644 index 0000000000000..8bb72708e1d52 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +import java.io.IOException; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; + +/** + * This class represents the metadata of a Composite Index, which includes information about + * the composite field name, type, and the specific metadata for the type of composite field + * (e.g., Tree metadata). + * + * @opensearch.experimental + */ +public class CompositeIndexMetadata { + + private static final Logger logger = LogManager.getLogger(CompositeIndexMetadata.class); + private final String compositeFieldName; + private final CompositeMappedFieldType.CompositeFieldType compositeFieldType; + private final StarTreeMetadata starTreeMetadata; + + /** + * Constructs a CompositeIndexMetadata object from the provided IndexInput and magic marker. + * + * @param meta the IndexInput containing the metadata + * @throws IOException if an I/O error occurs while reading the metadata + */ + public CompositeIndexMetadata(IndexInput meta) throws IOException { + int version = meta.readInt(); + if (VERSION != version) { + logger.error("Invalid composite field version"); + throw new IOException("Invalid composite field version"); + } + + compositeFieldName = meta.readString(); + compositeFieldType = CompositeMappedFieldType.CompositeFieldType.fromName(meta.readString()); + + switch (compositeFieldType) { + // support for type of composite fields can be added in the future. + case STAR_TREE: + starTreeMetadata = new StarTreeMetadata(meta, compositeFieldName, compositeFieldType.getName()); + break; + default: + throw new CorruptIndexException("Invalid composite field type present in the file", meta); + } + + } + + /** + * Returns the star-tree metadata. + * + * @return the StarTreeMetadata + */ + public StarTreeMetadata getStarTreeMetadata() { + return starTreeMetadata; + } + + /** + * Returns the name of the composite field. + * + * @return the composite field name + */ + public String getCompositeFieldName() { + return compositeFieldName; + } + + /** + * Returns the type of the composite field. + * + * @return the composite field type + */ + public CompositeMappedFieldType.CompositeFieldType getCompositeFieldType() { + return compositeFieldType; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java new file mode 100644 index 0000000000000..1e15cae2e0029 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube; + +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; + +import java.io.IOException; +import java.util.Objects; + +/** + * Composite index merge dimension class + * + * @opensearch.experimental + */ +public class MergeDimension implements Dimension { + public static final String MERGE = "merge"; + private final String field; + + public MergeDimension(String field) { + this.field = field; + } + + public String getField() { + return field; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(CompositeDataCubeFieldType.NAME, field); + builder.field(CompositeDataCubeFieldType.TYPE, MERGE); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MergeDimension dimension = (MergeDimension) o; + return Objects.equals(field, dimension.getField()); + } + + @Override + public int hashCode() { + return Objects.hash(field); + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java index bbf35448fbb1a..d7485b809f469 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java @@ -63,4 +63,9 @@ public Long toLongValue(Long value) { public Long toStarTreeNumericTypeValue(Long value) { return value; } + + @Override + public long getIdempotentMetricValue() { + return 0L; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java index e80d1c9097f9d..13fd64e024e24 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java @@ -65,11 +65,16 @@ public Long toLongValue(Double value) { } @Override - public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) { + public Double toStarTreeNumericTypeValue(Long value) { try { - return type.getDoubleValue(value); + return VALUE_AGGREGATOR_TYPE.getDoubleValue(value); } catch (Exception e) { throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e); } } + + @Override + public long getIdempotentMetricValue() { + return Long.MIN_VALUE; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java index 3895b53fe7466..7520b73bc1e3b 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java @@ -79,7 +79,12 @@ public StarTreeNumericType getAggregatedValueType() { * @return field name with metric type and field */ public String toFieldName() { - return starFieldName + DELIMITER + field + DELIMITER + metricStat.getTypeName(); + return toFieldName(starFieldName, field, metricStat.getTypeName()); + + } + + public static String toFieldName(String starFieldName, String field, String metricName) { + return starFieldName + DELIMITER + field + DELIMITER + metricName; } @Override @@ -94,7 +99,7 @@ public boolean equals(Object obj) { } if (obj instanceof MetricAggregatorInfo) { MetricAggregatorInfo anotherPair = (MetricAggregatorInfo) obj; - return metricStat == anotherPair.metricStat && field.equals(anotherPair.field); + return metricStat.equals(anotherPair.metricStat) && field.equals(anotherPair.field); } return false; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java new file mode 100644 index 0000000000000..683153ecde689 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.aggregators; + +import org.opensearch.index.compositeindex.datacube.MetricStat; + +/** + * Holds the pair of metric name and it's associated stat + * + * @opensearch.experimental + */ +public class MetricEntry { + + private final String metricName; + private final MetricStat metricStat; + + public MetricEntry(String metricName, MetricStat metricStat) { + this.metricName = metricName; + this.metricStat = metricStat; + } + + public String getMetricName() { + return metricName; + } + + public MetricStat getMetricStat() { + return metricStat; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java index 26acf44e778a7..0ca9e5d5b63e9 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java @@ -65,11 +65,16 @@ public Long toLongValue(Double value) { } @Override - public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) { + public Double toStarTreeNumericTypeValue(Long value) { try { - return type.getDoubleValue(value); + return VALUE_AGGREGATOR_TYPE.getDoubleValue(value); } catch (Exception e) { throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e); } } + + @Override + public long getIdempotentMetricValue() { + return Long.MAX_VALUE; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java index 6b8c412bdd461..8ad0d1e6e5baf 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java @@ -94,4 +94,9 @@ public Double toStarTreeNumericTypeValue(Long value) { throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e); } } + + @Override + public long getIdempotentMetricValue() { + return 0; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java index 5a3c7451c4c6a..7e3f90e94290c 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java @@ -61,4 +61,9 @@ public interface ValueAggregator { * Converts an aggregated value from a Long type. */ A toStarTreeNumericTypeValue(Long rawValue); + + /** + * Fetches a value that does not alter the result of aggregations + */ + long getIdempotentMetricValue(); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index 15bc8a624d01e..480c647e03afa 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -9,9 +9,21 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.EmptyDocValuesProducer; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedNumericDocValuesWriterHelper; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.NumericUtils; +import org.opensearch.common.time.DateUtils; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; @@ -22,6 +34,7 @@ import org.opensearch.index.compositeindex.datacube.startree.aggregators.ValueAggregator; import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType; import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils; import org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode; import org.opensearch.index.fielddata.IndexNumericFieldData; import org.opensearch.index.mapper.Mapper; @@ -29,16 +42,21 @@ import org.opensearch.index.mapper.NumberFieldMapper; import java.io.IOException; +import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode.ALL; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils.ALL; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; /** * Builder for star tree. Defines the algorithm to construct star-tree @@ -68,18 +86,30 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { private final StarTreeField starTreeField; private final MapperService mapperService; - private final SegmentWriteState state; + private final SegmentWriteState writeState; + + private final IndexOutput metaOut; + private final IndexOutput dataOut; /** * Reads all the configuration related to dimensions and metrics, builds a star-tree based on the different construction parameters. * - * @param starTreeField holds the configuration for the star tree - * @param state stores the segment write state - * @param mapperService helps to find the original type of the field + * @param starTreeField holds the configuration for the star tree + * @param writeState stores the segment write writeState + * @param mapperService helps to find the original type of the field */ - protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) { + protected BaseStarTreeBuilder( + IndexOutput metaOut, + IndexOutput dataOut, + StarTreeField starTreeField, + SegmentWriteState writeState, + MapperService mapperService + ) { logger.debug("Building star tree : {}", starTreeField.getName()); + this.metaOut = metaOut; + this.dataOut = dataOut; + this.starTreeField = starTreeField; StarTreeFieldConfiguration starTreeFieldSpec = starTreeField.getStarTreeConfig(); @@ -87,9 +117,9 @@ protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState sta this.numDimensions = dimensionsSplitOrder.size(); this.skipStarNodeCreationForDimensions = new HashSet<>(); - this.totalSegmentDocs = state.segmentInfo.maxDoc(); + this.totalSegmentDocs = writeState.segmentInfo.maxDoc(); this.mapperService = mapperService; - this.state = state; + this.writeState = writeState; Set skipStarNodeCreationForDimensions = starTreeFieldSpec.getSkipStarNodeCreationInDims(); @@ -134,6 +164,264 @@ public List generateMetricAggregatorInfos(MapperService ma return metricAggregatorInfos; } + /** + * Generates the configuration required to perform aggregation for all the metrics on a field + * + * @return list of MetricAggregatorInfo + */ + public List getMetricReaders(SegmentWriteState state, Map fieldProducerMap) + throws IOException { + + List metricReaders = new ArrayList<>(); + for (Metric metric : this.starTreeField.getMetrics()) { + for (MetricStat metricStat : metric.getMetrics()) { + SequentialDocValuesIterator metricReader; + FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); + if (metricStat != MetricStat.COUNT) { + metricReader = new SequentialDocValuesIterator( + fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) + ); + } else { + metricReader = new SequentialDocValuesIterator(); + } + + metricReaders.add(metricReader); + } + } + return metricReaders; + } + + /** + * Builds the star tree from the original segment documents + * + * @param fieldProducerMap contain s the docValues producer to get docValues associated with each field + * @param fieldNumberAcrossStarTrees maintains a counter for the number of star-tree fields + * @param starTreeDocValuesConsumer consumes the generated star-tree docValues + * @throws IOException when we are unable to build star-tree + */ + public void build( + Map fieldProducerMap, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + long startTime = System.currentTimeMillis(); + logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); + + List metricReaders = getMetricReaders(writeState, fieldProducerMap); + List dimensionsSplitOrder = starTreeField.getDimensionsOrder(); + SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()]; + for (int i = 0; i < numDimensions; i++) { + String dimension = dimensionsSplitOrder.get(i).getField(); + FieldInfo dimensionFieldInfo = writeState.fieldInfos.fieldInfo(dimension); + dimensionReaders[i] = new SequentialDocValuesIterator( + fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo) + ); + } + Iterator starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders); + logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); + build(starTreeDocumentIterator, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); + logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); + } + + /** + * Builds the star tree using sorted and aggregated star-tree Documents + * + * @param starTreeDocumentIterator contains the sorted and aggregated documents + * @param fieldNumberAcrossStarTrees maintains a counter for the number of star-tree fields + * @param starTreeDocValuesConsumer consumes the generated star-tree docValues + * @throws IOException when we are unable to build star-tree + */ + public void build( + Iterator starTreeDocumentIterator, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + int numSegmentStarTreeDocument = totalSegmentDocs; + + while (starTreeDocumentIterator.hasNext()) { + appendToStarTree(starTreeDocumentIterator.next()); + } + int numStarTreeDocument = numStarTreeDocs; + logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument); + + if (numStarTreeDocs == 0) { + // serialize the star tree data + serializeStarTree(numSegmentStarTreeDocument); + return; + } + + constructStarTree(rootNode, 0, numStarTreeDocs); + int numStarTreeDocumentUnderStarNode = numStarTreeDocs - numStarTreeDocument; + logger.debug( + "Finished constructing star-tree, got [ {} ] tree nodes and [ {} ] starTreeDocument under star-node", + numStarTreeNodes, + numStarTreeDocumentUnderStarNode + ); + + createAggregatedDocs(rootNode); + int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; + logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); + + // Create doc values indices in disk + createSortedDocValuesIndices(starTreeDocValuesConsumer, fieldNumberAcrossStarTrees); + + // serialize star-tree + serializeStarTree(numSegmentStarTreeDocument); + } + + private long getTimeStampVal(final String fieldName, final long val) { + long roundedDate = 0; + long ratio = 0; + + switch (fieldName) { + + case "@timestamp": + ratio = ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis(); + roundedDate = DateUtils.roundFloor(val, ratio); + return roundedDate; + case "hour": + ratio = ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis(); + roundedDate = DateUtils.roundFloor(val, ratio); + return roundedDate; + case "day": + ratio = ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis(); + roundedDate = DateUtils.roundFloor(val, ratio); + return roundedDate; + case "month": + roundedDate = DateUtils.roundMonthOfYear(val); + return roundedDate; + case "year": + roundedDate = DateUtils.roundYear(val); + return roundedDate; + default: + return val; + } + } + + private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException { + // serialize the star tree data + long dataFilePointer = dataOut.getFilePointer(); + long totalStarTreeDataLength = StarTreeBuilderUtils.serializeStarTree(dataOut, rootNode, numStarTreeNodes); + + // serialize the star tree meta + StarTreeBuilderUtils.serializeStarTreeMetadata( + metaOut, + starTreeField, + writeState, + metricAggregatorInfos, + numSegmentStarTreeDocument, + dataFilePointer, + totalStarTreeDataLength + ); + } + + private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, AtomicInteger fieldNumberAcrossStarTrees) + throws IOException { + List dimensionWriters = new ArrayList<>(); + List metricWriters = new ArrayList<>(); + FieldInfo[] dimensionFieldInfoList = new FieldInfo[starTreeField.getDimensionsOrder().size()]; + FieldInfo[] metricFieldInfoList = new FieldInfo[metricAggregatorInfos.size()]; + + for (int i = 0; i < dimensionFieldInfoList.length; i++) { + final FieldInfo fi = new FieldInfo( + fullFieldNameForStarTreeDimensionsDocValues(starTreeField.getName(), starTreeField.getDimensionsOrder().get(i).getField()), + fieldNumberAcrossStarTrees.getAndIncrement(), + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + dimensionFieldInfoList[i] = fi; + dimensionWriters.add(new SortedNumericDocValuesWriterHelper(fi, Counter.newCounter())); + } + for (int i = 0; i < metricAggregatorInfos.size(); i++) { + FieldInfo fi = new FieldInfo( + fullFieldNameForStarTreeMetricsDocValues( + starTreeField.getName(), + metricAggregatorInfos.get(i).getField(), + metricAggregatorInfos.get(i).getMetricStat().getTypeName() + ), + fieldNumberAcrossStarTrees.getAndIncrement(), + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + metricFieldInfoList[i] = fi; + metricWriters.add(new SortedNumericDocValuesWriterHelper(fi, Counter.newCounter())); + } + + for (int docId = 0; docId < numStarTreeDocs; docId++) { + StarTreeDocument starTreeDocument = getStarTreeDocumentForCreatingDocValues(docId); + for (int i = 0; i < starTreeDocument.dimensions.length; i++) { + Long val = starTreeDocument.dimensions[i]; + if (val != null) { + dimensionWriters.get(i).addValue(docId, val); + } + } + + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + try { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + metricWriters.get(i).addValue(docId, (Long) starTreeDocument.metrics[i]); + break; + case DOUBLE: + metricWriters.get(i).addValue(docId, NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i])); + break; + default: + throw new IllegalStateException("Unknown metric doc value type"); + } + } catch (IllegalArgumentException e) { + logger.info("could not parse the value, exiting creation of star tree"); + } + } + } + + addStarTreeDocValueFields(docValuesConsumer, dimensionWriters, dimensionFieldInfoList, starTreeField.getDimensionsOrder().size()); + addStarTreeDocValueFields(docValuesConsumer, metricWriters, metricFieldInfoList, metricAggregatorInfos.size()); + } + + private void addStarTreeDocValueFields( + DocValuesConsumer docValuesConsumer, + List docValuesWriters, + FieldInfo[] fieldInfoList, + int fieldCount + ) throws IOException { + for (int i = 0; i < fieldCount; i++) { + final int increment = i; + DocValuesProducer docValuesProducer = new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) { + return docValuesWriters.get(increment).getDocValues(); + } + }; + docValuesConsumer.addSortedNumericField(fieldInfoList[i], docValuesProducer); + } + } + /** * Adds a document to the star-tree. * @@ -151,6 +439,15 @@ public List generateMetricAggregatorInfos(MapperService ma */ public abstract StarTreeDocument getStarTreeDocument(int docId) throws IOException; + /** + * Returns the star-tree document for the given doc id while creating doc values + * + * @param docId document id + * @return star tree document + * @throws IOException if an I/O error occurs while fetching the star-tree document + */ + public abstract StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException; + /** * Retrieves the list of star-tree documents in the star-tree. * @@ -172,7 +469,7 @@ public List generateMetricAggregatorInfos(MapperService ma * aggregated star-tree documents. * * @param dimensionReaders List of docValues readers to read dimensions from the segment - * @param metricReaders List of docValues readers to read metrics from the segment + * @param metricReaders List of docValues readers to read metrics from the segment * @return Iterator for the aggregated star-tree document */ public abstract Iterator sortAndAggregateSegmentDocuments( @@ -193,7 +490,6 @@ public abstract Iterator generateStarTreeDocumentsForStarNode( /** * Returns the star-tree document from the segment based on the current doc id - * */ protected StarTreeDocument getSegmentStarTreeDocument( int currentDocId, @@ -285,7 +581,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments( metrics[i] = metricValueAggregator.getInitialAggregatedValue(segmentDocument.metrics[i]); } else { metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue( - getLong(segmentDocument.metrics[i]), + getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()), starTreeNumericType ); } @@ -309,7 +605,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments( } else { aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValueAndSegmentValue( aggregatedSegmentDocument.metrics[i], - getLong(segmentDocument.metrics[i]), + getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()), starTreeNumericType ); } @@ -325,25 +621,26 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments( /** * Safely converts the metric value of object type to long. * - * @param metric value of the metric + * @param metric value of the metric + * @param idempotentMetricValue * @return converted metric value to long */ - private static long getLong(Object metric) { + private static long getLong(Object metric, long idempotentMetricValue) { - Long metricValue = null; + long metricValue; try { if (metric instanceof Long) { metricValue = (long) metric; } else if (metric != null) { - metricValue = Long.valueOf(String.valueOf(metric)); + metricValue = Long.parseLong(String.valueOf(metric)); + } else { + logger.debug("metric value is null, returning idempotent metric value for the aggregator"); + return idempotentMetricValue; } } catch (Exception e) { throw new IllegalStateException("unable to cast segment metric", e); } - if (metricValue == null) { - throw new IllegalStateException("unable to cast segment metric"); - } return metricValue; } @@ -387,101 +684,6 @@ public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocum } } - /** - * Builds the star tree from the original segment documents - * - * @param fieldProducerMap contain s the docValues producer to get docValues associated with each field - * - * @throws IOException when we are unable to build star-tree - */ - public void build(Map fieldProducerMap) throws IOException { - long startTime = System.currentTimeMillis(); - logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); - if (totalSegmentDocs == 0) { - logger.debug("No documents found in the segment"); - return; - } - List metricReaders = getMetricReaders(state, fieldProducerMap); - List dimensionsSplitOrder = starTreeField.getDimensionsOrder(); - SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()]; - for (int i = 0; i < numDimensions; i++) { - String dimension = dimensionsSplitOrder.get(i).getField(); - FieldInfo dimensionFieldInfo = state.fieldInfos.fieldInfo(dimension); - dimensionReaders[i] = new SequentialDocValuesIterator( - fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo) - ); - } - Iterator starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders); - logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); - build(starTreeDocumentIterator); - logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); - } - - /** - * Generates the configuration required to perform aggregation for all the metrics on a field - * - * @return list of MetricAggregatorInfo - */ - public List getMetricReaders(SegmentWriteState state, Map fieldProducerMap) - throws IOException { - List metricReaders = new ArrayList<>(); - for (Metric metric : this.starTreeField.getMetrics()) { - for (MetricStat metricType : metric.getMetrics()) { - FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); - // TODO - // if (metricType != MetricStat.COUNT) { - // Need not initialize the metric reader for COUNT metric type - SequentialDocValuesIterator metricReader = new SequentialDocValuesIterator( - fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) - ); - // } - - metricReaders.add(metricReader); - } - } - return metricReaders; - } - - /** - * Builds the star tree using Star-Tree Document - * - * @param starTreeDocumentIterator contains the sorted and aggregated documents - * @throws IOException when we are unable to build star-tree - */ - void build(Iterator starTreeDocumentIterator) throws IOException { - int numSegmentStarTreeDocument = totalSegmentDocs; - - while (starTreeDocumentIterator.hasNext()) { - appendToStarTree(starTreeDocumentIterator.next()); - } - int numStarTreeDocument = numStarTreeDocs; - logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument); - - if (numStarTreeDocs == 0) { - // TODO: Uncomment when segment codec and file formats is ready - // StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes); - return; - } - - constructStarTree(rootNode, 0, numStarTreeDocs); - int numStarTreeDocumentUnderStarNode = numStarTreeDocs - numStarTreeDocument; - logger.debug( - "Finished constructing star-tree, got [ {} ] tree nodes and [ {} ] starTreeDocument under star-node", - numStarTreeNodes, - numStarTreeDocumentUnderStarNode - ); - - createAggregatedDocs(rootNode); - int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; - logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); - - // TODO: When StarTree Codec is ready - // Create doc values indices in disk - // Serialize and save in disk - // Write star tree metadata for off heap implementation - - } - /** * Adds a document to star-tree * diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index f5297de1a5e2b..35a03227114c3 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -7,8 +7,10 @@ */ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; @@ -24,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; /** * On heap single tree builder @@ -42,8 +45,14 @@ public class OnHeapStarTreeBuilder extends BaseStarTreeBuilder { * @param segmentWriteState segment write state * @param mapperService helps with the numeric type of field */ - public OnHeapStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState segmentWriteState, MapperService mapperService) { - super(starTreeField, segmentWriteState, mapperService); + public OnHeapStarTreeBuilder( + IndexOutput metaOut, + IndexOutput dataOut, + StarTreeField starTreeField, + SegmentWriteState segmentWriteState, + MapperService mapperService + ) throws IOException { + super(metaOut, dataOut, starTreeField, segmentWriteState, mapperService); } @Override @@ -83,12 +92,16 @@ public Iterator sortAndAggregateSegmentDocuments( for (int currentDocId = 0; currentDocId < totalSegmentDocs; currentDocId++) { starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId, dimensionReaders, metricReaders); } - return sortAndAggregateStarTreeDocuments(starTreeDocuments); + return sortAndAggregateStarTreeDocuments(starTreeDocuments, false); } @Override - public void build(List starTreeValuesSubs) throws IOException { - build(mergeStarTrees(starTreeValuesSubs)); + public void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + build(mergeStarTrees(starTreeValuesSubs), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } /** @@ -166,8 +179,9 @@ StarTreeDocument[] mergeStarTreeValues(List starTreeValuesSubs) return starTreeDocuments.toArray(starTreeDocumentsArr); } - Iterator sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) { - return sortAndAggregateStarTreeDocuments(starTreeDocuments, false); + @Override + public StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException { + return starTreeDocuments.get(docId); } /** diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java index 94c9c9f2efb18..51bdd4d64b9e3 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; @@ -16,6 +17,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * A star-tree builder that builds a single star-tree. @@ -28,16 +30,28 @@ public interface StarTreeBuilder extends Closeable { * Builds the star tree from the original segment documents * * @param fieldProducerMap contains the docValues producer to get docValues associated with each field + * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree + * @param starTreeDocValuesConsumer consumer of star-tree doc values * @throws IOException when we are unable to build star-tree */ - void build(Map fieldProducerMap) throws IOException; + void build( + Map fieldProducerMap, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException; /** - * Builds the star tree using StarTree values from multiple segments + * Builds the star tree using Tree values from multiple segments * * @param starTreeValuesSubs contains the star tree values from multiple segments + * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree + * @param starTreeDocValuesConsumer consumer of star-tree doc values * @throws IOException when we are unable to build star-tree */ - void build(List starTreeValuesSubs) throws IOException; + void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index 3cf8c665028a5..a92384c250dfd 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -10,8 +10,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; @@ -25,6 +27,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * Builder to construct star-trees based on multiple star-tree fields. @@ -39,6 +42,7 @@ public class StarTreesBuilder implements Closeable { private final List starTreeFields; private final SegmentWriteState state; private final MapperService mapperService; + private AtomicInteger fieldNumberAcrossStarTrees; public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mapperService) { List starTreeFields = new ArrayList<>(); @@ -58,12 +62,24 @@ public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mappe this.starTreeFields = starTreeFields; this.state = segmentWriteState; this.mapperService = mapperService; + this.fieldNumberAcrossStarTrees = new AtomicInteger(); } /** - * Builds the star-trees. + * Builds all star-trees for given star-tree fields. + * + * @param metaOut an IndexInput for star-tree metadata + * @param dataOut an IndexInput for star-tree data + * @param fieldProducerMap fetches iterators for the fields (dimensions and metrics) + * @param starTreeDocValuesConsumer a consumer to write star-tree doc values + * @throws IOException */ - public void build(Map fieldProducerMap) throws IOException { + public void build( + IndexOutput metaOut, + IndexOutput dataOut, + Map fieldProducerMap, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { if (starTreeFields.isEmpty()) { logger.debug("no star-tree fields found, returning from star-tree builder"); return; @@ -76,8 +92,8 @@ public void build(Map fieldProducerMap) throws IOExce // Build all star-trees for (int i = 0; i < numStarTrees; i++) { StarTreeField starTreeField = starTreeFields.get(i); - try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, state, mapperService)) { - starTreeBuilder.build(fieldProducerMap); + try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService)) { + starTreeBuilder.build(fieldProducerMap, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } } logger.debug("Took {} ms to build {} star-trees with star-tree fields", System.currentTimeMillis() - startTime, numStarTrees); @@ -91,18 +107,25 @@ public void close() throws IOException { /** * Merges star tree fields from multiple segments * + * @param metaOut an IndexInput for star-tree metadata + * @param dataOut an IndexInput for star-tree data * @param starTreeFieldMap StarTreeField configuration per field * @param starTreeValuesSubsPerField starTreeValuesSubs per field + * @param starTreeDocValuesConsumer a consumer to write star-tree doc values */ public void buildDuringMerge( + IndexOutput metaOut, + IndexOutput dataOut, final Map starTreeFieldMap, - final Map> starTreeValuesSubsPerField + final Map> starTreeValuesSubsPerField, + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException { for (Map.Entry> entry : starTreeValuesSubsPerField.entrySet()) { List starTreeValuesList = entry.getValue(); StarTreeField starTreeField = starTreeFieldMap.get(entry.getKey()); - StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService); - builder.build(starTreeValuesList); + StarTreeBuilder builder = getSingleTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService); + builder.build(starTreeValuesList, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); + builder.close(); builder.close(); } } @@ -110,11 +133,16 @@ public void buildDuringMerge( /** * Get star-tree builder based on build mode. */ - StarTreeBuilder getSingleTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) - throws IOException { + StarTreeBuilder getSingleTreeBuilder( + IndexOutput metaOut, + IndexOutput dataOut, + StarTreeField starTreeField, + SegmentWriteState state, + MapperService mapperService + ) throws IOException { switch (starTreeField.getStarTreeConfig().getBuildMode()) { case ON_HEAP: - return new OnHeapStarTreeBuilder(starTreeField, state, mapperService); + return new OnHeapStarTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService); case OFF_HEAP: // TODO // return new OffHeapStarTreeBuilder(starTreeField, state, mapperService); diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java new file mode 100644 index 0000000000000..e328cb061e3f1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.meta; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Holds the associated metadata for the building of star-tree + * + * @opensearch.experimental + */ +public class StarTreeMetadata implements TreeMetadata { + private static final Logger logger = LogManager.getLogger(TreeMetadata.class); + private final IndexInput meta; + private final String starTreeFieldName; + private final String starTreeFieldType; + private final List dimensionFieldNumbers; + private final List metricEntries; + private final Integer segmentAggregatedDocCount; + private final Integer maxLeafDocs; + private final Set skipStarNodeCreationInDims; + private final StarTreeFieldConfiguration.StarTreeBuildMode starTreeBuildMode; + private final long dataStartFilePointer; + private final long dataLength; + + public StarTreeMetadata(IndexInput meta, String compositeFieldName, String compositeFieldType) throws IOException { + this.meta = meta; + try { + this.starTreeFieldName = compositeFieldName; + this.starTreeFieldType = compositeFieldType; + this.dimensionFieldNumbers = readStarTreeDimensions(); + this.metricEntries = readMetricEntries(); + this.segmentAggregatedDocCount = readSegmentAggregatedDocCount(); + this.maxLeafDocs = readMaxLeafDocs(); + this.skipStarNodeCreationInDims = readSkipStarNodeCreationInDims(); + this.starTreeBuildMode = readBuildMode(); + this.dataStartFilePointer = readDataStartFilePointer(); + this.dataLength = readDataLength(); + } catch (Exception e) { + logger.error("Unable to read star-tree metadata from the file"); + throw new CorruptIndexException("Unable to read star-tree metadata from the file", meta); + } + } + + @Override + public int readDimensionsCount() throws IOException { + return meta.readInt(); + } + + @Override + public List readStarTreeDimensions() throws IOException { + int dimensionCount = readDimensionsCount(); + List dimensionFieldNumbers = new ArrayList<>(); + + for (int i = 0; i < dimensionCount; i++) { + dimensionFieldNumbers.add(meta.readInt()); + } + + return dimensionFieldNumbers; + } + + @Override + public int readMetricsCount() throws IOException { + return meta.readInt(); + } + + @Override + public List readMetricEntries() throws IOException { + int metricCount = readMetricsCount(); + List metricEntries = new ArrayList<>(); + + for (int i = 0; i < metricCount; i++) { + String metricName = meta.readString(); + String metricStat = meta.readString(); + metricEntries.add(new MetricEntry(metricName, MetricStat.fromTypeName(metricStat))); + } + + return metricEntries; + } + + @Override + public int readSegmentAggregatedDocCount() throws IOException { + return meta.readInt(); + } + + @Override + public int readMaxLeafDocs() throws IOException { + return meta.readInt(); + } + + @Override + public int readSkipStarNodeCreationInDimsCount() throws IOException { + return meta.readInt(); + } + + @Override + public Set readSkipStarNodeCreationInDims() throws IOException { + + int skipStarNodeCreationInDimsCount = readSkipStarNodeCreationInDimsCount(); + Set skipStarNodeCreationInDims = new HashSet<>(); + for (int i = 0; i < skipStarNodeCreationInDimsCount; i++) { + skipStarNodeCreationInDims.add(meta.readInt()); + } + return skipStarNodeCreationInDims; + } + + @Override + public StarTreeFieldConfiguration.StarTreeBuildMode readBuildMode() throws IOException { + return StarTreeFieldConfiguration.StarTreeBuildMode.fromTypeName(meta.readString()); + } + + @Override + public long readDataStartFilePointer() throws IOException { + return meta.readLong(); + } + + @Override + public long readDataLength() throws IOException { + return meta.readLong(); + } + + /** + * Returns the name of the star-tree field. + * + * @return star-tree field name + */ + public String getStarTreeFieldName() { + return starTreeFieldName; + } + + /** + * Returns the type of the star tree field. + * + * @return star-tree field type + */ + public String getStarTreeFieldType() { + return starTreeFieldType; + } + + /** + * Returns the list of dimension field numbers. + * + * @return star-tree dimension field numbers + */ + public List getDimensionFieldNumbers() { + return dimensionFieldNumbers; + } + + /** + * Returns the list of metric entries. + * + * @return star-tree metric entries + */ + public List getMetricEntries() { + return metricEntries; + } + + /** + * Returns the aggregated document count for the star-tree. + * + * @return the aggregated document count for the star-tree. + */ + public Integer getSegmentAggregatedDocCount() { + return segmentAggregatedDocCount; + } + + /** + * Returns the max leaf docs for the star-tree. + * + * @return the max leaf docs. + */ + public Integer getMaxLeafDocs() { + return maxLeafDocs; + } + + /** + * Returns the set of dimensions for which star node will not be created in the star-tree. + * + * @return the set of dimensions. + */ + public Set getSkipStarNodeCreationInDims() { + return skipStarNodeCreationInDims; + } + + /** + * Returns the build mode for the star-tree. + * + * @return the star-tree build mode. + */ + public StarTreeFieldConfiguration.StarTreeBuildMode getStarTreeBuildMode() { + return starTreeBuildMode; + } + + /** + * Returns the file pointer to the start of the star-tree data. + * + * @return start file pointer for star-tree data + */ + public long getDataStartFilePointer() { + return dataStartFilePointer; + } + + /** + * Returns the length of star-tree data + * + * @return star-tree length + */ + public long getDataLength() { + return dataLength; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java new file mode 100644 index 0000000000000..9859afad95a74 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.meta; + +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +/** + * An interface for metadata of the star-tree + * + * @opensearch.experimental + */ +public interface TreeMetadata { + + /** + * Reads the count of dimensions in the star-tree. + * + * @return the count of dimensions + * @throws IOException if an I/O error occurs while reading the dimensions count + */ + int readDimensionsCount() throws IOException; + + /** + * Reads the list of dimension ordinals in the star-tree. + * + * @return the list of dimension ordinals + * @throws IOException if an I/O error occurs while reading the dimension ordinals + */ + List readStarTreeDimensions() throws IOException; + + /** + * Reads the count of metrics in the star-tree. + * + * @return the count of metrics + * @throws IOException if an I/O error occurs while reading the metrics count + */ + int readMetricsCount() throws IOException; + + /** + * Reads the list of metric entries in the star-tree. + * + * @return the list of metric entries + * @throws IOException if an I/O error occurs while reading the metric entries + */ + List readMetricEntries() throws IOException; + + /** + * Reads the aggregated document count for the segment in the star-tree. + * + * @return the aggregated document count for the segment + * @throws IOException if an I/O error occurs while reading the aggregated document count + */ + int readSegmentAggregatedDocCount() throws IOException; + + /** + * Reads the max leaf docs for the star-tree. + * + * @return the max leaf docs for the star-tree + * @throws IOException if an I/O error occurs while reading the max leaf docs + */ + int readMaxLeafDocs() throws IOException; + + /** + * Reads the count of dimensions where star node will not be created in the star-tree. + * + * @return the count of dimensions + * @throws IOException if an I/O error occurs while reading the skip star node dimensions count + */ + int readSkipStarNodeCreationInDimsCount() throws IOException; + + /** + * Reads the list of dimensions field numbers to be skipped for star node creation in the star-tree. + * + * @return the set of dimensions field numbers to be skipped for star node creation. + * @throws IOException if an I/O error occurs while reading the dimensions + */ + Set readSkipStarNodeCreationInDims() throws IOException; + + /** + * Reads the build mode for the star-tree. + * + * @return the star-tree build mode + * @throws IOException if an I/O error occurs while reading the build mode + */ + StarTreeFieldConfiguration.StarTreeBuildMode readBuildMode() throws IOException; + + /** + * Reads the file pointer to the start of the star-tree data. + * + * @return the file pointer to the start of the star-tree data + * @throws IOException if an I/O error occurs while reading the star-tree data start file pointer + */ + long readDataStartFilePointer() throws IOException; + + /** + * Reads the length of the data of the star-tree. + * + * @return the length of the data + * @throws IOException if an I/O error occurs while reading the data length + */ + long readDataLength() throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java new file mode 100644 index 0000000000000..568cacea4b59a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Meta package for star tree + * + * @opensearch.experimental + */ +package org.opensearch.index.compositeindex.datacube.startree.meta; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/FixedLengthStarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/FixedLengthStarTreeNode.java new file mode 100644 index 0000000000000..b4adae7a9380a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/FixedLengthStarTreeNode.java @@ -0,0 +1,186 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.node; + +import org.apache.lucene.store.RandomAccessInput; + +import java.io.IOException; +import java.util.Iterator; + +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils.ALL; + +/** + * Fixed Length implementation of {@link StarTreeNode} + * + * @opensearch.experimental + */ +public class FixedLengthStarTreeNode implements StarTreeNode { + public static final int NUM_INT_SERIALIZABLE_FIELDS = 6; + public static final int NUM_LONG_SERIALIZABLE_FIELDS = 1; + public static final int NUM_BYTE_SERIALIZABLE_FIELDS = 1; + public static final long SERIALIZABLE_DATA_SIZE_IN_BYTES = (Integer.BYTES * NUM_INT_SERIALIZABLE_FIELDS) + (Long.BYTES + * NUM_LONG_SERIALIZABLE_FIELDS) + (NUM_BYTE_SERIALIZABLE_FIELDS * Byte.BYTES); + private static final int DIMENSION_ID_OFFSET = 0; + private static final int DIMENSION_VALUE_OFFSET = DIMENSION_ID_OFFSET + Integer.BYTES; + private static final int START_DOC_ID_OFFSET = DIMENSION_VALUE_OFFSET + Long.BYTES; + private static final int END_DOC_ID_OFFSET = START_DOC_ID_OFFSET + Integer.BYTES; + private static final int AGGREGATE_DOC_ID_OFFSET = END_DOC_ID_OFFSET + Integer.BYTES; + private static final int IS_STAR_NODE_OFFSET = AGGREGATE_DOC_ID_OFFSET + Byte.BYTES; + private static final int FIRST_CHILD_ID_OFFSET = IS_STAR_NODE_OFFSET + Integer.BYTES; + private static final int LAST_CHILD_ID_OFFSET = FIRST_CHILD_ID_OFFSET + Integer.BYTES; + + public static final int INVALID_ID = -1; + + private final int nodeId; + private final int firstChildId; + + RandomAccessInput in; + + public FixedLengthStarTreeNode(RandomAccessInput in, int nodeId) throws IOException { + this.in = in; + this.nodeId = nodeId; + firstChildId = getInt(FIRST_CHILD_ID_OFFSET); + } + + private int getInt(int fieldOffset) throws IOException { + return in.readInt(nodeId * SERIALIZABLE_DATA_SIZE_IN_BYTES + fieldOffset); + } + + private long getLong(int fieldOffset) throws IOException { + return in.readLong(nodeId * SERIALIZABLE_DATA_SIZE_IN_BYTES + fieldOffset); + } + + private byte getByte(int fieldOffset) throws IOException { + return in.readByte(nodeId * SERIALIZABLE_DATA_SIZE_IN_BYTES + fieldOffset); + } + + @Override + public int getDimensionId() throws IOException { + return getInt(DIMENSION_ID_OFFSET); + } + + @Override + public long getDimensionValue() throws IOException { + return getLong(DIMENSION_VALUE_OFFSET); + } + + @Override + public int getChildDimensionId() throws IOException { + if (firstChildId == INVALID_ID) { + return INVALID_ID; + } else { + return in.readInt(firstChildId * SERIALIZABLE_DATA_SIZE_IN_BYTES); + } + } + + @Override + public int getStartDocId() throws IOException { + return getInt(START_DOC_ID_OFFSET); + } + + @Override + public int getEndDocId() throws IOException { + return getInt(END_DOC_ID_OFFSET); + } + + @Override + public int getAggregatedDocId() throws IOException { + return getInt(AGGREGATE_DOC_ID_OFFSET); + } + + @Override + public int getNumChildren() throws IOException { + if (firstChildId == INVALID_ID) { + return 0; + } else { + return getInt(LAST_CHILD_ID_OFFSET) - firstChildId + 1; + } + } + + @Override + public boolean isLeaf() { + return firstChildId == INVALID_ID; + } + + @Override + public boolean isStarNode() throws IOException { + return getByte(IS_STAR_NODE_OFFSET) != 0; + } + + @Override + public StarTreeNode getChildForDimensionValue(long dimensionValue) throws IOException { + // there will be no children for leaf nodes + if (isLeaf()) { + return null; + } + + // Specialize star node for performance + if (dimensionValue == ALL) { + return handleStarNode(); + } + + return binarySearchChild(dimensionValue); + } + + private FixedLengthStarTreeNode handleStarNode() throws IOException { + FixedLengthStarTreeNode firstNode = new FixedLengthStarTreeNode(in, firstChildId); + if (firstNode.getDimensionValue() == ALL) { + return firstNode; + } else { + return null; + } + } + + private FixedLengthStarTreeNode binarySearchChild(long dimensionValue) throws IOException { + // Binary search to find child node + int low = firstChildId; + int high = getInt(LAST_CHILD_ID_OFFSET); + + while (low <= high) { + int mid = low + (high - low) / 2; + FixedLengthStarTreeNode midNode = new FixedLengthStarTreeNode(in, mid); + long midNodeDimensionValue = midNode.getDimensionValue(); + + if (midNodeDimensionValue == dimensionValue) { + return midNode; + } else if (midNodeDimensionValue < dimensionValue) { + low = mid + 1; + } else { + high = mid - 1; + } + } + return null; + } + + @Override + public Iterator getChildrenIterator() throws IOException { + return new Iterator<>() { + private int currentChildId = firstChildId; + private final int lastChildId = getInt(LAST_CHILD_ID_OFFSET); + + @Override + public boolean hasNext() { + return currentChildId <= lastChildId; + } + + @Override + public FixedLengthStarTreeNode next() { + try { + return new FixedLengthStarTreeNode(in, currentChildId++); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java new file mode 100644 index 0000000000000..134c75b7d19b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.node; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; + +import java.io.IOException; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; + +/** + * Off heap implementation of the star-tree. + * + * @opensearch.experimental + */ +public class StarTree implements Tree { + private static final Logger logger = LogManager.getLogger(StarTree.class); + private final FixedLengthStarTreeNode root; + private final Integer numNodes; + + public StarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException { + long magicMarker = data.readLong(); + if (MAGIC_MARKER != magicMarker) { + logger.error("Invalid magic marker"); + throw new IOException("Invalid magic marker"); + } + int version = data.readInt(); + if (VERSION != version) { + logger.error("Invalid star tree version"); + throw new IOException("Invalid version"); + } + numNodes = data.readInt(); // num nodes + + RandomAccessInput in = data.randomAccessSlice(data.getFilePointer(), starTreeMetadata.getDataLength()); + root = new FixedLengthStarTreeNode(in, 0); + } + + @Override + public StarTreeNode getRoot() { + return root; + } + + /** + * Returns the number of nodes in star-tree + * + * @return number of nodes in te star-tree + */ + public Integer getNumNodes() { + return numNodes; + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java index 59522ffa4be89..cb58d53534396 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java @@ -20,7 +20,6 @@ */ @ExperimentalApi public interface StarTreeNode { - long ALL = -1l; /** * Returns the dimension ID of the current star-tree node. diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/Tree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/Tree.java new file mode 100644 index 0000000000000..811150da64bec --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/Tree.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.node; + +/** + * Interface for star-tree. + * + * @opensearch.experimental + */ +public interface Tree { + + /** + * Fetches the root node of the star-tree. + * @return the root of the star-tree + */ + StarTreeNode getRoot(); + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java index 516d5b5a012ab..19d12bc6318d7 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java @@ -8,5 +8,7 @@ /** * Holds classes associated with star tree node + * + * @opensearch.experimental */ package org.opensearch.index.compositeindex.datacube.startree.node; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java index 5e030233c5e8b..b3360571c9dfc 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java @@ -47,6 +47,49 @@ public SequentialDocValuesIterator(DocIdSetIterator docIdSetIterator) { this.docIdSetIterator = docIdSetIterator; } + /** + * Constructs a new SequentialDocValuesIterator instance with the given SortedNumericDocValues. + * + */ + public SequentialDocValuesIterator() { + this.docIdSetIterator = new SortedNumericDocValues() { + @Override + public long nextValue() throws IOException { + return 0; + } + + @Override + public int docValueCount() { + return 0; + } + + @Override + public boolean advanceExact(int i) throws IOException { + return false; + } + + @Override + public int docID() { + return 0; + } + + @Override + public int nextDoc() throws IOException { + return 0; + } + + @Override + public int advance(int i) throws IOException { + return 0; + } + + @Override + public long cost() { + return 0; + } + }; + } + /** * Returns the value associated with the latest document. * diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java new file mode 100644 index 0000000000000..ac497f9cd3dc9 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +import java.io.IOException; +import java.util.List; + +/** + * Util class for building star tree + * + * @opensearch.experimental + */ +public class StarTreeBuilderUtils { + + private StarTreeBuilderUtils() {} + + public static final int ALL = -1; + + /** + * Serialize star tree to index output stream + * + * @param dataOut data index output + * @param rootNode root star-tree node + * @param numNodes number of nodes in the tree + * @return total size of the three + * @throws IOException when star-tree data serialization fails + */ + public static long serializeStarTree(IndexOutput dataOut, TreeNode rootNode, int numNodes) throws IOException { + return StarTreeDataWriter.serializeStarTree(dataOut, rootNode, numNodes); + } + + /** + * Serialize star tree metadata to index output stream + * + * @param metaOut meta index output + * @param starTreeField star tree field + * @param writeState segment write state + * @param metricAggregatorInfos metric aggregator infos + * @param segmentAggregatedCount segment aggregated count + * @param dataFilePointer data file pointer + * @param dataFileLength data file length + * @throws IOException when star-tree data serialization fails + */ + public static void serializeStarTreeMetadata( + IndexOutput metaOut, + StarTreeField starTreeField, + SegmentWriteState writeState, + List metricAggregatorInfos, + Integer segmentAggregatedCount, + long dataFilePointer, + long dataFileLength + ) throws IOException { + StarTreeMetaWriter.serializeStarTreeMetadata( + metaOut, + CompositeMappedFieldType.CompositeFieldType.STAR_TREE, + starTreeField, + writeState, + metricAggregatorInfos, + segmentAggregatedCount, + dataFilePointer, + dataFileLength + ); + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataWriter.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataWriter.java new file mode 100644 index 0000000000000..5dc8d34226772 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataWriter.java @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; +import static org.opensearch.index.compositeindex.datacube.startree.node.FixedLengthStarTreeNode.SERIALIZABLE_DATA_SIZE_IN_BYTES; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils.ALL; + +/** + * Utility class for serializing a star-tree data structure. + * + * @opensearch.experimental + */ +public class StarTreeDataWriter { + + private static final Logger logger = LogManager.getLogger(StarTreeDataWriter.class); + + /** + * Serializes the star-tree data structure. + * + * @param indexOutput the IndexOutput to write the star-tree data + * @param rootNode the root node of the star-tree + * @param numNodes the total number of nodes in the star-tree + * @return the total size in bytes of the serialized star-tree data + * @throws IOException if an I/O error occurs while writing the star-tree data + */ + public static long serializeStarTree(IndexOutput indexOutput, TreeNode rootNode, int numNodes) throws IOException { + int headerSizeInBytes = computeStarTreeDataHeaderByteSize(); + long totalSizeInBytes = headerSizeInBytes + (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES; + + logger.debug("Star tree size in bytes : {}", totalSizeInBytes); + + writeStarTreeHeader(indexOutput, numNodes); + writeStarTreeNodes(indexOutput, rootNode); + return totalSizeInBytes; + } + + /** + * Computes the byte size of the star-tree data header. + * + * @return the byte size of the star-tree data header + */ + private static int computeStarTreeDataHeaderByteSize() { + // Magic marker (8), version (4) + int headerSizeInBytes = 12; + + // For number of nodes. + headerSizeInBytes += Integer.BYTES; + return headerSizeInBytes; + } + + /** + * Writes the star-tree data header. + * + * @param output the IndexOutput to write the header + * @param numNodes the total number of nodes in the star-tree + * @throws IOException if an I/O error occurs while writing the header + */ + private static void writeStarTreeHeader(IndexOutput output, int numNodes) throws IOException { + output.writeLong(MAGIC_MARKER); + output.writeInt(VERSION); + output.writeInt(numNodes); + } + + /** + * Writes the star-tree nodes in a breadth-first order. + * + * @param output the IndexOutput to write the nodes + * @param rootNode the root node of the star-tree + * @throws IOException if an I/O error occurs while writing the nodes + */ + private static void writeStarTreeNodes(IndexOutput output, TreeNode rootNode) throws IOException { + Queue queue = new LinkedList<>(); + queue.add(rootNode); + + int currentNodeId = 0; + while (!queue.isEmpty()) { + TreeNode node = queue.remove(); + + if (node.children == null) { + writeStarTreeNode(output, node, ALL, ALL); + } else { + + // Sort all children nodes based on dimension value + List sortedChildren = new ArrayList<>(node.children.values()); + sortedChildren.sort(Comparator.comparingLong(o -> o.dimensionValue)); + + int firstChildId = currentNodeId + queue.size() + 1; + int lastChildId = firstChildId + sortedChildren.size() - 1; + writeStarTreeNode(output, node, firstChildId, lastChildId); + + queue.addAll(sortedChildren); + } + + currentNodeId++; + } + } + + /** + * Writes a single star-tree node + * + * @param output the IndexOutput to write the node + * @param node the star tree node to write + * @param firstChildId the ID of the first child node + * @param lastChildId the ID of the last child node + * @throws IOException if an I/O error occurs while writing the node + */ + private static void writeStarTreeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId) throws IOException { + output.writeInt(node.dimensionId); + output.writeLong(node.dimensionValue); + output.writeInt(node.startDocId); + output.writeInt(node.endDocId); + output.writeInt(node.aggregatedDocId); + output.writeByte(node.isStarNode == false ? (byte) 0 : (byte) 1); + output.writeInt(firstChildId); + output.writeInt(lastChildId); + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java new file mode 100644 index 0000000000000..9784fc6927a7e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; + +/** + * This class contains helper methods used throughout the Star Tree index implementation. + * + * @opensearch.experimental + */ +public class StarTreeHelper { + + /** + * The suffix appended to dimension field names in the Star Tree index. + */ + public static final String DIMENSION_SUFFIX = "dim"; + + /** + * The suffix appended to metric field names in the Star Tree index. + */ + public static final String METRIC_SUFFIX = "metric"; + + /** + * Returns the full field name for a dimension in the star-tree index. + * + * @param starTreeFieldName star-tree field name + * @param dimensionName name of the dimension + * @return full field name for the dimension in the star-tree index + */ + public static String fullFieldNameForStarTreeDimensionsDocValues(String starTreeFieldName, String dimensionName) { + return starTreeFieldName + "_" + dimensionName + "_" + DIMENSION_SUFFIX; + } + + /** + * Returns the full field name for a metric in the star-tree index. + * + * @param starTreeFieldName star-tree field name + * @param fieldName name of the metric field + * @param metricName name of the metric + * @return full field name for the metric in the star-tree index + */ + public static String fullFieldNameForStarTreeMetricsDocValues(String starTreeFieldName, String fieldName, String metricName) { + return MetricAggregatorInfo.toFieldName(starTreeFieldName, fieldName, metricName) + "_" + METRIC_SUFFIX; + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaWriter.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaWriter.java new file mode 100644 index 0000000000000..f6979d517997e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaWriter.java @@ -0,0 +1,226 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +import java.io.IOException; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; + +/** + * The utility class for serializing the metadata of a star-tree data structure. + * The metadata includes information about the dimensions, metrics, and other relevant details + * related to the star tree. + * + * @opensearch.experimental + */ +public class StarTreeMetaWriter { + + private static final Logger logger = LogManager.getLogger(StarTreeMetaWriter.class); + + /** + * Serializes the star-tree metadata. + * + * @param metaOut the IndexOutput to write the metadata + * @param compositeFieldType the composite field type of the star-tree field + * @param starTreeField the star-tree field + * @param writeState the segment write state + * @param metricAggregatorInfos the list of metric aggregator information + * @param segmentAggregatedCount the aggregated document count for the segment + * @param dataFilePointer the file pointer to the start of the star tree data + * @param dataFileLength the length of the star tree data file + * @throws IOException if an I/O error occurs while serializing the metadata + */ + public static void serializeStarTreeMetadata( + IndexOutput metaOut, + CompositeMappedFieldType.CompositeFieldType compositeFieldType, + StarTreeField starTreeField, + SegmentWriteState writeState, + List metricAggregatorInfos, + Integer segmentAggregatedCount, + long dataFilePointer, + long dataFileLength + ) throws IOException { + + if (logger.isDebugEnabled()) { + long totalSizeInBytes = 0; + + // header size + totalSizeInBytes += computeHeaderByteSize(compositeFieldType, starTreeField.getName()); + // number of dimensions + totalSizeInBytes += Integer.BYTES; + // dimension field numbers + totalSizeInBytes += (long) starTreeField.getDimensionsOrder().size() * Integer.BYTES; + // metric count + totalSizeInBytes += Integer.BYTES; + // metric - metric stat pair + totalSizeInBytes += computeMetricEntriesSizeInBytes(metricAggregatorInfos); + // segment aggregated document count + totalSizeInBytes += Integer.BYTES; + // max leaf docs + totalSizeInBytes += Integer.BYTES; + // skip star node creation dimensions count + totalSizeInBytes += Integer.BYTES; + // skip star node creation dimensions field numbers + totalSizeInBytes += (long) starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims().size() * Integer.BYTES; + // data start file pointer + totalSizeInBytes += Long.BYTES; + // data length + totalSizeInBytes += Long.BYTES; + + logger.debug("Star tree size in bytes : {}", totalSizeInBytes); + } + + writeMetaHeader(metaOut, compositeFieldType, starTreeField.getName()); + writeMeta(metaOut, writeState, metricAggregatorInfos, starTreeField, segmentAggregatedCount, dataFilePointer, dataFileLength); + } + + /** + * Computes the byte size required to store the star-tree metric entry. + * + * @param metricAggregatorInfos the list of metric aggregator information + * @return the byte size required to store the metric-metric stat pairs + */ + private static long computeMetricEntriesSizeInBytes(List metricAggregatorInfos) { + + long totalMetricEntriesSize = 0; + + for (MetricAggregatorInfo metricAggregatorInfo : metricAggregatorInfos) { + totalMetricEntriesSize += metricAggregatorInfo.getField().getBytes(UTF_8).length; + totalMetricEntriesSize += metricAggregatorInfo.getMetricStat().getTypeName().getBytes(UTF_8).length; + } + + return totalMetricEntriesSize; + } + + /** + * Computes the byte size of the star-tree metadata header. + * + * @param compositeFieldType the composite field type of the star-tree field + * @param starTreeFieldName the name of the star-tree field + * @return the byte size of the star-tree metadata header + */ + private static int computeHeaderByteSize(CompositeMappedFieldType.CompositeFieldType compositeFieldType, String starTreeFieldName) { + // Magic marker (8), version (4), size of header (4) + int headerSizeInBytes = 16; + + // For star-tree field name + headerSizeInBytes += starTreeFieldName.getBytes(UTF_8).length; + + // For star tree field type + headerSizeInBytes += compositeFieldType.getName().getBytes(UTF_8).length; + + return headerSizeInBytes; + } + + /** + * Writes the star-tree metadata header. + * + * @param metaOut the IndexOutput to write the header + * @param compositeFieldType the composite field type of the star-tree field + * @param starTreeFieldName the name of the star-tree field + * @throws IOException if an I/O error occurs while writing the header + */ + private static void writeMetaHeader( + IndexOutput metaOut, + CompositeMappedFieldType.CompositeFieldType compositeFieldType, + String starTreeFieldName + ) throws IOException { + // magic marker for sanity + metaOut.writeLong(MAGIC_MARKER); + + // version + metaOut.writeInt(VERSION); + + // star tree field name + metaOut.writeString(starTreeFieldName); + + // star tree field type + metaOut.writeString(compositeFieldType.getName()); + } + + /** + * Writes the star-tree metadata. + * + * @param metaOut the IndexOutput to write the metadata + * @param writeState the segment write state + * @param metricAggregatorInfos the list of metric aggregator information + * @param starTreeField the star tree field + * @param segmentAggregatedDocCount the aggregated document count for the segment + * @param dataFilePointer the file pointer to the start of the star-tree data + * @param dataFileLength the length of the star-tree data file + * @throws IOException if an I/O error occurs while writing the metadata + */ + private static void writeMeta( + IndexOutput metaOut, + SegmentWriteState writeState, + List metricAggregatorInfos, + StarTreeField starTreeField, + Integer segmentAggregatedDocCount, + long dataFilePointer, + long dataFileLength + ) throws IOException { + + // number of dimensions + metaOut.writeInt(starTreeField.getDimensionsOrder().size()); + + // dimensions + for (Dimension dimension : starTreeField.getDimensionsOrder()) { + int dimensionFieldNumber = writeState.fieldInfos.fieldInfo(dimension.getField()).getFieldNumber(); + metaOut.writeInt(dimensionFieldNumber); + } + + // number of metrics + metaOut.writeInt(metricAggregatorInfos.size()); + + // metric - metric stat pair + for (MetricAggregatorInfo metricAggregatorInfo : metricAggregatorInfos) { + String metricName = metricAggregatorInfo.getField(); + String metricStatName = metricAggregatorInfo.getMetricStat().getTypeName(); + metaOut.writeString(metricName); + metaOut.writeString(metricStatName); + } + + // segment aggregated document count + metaOut.writeInt(segmentAggregatedDocCount); + + // max leaf docs + metaOut.writeInt(starTreeField.getStarTreeConfig().maxLeafDocs()); + + // number of skip star node creation dimensions + metaOut.writeInt(starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims().size()); + + // skip star node creations + for (String dimension : starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims()) { + int dimensionFieldNumber = writeState.fieldInfos.fieldInfo(dimension).getFieldNumber(); + metaOut.writeInt(dimensionFieldNumber); + } + + // star tree build-mode + metaOut.writeString(starTreeField.getStarTreeConfig().getBuildMode().getTypeName()); + + // star-tree data file pointer + metaOut.writeLong(dataFilePointer); + + // star-tree data file length + metaOut.writeLong(dataFileLength); + + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/TreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/TreeNode.java index 5cf737c61ab2d..9c0da6e388f2d 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/TreeNode.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/TreeNode.java @@ -11,6 +11,8 @@ import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils.ALL; + /** * /** * Represents a node in a tree data structure, specifically designed for a star-tree implementation. @@ -21,8 +23,6 @@ @ExperimentalApi public class TreeNode { - public static final int ALL = -1; - /** * The dimension id for the dimension (field) associated with this star-tree node. */ diff --git a/server/src/main/java/org/opensearch/index/compositeindex/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/package-info.java index 59f18efec26b1..9a88f88d9850a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/package-info.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/package-info.java @@ -8,6 +8,7 @@ /** * Core classes for handling composite indices. - * @opensearch.experimental + * + * @opensearch.experimental */ package org.opensearch.index.compositeindex; diff --git a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java index 049d91bc42d9c..e7ec9f7fd107d 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java @@ -23,6 +23,7 @@ import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.Rounding; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentFactory; @@ -30,16 +31,27 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.MapperTestUtils; import org.opensearch.index.codec.composite.Composite99Codec; +import org.opensearch.index.compositeindex.datacube.DateDimension; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.NumericDimension; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.indices.IndicesModule; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import static org.opensearch.common.util.FeatureFlags.STAR_TREE_INDEX; +import static org.opensearch.test.OpenSearchTestCase.randomFrom; /** * Star tree doc values Lucene tests @@ -76,6 +88,32 @@ protected Codec getCodec() { return codec; } + private StarTreeMapper.StarTreeFieldType getStarTreeFieldType() { + List m1 = new ArrayList<>(); + m1.add(MetricStat.MAX); + Metric metric = new Metric("sndv", m1); + List d1CalendarIntervals = new ArrayList<>(); + d1CalendarIntervals.add(Rounding.DateTimeUnit.HOUR_OF_DAY); + StarTreeField starTreeField = getStarTreeField(d1CalendarIntervals, metric); + + return new StarTreeMapper.StarTreeFieldType("star_tree", starTreeField); + } + + private static StarTreeField getStarTreeField(List d1CalendarIntervals, Metric metric1) { + DateDimension d1 = new DateDimension("field", d1CalendarIntervals); + NumericDimension d2 = new NumericDimension("dv"); + + List metrics = List.of(metric1); + List dims = List.of(d1, d2); + StarTreeFieldConfiguration config = new StarTreeFieldConfiguration( + 100, + Collections.emptySet(), + randomFrom(StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP) // TODO : change it + ); + + return new StarTreeField("starTree", dims, metrics, config); + } + public void testStarTreeDocValues() throws IOException { Directory directory = newDirectory(); IndexWriterConfig conf = newIndexWriterConfig(null); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregatorTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregatorTests.java index 194d667cffbd7..1ff600643f198 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregatorTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregatorTests.java @@ -53,6 +53,6 @@ public void testToLongValue() { } public void testToStarTreeNumericTypeValue() { - assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L, StarTreeNumericType.DOUBLE), 0.0); + assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L), 0.0); } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregatorTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregatorTests.java index 8f8ce259bc62c..0257fbd3ea71c 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregatorTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregatorTests.java @@ -53,6 +53,6 @@ public void testToLongValue() { } public void testToStarTreeNumericTypeValue() { - assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L, StarTreeNumericType.DOUBLE), 0.0); + assertEquals(NumericUtils.sortableLongToDouble(3L), aggregator.toStarTreeNumericTypeValue(3L), 0.0); } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java index 26e2cb8e391f4..6a8ff56d5bb35 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java @@ -8,20 +8,24 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.Version; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.composite.Composite99DocValuesFormat; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; @@ -51,6 +55,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -75,9 +80,12 @@ public class BaseStarTreeBuilderTests extends OpenSearchTestCase { private static List metrics; private static Directory directory; private static FieldInfo[] fieldsInfo; - private static SegmentWriteState state; + private static SegmentWriteState writeState; private static StarTreeField starTreeField; + private static IndexOutput dataOut; + private static IndexOutput metaOut; + @BeforeClass public static void setup() throws IOException { @@ -138,7 +146,21 @@ public static void setup() throws IOException { fieldProducerMap.put(fields.get(i), docValuesProducer); } FieldInfos fieldInfos = new FieldInfos(fieldsInfo); - state = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random())); + writeState = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random())); + + String dataFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + dataOut = writeState.directory.createOutput(dataFileName, writeState.context); + + String metaFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + metaOut = writeState.directory.createOutput(metaFileName, writeState.context); mapperService = mock(MapperService.class); DocumentMapper documentMapper = mock(DocumentMapper.class); @@ -157,9 +179,13 @@ public static void setup() throws IOException { ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new BaseStarTreeBuilder(starTreeField, state, mapperService) { + builder = new BaseStarTreeBuilder(metaOut, dataOut, starTreeField, writeState, mapperService) { @Override - public void build(List starTreeValuesSubs) throws IOException {} + public void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException {} @Override public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOException {} @@ -169,6 +195,11 @@ public StarTreeDocument getStarTreeDocument(int docId) throws IOException { return null; } + @Override + public StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException { + return null; + } + @Override public List getStarTreeDocuments() { return List.of(); @@ -219,6 +250,8 @@ public void test_reduceStarTreeDocuments() { @Override public void tearDown() throws Exception { super.tearDown(); + dataOut.close(); + metaOut.close(); directory.close(); } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java index ec0e2ba838730..5667ca428f648 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java @@ -8,11 +8,13 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; @@ -22,10 +24,12 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.Version; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.composite.Composite99DocValuesFormat; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; @@ -54,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -70,6 +75,9 @@ public class OnHeapStarTreeBuilderTests extends OpenSearchTestCase { private StarTreeField compositeField; private Map fieldProducerMap; private SegmentWriteState writeState; + private IndexOutput dataOut; + private IndexOutput metaOut; + private DocValuesConsumer docValuesConsumer; @Before public void setup() throws IOException { @@ -90,6 +98,7 @@ public void setup() throws IOException { ); DocValuesProducer docValuesProducer = mock(DocValuesProducer.class); + docValuesConsumer = mock(DocValuesConsumer.class); compositeField = new StarTreeField( "test", @@ -140,6 +149,20 @@ public void setup() throws IOException { FieldInfos fieldInfos = new FieldInfos(fieldsInfo); writeState = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random())); + String dataFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + dataOut = writeState.directory.createOutput(dataFileName, writeState.context); + + String metaFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + metaOut = writeState.directory.createOutput(metaFileName, writeState.context); + mapperService = mock(MapperService.class); DocumentMapper documentMapper = mock(DocumentMapper.class); when(mapperService.documentMapper()).thenReturn(documentMapper); @@ -162,7 +185,7 @@ public void setup() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); } public void test_sortAndAggregateStarTreeDocuments() throws IOException { @@ -191,7 +214,10 @@ public void test_sortAndAggregateStarTreeDocuments() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); int numOfAggregatedDocuments = 0; while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) { StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); @@ -209,20 +235,25 @@ public void test_sortAndAggregateStarTreeDocuments() throws IOException { } - public void test_sortAndAggregateStarTreeDocuments_nullMetric() throws IOException { + public void test_sortAndAggregateStarTreeDocuments_idempotentMetric() throws IOException { int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; starTreeDocuments[0] = new StarTreeDocument(new Long[] { 2L, 4L, 3L, 4L }, new Double[] { 12.0, 10.0, randomDouble(), 8.0, 20.0 }); - starTreeDocuments[1] = new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Double[] { 10.0, 6.0, randomDouble(), 8.0, 20.0 }); + starTreeDocuments[1] = new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Double[] { 10.0, 6.0, randomDouble(), 8.0, 24.0 }); starTreeDocuments[2] = new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Double[] { 14.0, 12.0, randomDouble(), 8.0, 20.0 }); starTreeDocuments[3] = new StarTreeDocument(new Long[] { 2L, 4L, 3L, 4L }, new Double[] { 9.0, 4.0, randomDouble(), 9.0, 12.0 }); starTreeDocuments[4] = new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Double[] { 11.0, null, randomDouble(), 8.0, 20.0 }); + StarTreeDocument expectedStarTreeDocument = new StarTreeDocument( new Long[] { 2L, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L, 8.0, 20.0 } ); + StarTreeDocument expectedStarTreeDocumentWithIdempotentMetric = new StarTreeDocument( + new Long[] { 3L, 4L, 2L, 1L }, + new Object[] { 35.0, 18.0, 3L, 8.0, 24.0 } + ); StarTreeDocument[] segmentStarTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; for (int i = 0; i < noOfStarTreeDocuments; i++) { @@ -235,7 +266,10 @@ public void test_sortAndAggregateStarTreeDocuments_nullMetric() throws IOExcepti segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); for (int dim = 0; dim < 4; dim++) { @@ -246,11 +280,14 @@ public void test_sortAndAggregateStarTreeDocuments_nullMetric() throws IOExcepti assertEquals(expectedStarTreeDocument.metrics[met], resultStarTreeDocument.metrics[met]); } - assertThrows( - "Null metric should have resulted in IllegalStateException", - IllegalStateException.class, - segmentStarTreeDocumentIterator::next - ); + resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); + for (int dim = 0; dim < 4; dim++) { + assertEquals(expectedStarTreeDocumentWithIdempotentMetric.dimensions[dim], resultStarTreeDocument.dimensions[dim]); + } + + for (int met = 0; met < 5; met++) { + assertEquals(expectedStarTreeDocumentWithIdempotentMetric.metrics[met], resultStarTreeDocument.metrics[met]); + } } @@ -295,7 +332,10 @@ public void test_sortAndAggregateStarTreeDocument_longMaxAndLongMinDimensions() segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); int numOfAggregatedDocuments = 0; while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) { StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); @@ -348,7 +388,10 @@ public void test_sortAndAggregateStarTreeDocument_DoubleMaxAndDoubleMinMetrics() segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); int numOfAggregatedDocuments = 0; while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) { StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); @@ -397,7 +440,7 @@ public void test_build_halfFloatMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -459,8 +502,11 @@ public void test_build_halfFloatMetrics() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -492,7 +538,7 @@ public void test_build_floatMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -524,8 +570,11 @@ public void test_build_floatMetrics() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -558,7 +607,7 @@ public void test_build_longMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -578,8 +627,11 @@ public void test_build_longMetrics() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -590,13 +642,13 @@ public void test_build_longMetrics() throws IOException { private static Iterator getExpectedStarTreeDocumentIterator() { List expectedStarTreeDocuments = List.of( - new StarTreeDocument(new Long[] { 2L, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L }), - new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L }), - new StarTreeDocument(new Long[] { null, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L }), - new StarTreeDocument(new Long[] { null, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L }), - new StarTreeDocument(new Long[] { null, 4L, null, 1L }, new Object[] { 35.0, 34.0, 3L }), - new StarTreeDocument(new Long[] { null, 4L, null, 4L }, new Object[] { 21.0, 14.0, 2L }), - new StarTreeDocument(new Long[] { null, 4L, null, null }, new Object[] { 56.0, 48.0, 5L }) + new StarTreeDocument(new Long[] { 2L, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L, 8.0, 20.0 }), + new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L, 6.0, 24.0 }), + new StarTreeDocument(new Long[] { null, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L, 6.0, 24.0 }), + new StarTreeDocument(new Long[] { null, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L, 8.0, 20.0 }), + new StarTreeDocument(new Long[] { null, 4L, null, 1L }, new Object[] { 35.0, 34.0, 3L, 6.0, 24.0 }), + new StarTreeDocument(new Long[] { null, 4L, null, 4L }, new Object[] { 21.0, 14.0, 2L, 8.0, 20.0 }), + new StarTreeDocument(new Long[] { null, 4L, null, null }, new Object[] { 56.0, 48.0, 5L, 6.0, 24.0 }) ); return expectedStarTreeDocuments.iterator(); } @@ -621,8 +673,11 @@ public void test_build() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -723,7 +778,7 @@ public void test_build_starTreeDataset() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 7; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -741,8 +796,11 @@ public void test_build_starTreeDataset() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, new Long[] { metric1 }); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); List expectedStarTreeDocuments = List.of( @@ -822,7 +880,7 @@ public void testFlushFlow() throws IOException { SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); SortedNumericDocValues m2sndv = getSortedNumericMock(metricsList, metricsWithField); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); + OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(metaOut, dataOut, sf, writeState, mapperService); SequentialDocValuesIterator[] dimDvs = { new SequentialDocValuesIterator(d1sndv), new SequentialDocValuesIterator(d2sndv) }; Iterator starTreeDocumentIterator = builder.sortAndAggregateSegmentDocuments( dimDvs, @@ -882,16 +940,17 @@ public void testMergeFlow() throws IOException { SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); Map dimDocIdSetIterators = Map.of("field1", d1sndv, "field3", d2sndv); Map metricDocIdSetIterators = Map.of("field2", m1sndv); - StarTreeValues starTreeValues = new StarTreeValues(sf, null, dimDocIdSetIterators, metricDocIdSetIterators); + StarTreeValues starTreeValues = new StarTreeValues(sf, null, dimDocIdSetIterators, metricDocIdSetIterators, new HashMap<>()); SortedNumericDocValues f2d1sndv = getSortedNumericMock(dimList, docsWithField); SortedNumericDocValues f2d2sndv = getSortedNumericMock(dimList2, docsWithField2); SortedNumericDocValues f2m1sndv = getSortedNumericMock(metricsList, metricsWithField); Map f2dimDocIdSetIterators = Map.of("field1", f2d1sndv, "field3", f2d2sndv); Map f2metricDocIdSetIterators = Map.of("field2", f2m1sndv); - StarTreeValues starTreeValues2 = new StarTreeValues(sf, null, f2dimDocIdSetIterators, f2metricDocIdSetIterators); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); + StarTreeValues starTreeValues2 = new StarTreeValues(sf, null, f2dimDocIdSetIterators, f2metricDocIdSetIterators, new HashMap<>()); + OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(metaOut, dataOut, sf, writeState, mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); + /** * Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ] * [0, 0] | [0.0] @@ -938,15 +997,15 @@ public void testMergeFlowWithCount() throws IOException { SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); Map dimDocIdSetIterators = Map.of("field1", d1sndv, "field3", d2sndv); Map metricDocIdSetIterators = Map.of("field2", m1sndv); - StarTreeValues starTreeValues = new StarTreeValues(sf, null, dimDocIdSetIterators, metricDocIdSetIterators); + StarTreeValues starTreeValues = new StarTreeValues(sf, null, dimDocIdSetIterators, metricDocIdSetIterators, new HashMap<>()); SortedNumericDocValues f2d1sndv = getSortedNumericMock(dimList, docsWithField); SortedNumericDocValues f2d2sndv = getSortedNumericMock(dimList2, docsWithField2); SortedNumericDocValues f2m1sndv = getSortedNumericMock(metricsList, metricsWithField); Map f2dimDocIdSetIterators = Map.of("field1", f2d1sndv, "field3", f2d2sndv); Map f2metricDocIdSetIterators = Map.of("field2", f2m1sndv); - StarTreeValues starTreeValues2 = new StarTreeValues(sf, null, f2dimDocIdSetIterators, f2metricDocIdSetIterators); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); + StarTreeValues starTreeValues2 = new StarTreeValues(sf, null, f2dimDocIdSetIterators, f2metricDocIdSetIterators, new HashMap<>()); + OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(metaOut, dataOut, sf, writeState, mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1021,6 +1080,8 @@ public long cost() { @Override public void tearDown() throws Exception { super.tearDown(); + dataOut.close(); + metaOut.close(); directory.close(); } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java index 564ab110fa7a5..455cd84bb44f8 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.FieldInfo; @@ -15,6 +16,7 @@ import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.Version; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; @@ -45,9 +47,13 @@ public class StarTreesBuilderTests extends OpenSearchTestCase { private StarTreeField starTreeField; private Map fieldProducerMap; private Directory directory; + private IndexOutput dataOut; + private IndexOutput metaOut; public void setUp() throws Exception { super.setUp(); + metaOut = mock(IndexOutput.class); + dataOut = mock(IndexOutput.class); mapperService = mock(MapperService.class); directory = newFSDirectory(createTempDir()); SegmentInfo segmentInfo = new SegmentInfo( @@ -89,7 +95,7 @@ public void test_buildWithNoStarTreeFields() throws IOException { when(mapperService.getCompositeFieldTypes()).thenReturn(new HashSet<>()); StarTreesBuilder starTreesBuilder = new StarTreesBuilder(segmentWriteState, mapperService); - starTreesBuilder.build(fieldProducerMap); + starTreesBuilder.build(metaOut, dataOut, fieldProducerMap, mock(DocValuesConsumer.class)); verifyNoInteractions(docValuesProducer); } @@ -97,7 +103,7 @@ public void test_buildWithNoStarTreeFields() throws IOException { public void test_getStarTreeBuilder() throws IOException { when(mapperService.getCompositeFieldTypes()).thenReturn(Set.of(starTreeFieldType)); StarTreesBuilder starTreesBuilder = new StarTreesBuilder(segmentWriteState, mapperService); - StarTreeBuilder starTreeBuilder = starTreesBuilder.getSingleTreeBuilder(starTreeField, segmentWriteState, mapperService); + StarTreeBuilder starTreeBuilder = starTreesBuilder.getSingleTreeBuilder(metaOut, dataOut, starTreeField, segmentWriteState, mapperService); assertTrue(starTreeBuilder instanceof OnHeapStarTreeBuilder); } @@ -106,7 +112,7 @@ public void test_getStarTreeBuilder_illegalArgument() { StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(1, new HashSet<>(), StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP); StarTreeField starTreeField = new StarTreeField("star_tree", new ArrayList<>(), new ArrayList<>(), starTreeFieldConfiguration); StarTreesBuilder starTreesBuilder = new StarTreesBuilder(segmentWriteState, mapperService); - assertThrows(IllegalArgumentException.class, () -> starTreesBuilder.getSingleTreeBuilder(starTreeField, segmentWriteState, mapperService)); + assertThrows(IllegalArgumentException.class, () -> starTreesBuilder.getSingleTreeBuilder(metaOut, dataOut, starTreeField, segmentWriteState, mapperService)); } public void test_closeWithNoStarTreeFields() throws IOException {