Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jun 11, 2024
1 parent feab228 commit d3e1534
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
import org.opensearch.index.compositeindex.startree.aggregators.MetricTypeFieldPair;
import org.opensearch.index.compositeindex.startree.aggregators.ValueAggregator;
import org.opensearch.index.compositeindex.startree.aggregators.ValueAggregatorFactory;
import org.opensearch.index.compositeindex.startree.builder.CompositeFieldWriter;
import org.opensearch.index.compositeindex.startree.builder.StarTreeDocValues;
import org.opensearch.index.compositeindex.startree.builder.SingleTreeBuilder;
import org.opensearch.index.compositeindex.startree.builder.StarTreeDocValuesIteratorFactory;
import org.opensearch.index.compositeindex.startree.data.StarTreeDocValues;
import org.opensearch.index.compositeindex.startree.data.StarTreeDocument;
import org.opensearch.index.compositeindex.startree.node.StarTreeNode;
import org.opensearch.index.compositeindex.startree.utils.StarTreeBuilderUtils;
import org.opensearch.index.mapper.NumberFieldMapper;
Expand All @@ -47,35 +48,36 @@
/**
* Base class for star tree builder
*/
public abstract class BaseCompositeFieldStarTreeBuilder implements CompositeFieldWriter {
public abstract class BaseSingleStarTreeBuilder implements SingleTreeBuilder {

// TODO: STAR_TREE_CODEC will be moved to CodecService once the Star Tree Codec is defined
public static final String STAR_TREE_CODEC = "startreecodec";

private static final Logger logger = LogManager.getLogger(BaseCompositeFieldStarTreeBuilder.class);
private static final Logger logger = LogManager.getLogger(BaseSingleStarTreeBuilder.class);

public static final int STAR_IN_DOC_VALUES_INDEX = 0;

public final String[] dimensionsSplitOrder;
public final Set<Integer> skipStarNodeCreationForDimensions;
public final String[] metrics;
protected final String[] dimensionsSplitOrder;
protected final Set<Integer> skipStarNodeCreationForDimensions;
protected final String[] metrics;

public final int numMetrics;
public final int numDimensions;
public int numDocs;
public int totalDocs;
public int numNodes;
public final int maxLeafDocuments;
protected final int numMetrics;
protected final int numDimensions;
protected int numDocs;
protected int totalDocs;
protected int numNodes;
protected final int maxLeafDocuments;

public final StarTreeBuilderUtils.TreeNode rootNode = getNewNode();
protected final StarTreeBuilderUtils.TreeNode rootNode = getNewNode();

public IndexOutput indexOutput;
public DocIdSetIterator[] dimensionReaders;
public DocIdSetIterator[] metricReaders;
// TODO: This will be initialized with OnHeap / OffHeap implementations (its occurrences are commented out for now)
// private IndexOutput indexOutput;
protected DocIdSetIterator[] dimensionReaders;
protected DocIdSetIterator[] metricReaders;

public ValueAggregator[] valueAggregators;
public DocValuesConsumer docValuesConsumer;
public DocValuesProducer docValuesProducer;
protected ValueAggregator[] valueAggregators;
protected DocValuesConsumer docValuesConsumer;
protected DocValuesProducer docValuesProducer;

private final StarTreeDocValuesIteratorFactory starTreeDocValuesIteratorFactory;
private final CompositeField compositeField;
Expand All @@ -90,7 +92,7 @@ public abstract class BaseCompositeFieldStarTreeBuilder implements CompositeFiel
* @param docValuesConsumer to consume the new aggregated metrics during flush
* @param state stores the segment state
*/
protected BaseCompositeFieldStarTreeBuilder(
protected BaseSingleStarTreeBuilder(
CompositeField compositeField,
DocValuesProducer docValuesProducer,
DocValuesConsumer docValuesConsumer,
Expand All @@ -102,8 +104,8 @@ protected BaseCompositeFieldStarTreeBuilder(
String docFileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, "stttree");
logger.info("Star tree file name : {}", docFileName);

indexOutput = state.directory.createOutput(docFileName, state.context);
CodecUtil.writeIndexHeader(indexOutput, STAR_TREE_CODEC, 0, state.segmentInfo.getId(), state.segmentSuffix);
// indexOutput = state.directory.createOutput(docFileName, state.context);
// CodecUtil.writeIndexHeader(indexOutput, STAR_TREE_CODEC, 0, state.segmentInfo.getId(), state.segmentSuffix);

starTreeDocValuesIteratorFactory = new StarTreeDocValuesIteratorFactory();
this.compositeField = compositeField;
Expand Down Expand Up @@ -145,7 +147,7 @@ protected BaseCompositeFieldStarTreeBuilder(
int index = 0;
for (MetricTypeFieldPair metricTypeFieldPair : metricTypeFieldPairs) {
metrics[index] = metricTypeFieldPair.toFieldName();
valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(metricTypeFieldPair.getFunctionType());
valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(metricTypeFieldPair.getMetricType());
// Ignore the column for COUNT aggregation function
if (valueAggregators[index].getAggregationType() != MetricType.COUNT) {
String metricName = metricTypeFieldPair.getField();
Expand Down Expand Up @@ -293,17 +295,17 @@ protected StarTreeDocument mergeSegmentStarTreeDocument(
) {
// TODO: HANDLE KEYWORDS LATER!
if (aggregatedStarTreeDocument == null) {
long[] dimensions = Arrays.copyOf(segmentStarTreeDocument._dimensions, numDimensions);
long[] dimensions = Arrays.copyOf(segmentStarTreeDocument.dimensions, numDimensions);
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
metrics[i] = valueAggregators[i].getInitialAggregatedValue(segmentStarTreeDocument._metrics[i]);
metrics[i] = valueAggregators[i].getInitialAggregatedValue(segmentStarTreeDocument.metrics[i]);
}
return new StarTreeDocument(dimensions, metrics);
} else {
for (int i = 0; i < numMetrics; i++) {
aggregatedStarTreeDocument._metrics[i] = valueAggregators[i].applyRawValue(
aggregatedStarTreeDocument._metrics[i],
segmentStarTreeDocument._metrics[i]
aggregatedStarTreeDocument.metrics[i] = valueAggregators[i].applyRawValue(
aggregatedStarTreeDocument.metrics[i],
segmentStarTreeDocument.metrics[i]
);
}
return aggregatedStarTreeDocument;
Expand All @@ -324,17 +326,17 @@ protected StarTreeDocument mergeStarTreeDocument(
StarTreeDocument starTreeStarTreeDocument
) {
if (aggregatedStarTreeDocument == null) {
long[] dimensions = Arrays.copyOf(starTreeStarTreeDocument._dimensions, numDimensions);
long[] dimensions = Arrays.copyOf(starTreeStarTreeDocument.dimensions, numDimensions);
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
metrics[i] = valueAggregators[i].cloneAggregatedValue((Long) starTreeStarTreeDocument._metrics[i]);
metrics[i] = valueAggregators[i].cloneAggregatedValue((Long) starTreeStarTreeDocument.metrics[i]);
}
return new StarTreeDocument(dimensions, metrics);
} else {
for (int i = 0; i < numMetrics; i++) {
aggregatedStarTreeDocument._metrics[i] = valueAggregators[i].applyAggregatedValue(
(Long) starTreeStarTreeDocument._metrics[i],
(Long) aggregatedStarTreeDocument._metrics[i]
aggregatedStarTreeDocument.metrics[i] = valueAggregators[i].applyAggregatedValue(
(Long) starTreeStarTreeDocument.metrics[i],
(Long) aggregatedStarTreeDocument.metrics[i]
);
}
return aggregatedStarTreeDocument;
Expand Down Expand Up @@ -366,7 +368,7 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
logger.info("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument);

if (numDocs == 0) {
StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
return;
}

Expand All @@ -386,7 +388,7 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
createSortedDocValuesIndices(docValuesConsumer);

// Serialize and save to disk
StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);

// TODO: Write star tree metadata for off heap implementation

Expand Down Expand Up @@ -502,7 +504,7 @@ private StarTreeDocument createAggregatedDocs(StarTreeBuilderUtils.TreeNode node
}
assert aggregatedStarTreeDocument != null;
for (int i = node.dimensionId + 1; i < numDimensions; i++) {
aggregatedStarTreeDocument._dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
aggregatedStarTreeDocument.dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
}
node.aggregatedDocId = numDocs;
appendToStarTree(aggregatedStarTreeDocument);
Expand All @@ -526,7 +528,7 @@ private StarTreeDocument createAggregatedDocs(StarTreeBuilderUtils.TreeNode node
}
assert aggregatedStarTreeDocument != null;
for (int i = node.dimensionId + 1; i < numDimensions; i++) {
aggregatedStarTreeDocument._dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
aggregatedStarTreeDocument.dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
}
node.aggregatedDocId = numDocs;
appendToStarTree(aggregatedStarTreeDocument);
Expand Down Expand Up @@ -608,8 +610,8 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer) t
Map<String, Map<Long, BytesRef>> ordinalToSortedSetDocValueMap = new HashMap<>();
for (int docId = 0; docId < numDocs; docId++) {
StarTreeDocument starTreeDocument = getStarTreeDocument(docId);
for (int i = 0; i < starTreeDocument._dimensions.length; i++) {
long val = starTreeDocument._dimensions[i];
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
long val = starTreeDocument.dimensions[i];
StarTreeDocValuesWriter starTreeDocValuesWriter = dimensionWriters.get(i);
switch (starTreeDocValuesWriter.getDocValuesType()) {
case SORTED_SET:
Expand All @@ -626,9 +628,9 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer) t
throw new IllegalStateException("Unsupported doc values type");
}
}
for (int i = 0; i < starTreeDocument._metrics.length; i++) {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
try {
Number parse = NumberFieldMapper.NumberType.LONG.parse(starTreeDocument._metrics[i], true);
Number parse = NumberFieldMapper.NumberType.LONG.parse(starTreeDocument.metrics[i], true);
StarTreeDocValuesWriter starTreeDocValuesWriter = metricWriters.get(i);
((SortedNumericDocValuesWriter) starTreeDocValuesWriter.getDocValuesWriter()).addValue(docId, parse.longValue());
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -708,40 +710,23 @@ private long handleDateDimension(final String fieldName, final long val) {
}

public void close() throws IOException {
boolean success = false;
try {
if (indexOutput != null) {
indexOutput.writeInt(-1);
CodecUtil.writeFooter(indexOutput); // write checksum
}
success = true;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (success) {
IOUtils.close(indexOutput);
} else {
IOUtils.closeWhileHandlingException(indexOutput);
}
indexOutput = null;
}
// boolean success = false;
// try {
// if (indexOutput != null) {
// indexOutput.writeInt(-1);
// CodecUtil.writeFooter(indexOutput); // write checksum
// }
// success = true;
// } catch (Exception e) {
// throw new RuntimeException(e);
// } finally {
// if (success) {
// IOUtils.close(indexOutput);
// } else {
// IOUtils.closeWhileHandlingException(indexOutput);
// }
// indexOutput = null;
// }
}

/**
* Star tree document: a plain holder that pairs an array of dimension values with an array of
* metric values.
* <p>
* The constructor stores the given arrays by reference (no defensive copy is made), so later
* changes to those arrays are visible through this object.
*/
public static class StarTreeDocument {
// Dimension values of this document
public final long[] _dimensions;
// Metric values of this document, held as untyped objects
public final Object[] _metrics;

/**
* Creates a document wrapping the given arrays without copying them.
*
* @param dimensions dimension values, stored as-is
* @param metrics metric values, stored as-is
*/
public StarTreeDocument(long[] dimensions, Object[] metrics) {
_dimensions = dimensions;
_metrics = metrics;
}

/**
* @return the dimensions and the metrics, each rendered with Arrays.toString and joined by " | "
*/
@Override
public String toString() {
return Arrays.toString(_dimensions) + " | " + Arrays.toString(_metrics);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ public class MetricTypeFieldPair implements Comparable<MetricTypeFieldPair> {
public static final String STAR = "*";
public static final MetricTypeFieldPair COUNT_STAR = new MetricTypeFieldPair(MetricType.COUNT, STAR);

private final MetricType functionType;
private final MetricType metricType;
private final String field;

/**
* Constructor for MetricTypeFieldPair
*/
public MetricTypeFieldPair(MetricType functionType, String field) {
this.functionType = functionType;
if (functionType == MetricType.COUNT) {
public MetricTypeFieldPair(MetricType metricType, String field) {
this.metricType = metricType;
if (metricType == MetricType.COUNT) {
this.field = STAR;
} else {
this.field = field;
Expand All @@ -39,8 +39,8 @@ public MetricTypeFieldPair(MetricType functionType, String field) {
/**
* @return Metric Type
*/
public MetricType getFunctionType() {
return functionType;
public MetricType getMetricType() {
return metricType;
}

/**
Expand All @@ -51,42 +51,42 @@ public String getField() {
}

/**
* @return field name with function type and field
* @return field name with metric type and field
*/
public String toFieldName() {
return toFieldName(functionType, field);
return toFieldName(metricType, field);
}

/**
* Builds field name with function type and field
* Builds field name with metric type and field
*/
public static String toFieldName(MetricType functionType, String field) {
return functionType.getTypeName() + DELIMITER + field;
public static String toFieldName(MetricType metricType, String field) {
return metricType.getTypeName() + DELIMITER + field;
}

/**
* Builds MetricTypeFieldPair from field name
*/
public static MetricTypeFieldPair fromFieldName(String fieldName) {
String[] parts = fieldName.split(DELIMITER, 2);
return fromFunctionAndFieldName(parts[0], parts[1]);
return fromMetricAndFieldName(parts[0], parts[1]);
}

/**
* Builds MetricTypeFieldPair from function and field name
* Builds MetricTypeFieldPair from metric and field name
*/
private static MetricTypeFieldPair fromFunctionAndFieldName(String functionName, String fieldName) {
MetricType functionType = MetricType.fromTypeName(functionName);
if (functionType == MetricType.COUNT) {
private static MetricTypeFieldPair fromMetricAndFieldName(String metricName, String fieldName) {
MetricType metricType = MetricType.fromTypeName(metricName);
if (metricType == MetricType.COUNT) {
return COUNT_STAR;
} else {
return new MetricTypeFieldPair(functionType, fieldName);
return new MetricTypeFieldPair(metricType, fieldName);
}
}

@Override
public int hashCode() {
return 31 * functionType.hashCode() + field.hashCode();
return 31 * metricType.hashCode() + field.hashCode();
}

@Override
Expand All @@ -96,7 +96,7 @@ public boolean equals(Object obj) {
}
if (obj instanceof MetricTypeFieldPair) {
MetricTypeFieldPair anotherPair = (MetricTypeFieldPair) obj;
return functionType == anotherPair.functionType && field.equals(anotherPair.field);
return metricType == anotherPair.metricType && field.equals(anotherPair.field);
}
return false;
}
Expand All @@ -109,7 +109,7 @@ public String toString() {
@Override
public int compareTo(MetricTypeFieldPair other) {
return Comparator.comparing((MetricTypeFieldPair o) -> o.field)
.thenComparing((MetricTypeFieldPair o) -> o.functionType)
.thenComparing((MetricTypeFieldPair o) -> o.metricType)
.compare(this, other);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.index.compositeindex.startree.aggregators;

import org.opensearch.index.compositeindex.MetricType;
import org.opensearch.index.compositeindex.startree.data.DataType;

/**
* Sum value aggregator for star tree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.index.compositeindex.startree.aggregators;

import org.opensearch.index.compositeindex.MetricType;
import org.opensearch.index.compositeindex.startree.data.DataType;

/**
* A value aggregator that pre-aggregates on the input values for a specific type of aggregation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.index.compositeindex.startree.aggregators;

import org.opensearch.index.compositeindex.MetricType;
import org.opensearch.index.compositeindex.startree.data.DataType;

/**
* Value aggregator factory for a given aggregation type
Expand Down
Loading

0 comments on commit d3e1534

Please sign in to comment.