Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Marks end criteria reached for the segment if the Index cannot consume more rows #14479

Merged
merged 48 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
f03043a
initial refactoring for handling threshold logic
noob-se7en Nov 15, 2024
09d43d7
Adds logic for endCriteriaReached based upon numValues
noob-se7en Nov 18, 2024
121d23a
add logic inside endCriteriaReached
noob-se7en Nov 18, 2024
06523b6
renames var
noob-se7en Nov 18, 2024
f47cb08
renames constant
noob-se7en Nov 18, 2024
da8d691
nit
noob-se7en Nov 18, 2024
2cff520
nit
noob-se7en Nov 18, 2024
55bf033
refactors logic to inside of addNewRow
noob-se7en Nov 18, 2024
4549e5c
remove comments
noob-se7en Nov 18, 2024
36c5f0d
removes usage of map
noob-se7en Nov 18, 2024
c2f78cb
nit
noob-se7en Nov 18, 2024
0acb3f6
fixes lint
noob-se7en Nov 18, 2024
96e8663
nit
noob-se7en Nov 19, 2024
ea48ba0
refactoring
noob-se7en Nov 20, 2024
58370ca
nit
noob-se7en Nov 20, 2024
fbc9c93
Adds test
noob-se7en Nov 21, 2024
8722efc
Adds test
noob-se7en Nov 22, 2024
a1467b5
Adds test for realtimesegmentDataManager
noob-se7en Nov 23, 2024
27c4443
lint
noob-se7en Nov 23, 2024
a2b95b6
refactors config
noob-se7en Nov 23, 2024
6c3df28
resolves commit
noob-se7en Nov 23, 2024
c8f5ed2
changes log
noob-se7en Nov 23, 2024
ba59830
nit
noob-se7en Nov 23, 2024
9f13ff1
nit
noob-se7en Nov 23, 2024
b1143d5
Adds test and minor refactoring
noob-se7en Nov 25, 2024
33d40a5
nit
noob-se7en Nov 25, 2024
be8ab39
nit
noob-se7en Nov 25, 2024
3a185dd
nit
noob-se7en Nov 25, 2024
d3a6ea2
fixes lint
noob-se7en Nov 25, 2024
ba07fc3
Addresses Pr comments
noob-se7en Nov 26, 2024
69b7e32
nit
noob-se7en Nov 26, 2024
c60f1bd
nit
noob-se7en Nov 26, 2024
5511e1f
refactors method name
noob-se7en Nov 26, 2024
ffa984d
nit
noob-se7en Nov 26, 2024
783f8c0
nit
noob-se7en Nov 26, 2024
f9e4b9c
Fixes lint
noob-se7en Nov 27, 2024
3e1dde2
fixes lint
noob-se7en Nov 28, 2024
ed32e3d
Addresses PR comment
noob-se7en Dec 4, 2024
e57d726
fix bug
noob-se7en Dec 4, 2024
24243bd
removes reflection in test
noob-se7en Dec 4, 2024
541a7d9
fix test
noob-se7en Dec 4, 2024
9f67ce4
fixes lint
noob-se7en Dec 4, 2024
65fcaab
fix log
noob-se7en Dec 4, 2024
e6c1f43
move log to debug
noob-se7en Dec 4, 2024
dbaaf5c
fixes test
noob-se7en Dec 4, 2024
d15a3fd
Addresses PR comment
noob-se7en Dec 5, 2024
6960b5f
nit
noob-se7en Dec 5, 2024
eb3662f
nit
noob-se7en Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ public enum ControllerResponseStatus {
public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
// Stop reason sent by server as force commit message received
public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived";
// Stop reason sent by server when a mutable index cannot consume more rows
// (e.g. its size is close to the limit, or the number of values for a column is about to overflow int max)
public static final String REASON_INDEX_CAPACITY_THRESHOLD_BREACHED = "indexCapacityThresholdBreached";

// Canned responses
public static final Response RESP_NOT_LEADER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ private boolean endCriteriaReached() {
_numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
return true;
} else if (!canAddMore()) {
_segmentLogger.info(
"Stopping consumption as mutable index cannot consume more rows - numRowsConsumed={} "
+ "numRowsIndexed={}",
_numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED;
return true;
}
return false;

Expand Down Expand Up @@ -700,6 +707,11 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
return prematureExit;
}

@VisibleForTesting
boolean canAddMore() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Annotate it with @VisibleForTesting

return _realtimeSegment.canAddMore();
}

