-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Marks end criteria reached for the segment if the Index cannot consume more rows #14479
Changes from 45 commits
f03043a
09d43d7
121d23a
06523b6
f47cb08
da8d691
2cff520
55bf033
4549e5c
36c5f0d
c2f78cb
0acb3f6
96e8663
ea48ba0
58370ca
fbc9c93
8722efc
a1467b5
27c4443
a2b95b6
6c3df28
c8f5ed2
ba59830
9f13ff1
b1143d5
33d40a5
be8ab39
3a185dd
d3a6ea2
ba07fc3
69b7e32
c60f1bd
5511e1f
ffa984d
783f8c0
f9e4b9c
3e1dde2
ed32e3d
e57d726
24243bd
541a7d9
9f67ce4
65fcaab
e6c1f43
dbaaf5c
d15a3fd
6960b5f
eb3662f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,6 +150,7 @@ public class MutableSegmentImpl implements MutableSegment { | |
private final File _consumerDir; | ||
|
||
private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>(); | ||
private boolean _indexCapacityThresholdBreached; | ||
|
||
private final IdMap<FixedIntArray> _recordIdMap; | ||
|
||
|
@@ -828,7 +829,20 @@ private void addNewRow(int docId, GenericRow row) { | |
Object[] values = (Object[]) value; | ||
for (Map.Entry<IndexType, MutableIndex> indexEntry : indexContainer._mutableIndexes.entrySet()) { | ||
try { | ||
indexEntry.getValue().add(values, dictIds, docId); | ||
MutableIndex mutableIndex = indexEntry.getValue(); | ||
mutableIndex.add(values, dictIds, docId); | ||
// Few of the Immutable version of the mutable index are bounded by size like FixedBitMVForwardIndex. | ||
// If num of values overflows or size is above limit, A mutable index is unable to convert to | ||
// an immutable index and segment build fails causing the realtime consumption to stop. | ||
// Hence, The below check is a temporary measure to avoid such scenarios until immutable index | ||
// implementations are changed. | ||
if (!mutableIndex.canAddMore()) { | ||
_logger.debug( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log it as info, and capitalize the first letter There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since this log floods logs (logs: numColum x numOfIndexes), I update the condition to |
||
"index: {} for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true", | ||
indexEntry.getKey(), column | ||
); | ||
_indexCapacityThresholdBreached = true; | ||
} | ||
} catch (Exception e) { | ||
recordIndexingError(indexEntry.getKey(), e); | ||
} | ||
|
@@ -1265,6 +1279,10 @@ private boolean isAggregateMetricsEnabled() { | |
return _recordIdMap != null; | ||
} | ||
|
||
public boolean canAddMore() { | ||
return !_indexCapacityThresholdBreached; | ||
} | ||
|
||
// NOTE: Okay for single-writer | ||
@SuppressWarnings("NonAtomicOperationOnVolatileField") | ||
private static class ValuesInfo { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pinot.segment.local.indexsegment.mutable; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.lang.reflect.Field; | ||
import java.net.URL; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import org.apache.commons.io.FileUtils; | ||
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; | ||
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; | ||
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin; | ||
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory; | ||
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; | ||
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; | ||
import org.apache.pinot.segment.spi.index.IndexType; | ||
import org.apache.pinot.segment.spi.index.StandardIndexes; | ||
import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex; | ||
import org.apache.pinot.segment.spi.index.mutable.MutableIndex; | ||
import org.apache.pinot.spi.data.FieldSpec; | ||
import org.apache.pinot.spi.data.Schema; | ||
import org.apache.pinot.spi.data.readers.FileFormat; | ||
import org.apache.pinot.spi.data.readers.GenericRow; | ||
import org.apache.pinot.spi.data.readers.RecordReader; | ||
import org.apache.pinot.spi.data.readers.RecordReaderFactory; | ||
import org.apache.pinot.spi.stream.StreamMessageMetadata; | ||
import org.testng.Assert; | ||
import org.testng.annotations.Test; | ||
|
||
|
||
public class MutableSegmentEntriesAboveThresholdTest { | ||
private static final File TEMP_DIR = | ||
new File(FileUtils.getTempDirectory(), MutableSegmentEntriesAboveThresholdTest.class.getSimpleName()); | ||
private static final String AVRO_FILE = "data/test_data-mv.avro"; | ||
private Schema _schema; | ||
|
||
private static class FakeMutableForwardIndex implements MutableForwardIndex { | ||
|
||
private final MutableForwardIndex _mutableForwardIndex; | ||
private static final int THRESHOLD = 2; | ||
private int _numValues; | ||
|
||
FakeMutableForwardIndex(MutableForwardIndex mutableForwardIndex) { | ||
_mutableForwardIndex = mutableForwardIndex; | ||
_numValues = 0; | ||
} | ||
|
||
@Override | ||
public boolean canAddMore() { | ||
return _numValues < THRESHOLD; | ||
} | ||
|
||
@Override | ||
public void setDictIdMV(int docId, int[] dictIds) { | ||
_numValues += dictIds.length; | ||
_mutableForwardIndex.setDictIdMV(docId, dictIds); | ||
} | ||
|
||
@Override | ||
public int getLengthOfShortestElement() { | ||
return _mutableForwardIndex.getLengthOfShortestElement(); | ||
} | ||
|
||
@Override | ||
public int getLengthOfLongestElement() { | ||
return _mutableForwardIndex.getLengthOfLongestElement(); | ||
} | ||
|
||
@Override | ||
public void setDictId(int docId, int dictId) { | ||
_mutableForwardIndex.setDictId(docId, dictId); | ||
} | ||
|
||
@Override | ||
public boolean isDictionaryEncoded() { | ||
return _mutableForwardIndex.isDictionaryEncoded(); | ||
} | ||
|
||
@Override | ||
public boolean isSingleValue() { | ||
return _mutableForwardIndex.isSingleValue(); | ||
} | ||
|
||
@Override | ||
public FieldSpec.DataType getStoredType() { | ||
return _mutableForwardIndex.getStoredType(); | ||
} | ||
|
||
@Override | ||
public void close() | ||
throws IOException { | ||
_mutableForwardIndex.close(); | ||
} | ||
} | ||
|
||
private File getAvroFile() { | ||
URL resourceUrl = MutableSegmentImplTest.class.getClassLoader().getResource(AVRO_FILE); | ||
Assert.assertNotNull(resourceUrl); | ||
return new File(resourceUrl.getFile()); | ||
} | ||
|
||
private MutableSegmentImpl getMutableSegment(File avroFile) | ||
throws Exception { | ||
FileUtils.deleteQuietly(TEMP_DIR); | ||
|
||
SegmentGeneratorConfig config = | ||
SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, TEMP_DIR, "testTable"); | ||
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); | ||
driver.init(config); | ||
driver.build(); | ||
|
||
_schema = config.getSchema(); | ||
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, "testSegment"); | ||
return MutableSegmentImplTestUtils | ||
.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), | ||
Collections.emptyMap(), | ||
false, false, null, null, null, null, null, null, Collections.emptyList(), true); | ||
} | ||
|
||
@Test | ||
public void testNoLimitBreached() | ||
throws Exception { | ||
File avroFile = getAvroFile(); | ||
MutableSegmentImpl mutableSegment = getMutableSegment(avroFile); | ||
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); | ||
try (RecordReader recordReader = RecordReaderFactory | ||
.getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { | ||
GenericRow reuse = new GenericRow(); | ||
while (recordReader.hasNext()) { | ||
mutableSegment.index(recordReader.next(reuse), defaultMetadata); | ||
} | ||
} | ||
assert mutableSegment.canAddMore(); | ||
} | ||
|
||
@Test | ||
public void testLimitBreached() | ||
throws Exception { | ||
File avroFile = getAvroFile(); | ||
MutableSegmentImpl mutableSegment = getMutableSegment(avroFile); | ||
|
||
Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap"); | ||
indexContainerMapField.setAccessible(true); | ||
Map<String, Object> colVsIndexContainer = (Map<String, Object>) indexContainerMapField.get(mutableSegment); | ||
|
||
for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) { | ||
Object indexContainer = entry.getValue(); | ||
Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes"); | ||
mutableIndexesField.setAccessible(true); | ||
Map<IndexType, MutableIndex> indexTypeVsMutableIndex = | ||
(Map<IndexType, MutableIndex>) mutableIndexesField.get(indexContainer); | ||
|
||
MutableForwardIndex mutableForwardIndex = null; | ||
for (IndexType indexType : indexTypeVsMutableIndex.keySet()) { | ||
if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) { | ||
mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType); | ||
} | ||
} | ||
|
||
assert mutableForwardIndex != null; | ||
|
||
indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(), | ||
new FakeMutableForwardIndex(mutableForwardIndex)); | ||
} | ||
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); | ||
try (RecordReader recordReader = RecordReaderFactory | ||
.getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) { | ||
GenericRow reuse = new GenericRow(); | ||
while (recordReader.hasNext()) { | ||
mutableSegment.index(recordReader.next(reuse), defaultMetadata); | ||
} | ||
} | ||
|
||
assert !mutableSegment.canAddMore(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,7 +59,7 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str | |
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, | ||
List<AggregationConfig> preAggregationConfigs) { | ||
return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns, | ||
Collections.emptyMap(), false, false, null, null, null, null, null, null, preAggregationConfigs); | ||
Collections.emptyMap(), false, false, null, null, null, null, null, null, preAggregationConfigs, false); | ||
} | ||
|
||
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, | ||
|
@@ -98,6 +98,20 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str | |
UpsertConfig upsertConfig, String timeColumnName, PartitionUpsertMetadataManager partitionUpsertMetadataManager, | ||
DedupConfig dedupConfig, PartitionDedupMetadataManager partitionDedupMetadataManager, ServerMetrics serverMetrics, | ||
List<AggregationConfig> aggregationConfigs) { | ||
return createMutableSegmentImpl(schema, noDictionaryColumns, | ||
varLengthDictionaryColumns, invertedIndexColumns, | ||
jsonIndexConfigs, aggregateMetrics, nullHandlingEnabled, | ||
upsertConfig, timeColumnName, partitionUpsertMetadataManager, | ||
dedupConfig, partitionDedupMetadataManager, serverMetrics, | ||
aggregationConfigs, false); | ||
} | ||
|
||
public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can revert this since the flag is removed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missed it, my bad. |
||
Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, | ||
Map<String, JsonIndexConfig> jsonIndexConfigs, boolean aggregateMetrics, boolean nullHandlingEnabled, | ||
UpsertConfig upsertConfig, String timeColumnName, PartitionUpsertMetadataManager partitionUpsertMetadataManager, | ||
DedupConfig dedupConfig, PartitionDedupMetadataManager partitionDedupMetadataManager, ServerMetrics serverMetrics, | ||
List<AggregationConfig> aggregationConfigs, boolean thresholdForColEnabled) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would recommend to add an overload method where the new boolean is false. That way you can reduce the number of changes in the PR but also keep backward compatibility in case some other class (in another repo) is using this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added new method, let me know if still needs refactoring. |
||
|
||
RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class); | ||
when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Annotate it with
@VisibleForTesting