public class PartitionConsumer implements Runnable {
public void run() {
long initialConsumptionEnd = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,19 @@ public void testEndCriteriaChecking()
segmentDataManager._timeSupplier.set(endTime);
Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
}

// Test that the end criteria is reached if any index cannot take more rows
try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null,
null, null)) {
segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());

segmentDataManager.setIndexCapacityThresholdBreached(true);

Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
Assert.assertEquals(segmentDataManager.getStopReason(),
SegmentCompletionProtocol.REASON_INDEX_CAPACITY_THRESHOLD_BREACHED);
}
}

private void setHasMessagesFetched(FakeRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched)
Expand Down Expand Up @@ -907,6 +920,7 @@ public static class FakeRealtimeSegmentDataManager extends RealtimeSegmentDataMa
public Map<Integer, Semaphore> _semaphoreMap;
public boolean _stubConsumeLoop = true;
private TimeSupplier _timeSupplier;
private boolean _indexCapacityThresholdBreached;

private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
InstanceDataManagerConfig dataManagerConfig = mock(InstanceDataManagerConfig.class);
Expand Down Expand Up @@ -1087,6 +1101,15 @@ public void setFinalOffset(long offset) {
setOffset(offset, "_finalOffset");
}

/**
 * Test override: reports that more rows can be added until the breach flag is set through
 * {@link #setIndexCapacityThresholdBreached(boolean)}.
 */
@Override
protected boolean canAddMore() {
  final boolean breached = _indexCapacityThresholdBreached;
  return !breached;
}

/**
 * Sets the flag that {@code canAddMore()} of this fake negates.
 *
 * @param indexCapacityThresholdBreached {@code true} to make the fake report that no more rows can be added
 */
public void setIndexCapacityThresholdBreached(boolean indexCapacityThresholdBreached) {
  this._indexCapacityThresholdBreached = indexCapacityThresholdBreached;
}

public boolean invokeEndCriteriaReached() {
Method endCriteriaReached = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final File _consumerDir;

private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>();
private boolean _indexCapacityThresholdBreached;

private final IdMap<FixedIntArray> _recordIdMap;

Expand Down Expand Up @@ -828,7 +829,20 @@ private void addNewRow(int docId, GenericRow row) {
Object[] values = (Object[]) value;
for (Map.Entry<IndexType, MutableIndex> indexEntry : indexContainer._mutableIndexes.entrySet()) {
try {
indexEntry.getValue().add(values, dictIds, docId);
MutableIndex mutableIndex = indexEntry.getValue();
mutableIndex.add(values, dictIds, docId);
// Some immutable counterparts of the mutable indexes are bounded in size (e.g. FixedBitMVForwardIndex).
// If the number of values overflows or the size exceeds the limit, the mutable index cannot be converted to
// an immutable index, so the segment build fails and realtime consumption stops.
// Hence, the check below is a temporary measure to avoid such scenarios until the immutable index
// implementations are changed.
if (!_indexCapacityThresholdBreached && !mutableIndex.canAddMore()) {
_logger.info(
"Index: {} for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true",
indexEntry.getKey(), column
);
_indexCapacityThresholdBreached = true;
}
} catch (Exception e) {
recordIndexingError(indexEntry.getKey(), e);
}
Expand Down Expand Up @@ -1265,6 +1279,10 @@ private boolean isAggregateMetricsEnabled() {
return _recordIdMap != null;
}

/**
 * Tells whether this segment can still take more rows.
 *
 * @return {@code false} once the index-capacity-threshold-breached flag has been set (it is set while adding a row
 *         when a mutable index reports that it cannot add more), {@code true} otherwise
 */
public boolean canAddMore() {
  if (_indexCapacityThresholdBreached) {
    return false;
  }
  return true;
}

// NOTE: Okay for single-writer
@SuppressWarnings("NonAtomicOperationOnVolatileField")
private static class ValuesInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private static final int INCREMENT_PERCENTAGE = 100;
// Increments the initial size by 100% of the initial capacity every time we run out of capacity

// Conservative figure to not breach 2GB size limit for immutable index
private final static int DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN = 450_000_000;

// For single writer multiple readers setup, use ArrayList for writer and CopyOnWriteArrayList for reader
private final List<FixedByteSingleValueMultiColWriter> _headerWriters = new ArrayList<>();
private final List<FixedByteSingleValueMultiColReader> _headerReaders = new CopyOnWriteArrayList<>();
Expand All @@ -124,6 +127,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private int _currentCapacity = 0;
private int _prevRowStartIndex = 0; // Offset in the data-buffer for the last row added.
private int _prevRowLength = 0; // Number of values in the column for the last row added.
private int _numValues = 0;

public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int avgMultiValueCount, int rowCountPerChunk,
int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context, boolean isDictionaryEncoded,
Expand Down Expand Up @@ -200,6 +204,7 @@ private int getRowInCurrentHeader(int row) {

private int updateHeader(int row, int numValues) {
assert (numValues <= _maxNumberOfMultiValuesPerRow);
_numValues += numValues;
int newStartIndex = _prevRowStartIndex + _prevRowLength;
if (newStartIndex + numValues > _currentCapacity) {
addDataBuffer(_incrementalCapacity);
Expand Down Expand Up @@ -414,6 +419,11 @@ public void setDoubleMV(int docId, double[] values) {
}
}

/**
 * Tells whether this index can still accept values.
 *
 * @return {@code true} while the running count of values added so far ({@code _numValues}) is strictly below
 *         {@code DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN}
 */
@Override
public boolean canAddMore() {
  return DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN > _numValues;
}

@Override
public void close()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.indexsegment.mutable;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Tests that {@link MutableSegmentImpl#canAddMore()} keeps returning {@code true} while no mutable index reports a
 * capacity problem, and flips to {@code false} once a mutable index reports that it cannot add more values.
 */
public class MutableSegmentEntriesAboveThresholdTest {
  private static final File TEMP_DIR =
      new File(FileUtils.getTempDirectory(), MutableSegmentEntriesAboveThresholdTest.class.getSimpleName());
  private static final String AVRO_FILE = "data/test_data-mv.avro";
  private Schema _schema;

  /**
   * Forward index that delegates to a real {@link MutableForwardIndex} but reports that it cannot add more values
   * once {@link #THRESHOLD} dictionary ids have been written through {@link #setDictIdMV(int, int[])}.
   */
  private static class FakeMutableForwardIndex implements MutableForwardIndex {

    private static final int THRESHOLD = 2;

    private final MutableForwardIndex _mutableForwardIndex;
    // Running count of multi-value dictionary ids written through this wrapper
    private int _numValues;

    FakeMutableForwardIndex(MutableForwardIndex mutableForwardIndex) {
      _mutableForwardIndex = mutableForwardIndex;
      _numValues = 0;
    }

    @Override
    public boolean canAddMore() {
      return _numValues < THRESHOLD;
    }

    @Override
    public void setDictIdMV(int docId, int[] dictIds) {
      _numValues += dictIds.length;
      _mutableForwardIndex.setDictIdMV(docId, dictIds);
    }

    @Override
    public int getLengthOfShortestElement() {
      return _mutableForwardIndex.getLengthOfShortestElement();
    }

    @Override
    public int getLengthOfLongestElement() {
      return _mutableForwardIndex.getLengthOfLongestElement();
    }

    @Override
    public void setDictId(int docId, int dictId) {
      _mutableForwardIndex.setDictId(docId, dictId);
    }

    @Override
    public boolean isDictionaryEncoded() {
      return _mutableForwardIndex.isDictionaryEncoded();
    }

    @Override
    public boolean isSingleValue() {
      return _mutableForwardIndex.isSingleValue();
    }

    @Override
    public FieldSpec.DataType getStoredType() {
      return _mutableForwardIndex.getStoredType();
    }

    @Override
    public void close()
        throws IOException {
      _mutableForwardIndex.close();
    }
  }

  /** Resolves the multi-value Avro test resource from the classpath, failing the test if it is missing. */
  private File getAvroFile() {
    URL resourceUrl = MutableSegmentEntriesAboveThresholdTest.class.getClassLoader().getResource(AVRO_FILE);
    Assert.assertNotNull(resourceUrl);
    return new File(resourceUrl.getFile());
  }

  /**
   * Builds a segment from the Avro file under {@link #TEMP_DIR} (which is wiped first), stores the resulting schema
   * in {@link #_schema}, and returns a fresh mutable segment created for that schema.
   */
  private MutableSegmentImpl getMutableSegment(File avroFile)
      throws Exception {
    FileUtils.deleteQuietly(TEMP_DIR);

    SegmentGeneratorConfig config =
        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, TEMP_DIR, "testTable");
    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
    driver.init(config);
    driver.build();

    _schema = config.getSchema();
    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, "testSegment");
    return MutableSegmentImplTestUtils
        .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
            Collections.emptyMap(),
            false, false, null, null, null, null, null, null, Collections.emptyList());
  }

  /** Reads every record of the Avro file and indexes it into the given mutable segment. */
  private void indexAllRows(MutableSegmentImpl mutableSegment, File avroFile)
      throws Exception {
    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
    try (RecordReader recordReader = RecordReaderFactory
        .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
      GenericRow reuse = new GenericRow();
      while (recordReader.hasNext()) {
        mutableSegment.index(recordReader.next(reuse), defaultMetadata);
      }
    }
  }

  @Test
  public void testNoLimitBreached()
      throws Exception {
    File avroFile = getAvroFile();
    MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);
    indexAllRows(mutableSegment, avroFile);
    // Use the TestNG assertion rather than the `assert` keyword, which is skipped when the JVM runs without -ea
    Assert.assertTrue(mutableSegment.canAddMore());
  }

  // The reflection reads below return Object; the casts mirror the declared field types of MutableSegmentImpl.
  @SuppressWarnings({"unchecked", "rawtypes"})
  @Test
  public void testLimitBreached()
      throws Exception {
    File avroFile = getAvroFile();
    MutableSegmentImpl mutableSegment = getMutableSegment(avroFile);

    Field indexContainerMapField = MutableSegmentImpl.class.getDeclaredField("_indexContainerMap");
    indexContainerMapField.setAccessible(true);
    Map<String, Object> colVsIndexContainer = (Map<String, Object>) indexContainerMapField.get(mutableSegment);

    // Wrap the forward index of every column with the fake so the threshold is hit after a couple of values
    for (Map.Entry<String, Object> entry : colVsIndexContainer.entrySet()) {
      Object indexContainer = entry.getValue();
      Field mutableIndexesField = indexContainer.getClass().getDeclaredField("_mutableIndexes");
      mutableIndexesField.setAccessible(true);
      Map<IndexType, MutableIndex> indexTypeVsMutableIndex =
          (Map<IndexType, MutableIndex>) mutableIndexesField.get(indexContainer);

      MutableForwardIndex mutableForwardIndex = null;
      for (IndexType indexType : indexTypeVsMutableIndex.keySet()) {
        if (indexType.getId().equals(StandardIndexes.FORWARD_ID)) {
          mutableForwardIndex = (MutableForwardIndex) indexTypeVsMutableIndex.get(indexType);
        }
      }

      Assert.assertNotNull(mutableForwardIndex);

      indexTypeVsMutableIndex.put(new ForwardIndexPlugin().getIndexType(),
          new FakeMutableForwardIndex(mutableForwardIndex));
    }

    indexAllRows(mutableSegment, avroFile);

    Assert.assertFalse(mutableSegment.canAddMore());
  }
}
Loading
Loading