Marks end criteria reached for the segment if the Index cannot consume more rows #14479

Merged
merged 48 commits into from
Dec 5, 2024
Changes from 12 commits
Commits
f03043a
initial refactoring for handling threshold logic
noob-se7en Nov 15, 2024
09d43d7
Adds logic for endCriteriaReached based upon numValues
noob-se7en Nov 18, 2024
121d23a
add logic inside endCriteriaReached
noob-se7en Nov 18, 2024
06523b6
renames var
noob-se7en Nov 18, 2024
f47cb08
renames constant
noob-se7en Nov 18, 2024
da8d691
nit
noob-se7en Nov 18, 2024
2cff520
nit
noob-se7en Nov 18, 2024
55bf033
refactors logic to inside of addNewRow
noob-se7en Nov 18, 2024
4549e5c
remove comments
noob-se7en Nov 18, 2024
36c5f0d
removes usage of map
noob-se7en Nov 18, 2024
c2f78cb
nit
noob-se7en Nov 18, 2024
0acb3f6
fixes lint
noob-se7en Nov 18, 2024
96e8663
nit
noob-se7en Nov 19, 2024
ea48ba0
refactoring
noob-se7en Nov 20, 2024
58370ca
nit
noob-se7en Nov 20, 2024
fbc9c93
Adds test
noob-se7en Nov 21, 2024
8722efc
Adds test
noob-se7en Nov 22, 2024
a1467b5
Adds test for realtimesegmentDataManager
noob-se7en Nov 23, 2024
27c4443
lint
noob-se7en Nov 23, 2024
a2b95b6
refactors config
noob-se7en Nov 23, 2024
6c3df28
resolves commit
noob-se7en Nov 23, 2024
c8f5ed2
changes log
noob-se7en Nov 23, 2024
ba59830
nit
noob-se7en Nov 23, 2024
9f13ff1
nit
noob-se7en Nov 23, 2024
b1143d5
Adds test and minor refactoring
noob-se7en Nov 25, 2024
33d40a5
nit
noob-se7en Nov 25, 2024
be8ab39
nit
noob-se7en Nov 25, 2024
3a185dd
nit
noob-se7en Nov 25, 2024
d3a6ea2
fixes lint
noob-se7en Nov 25, 2024
ba07fc3
Addresses Pr comments
noob-se7en Nov 26, 2024
69b7e32
nit
noob-se7en Nov 26, 2024
c60f1bd
nit
noob-se7en Nov 26, 2024
5511e1f
refactors method name
noob-se7en Nov 26, 2024
ffa984d
nit
noob-se7en Nov 26, 2024
783f8c0
nit
noob-se7en Nov 26, 2024
f9e4b9c
Fixes lint
noob-se7en Nov 27, 2024
3e1dde2
fixes lint
noob-se7en Nov 28, 2024
ed32e3d
Addresses PR comment
noob-se7en Dec 4, 2024
e57d726
fix bug
noob-se7en Dec 4, 2024
24243bd
removes reflection in test
noob-se7en Dec 4, 2024
541a7d9
fix test
noob-se7en Dec 4, 2024
9f67ce4
fixes lint
noob-se7en Dec 4, 2024
65fcaab
fix log
noob-se7en Dec 4, 2024
e6c1f43
move log to debug
noob-se7en Dec 4, 2024
dbaaf5c
fixes test
noob-se7en Dec 4, 2024
d15a3fd
Addresses PR comment
noob-se7en Dec 5, 2024
6960b5f
nit
noob-se7en Dec 5, 2024
eb3662f
nit
noob-se7en Dec 5, 2024
Original file line number Diff line number Diff line change
@@ -149,6 +149,7 @@ public enum ControllerResponseStatus {
public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
// Stop reason sent by server as force commit message received
public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived";
public static final String REASON_NUM_OF_COL_VALUES_ABOVE_THRESHOLD = "numColValuesAboveThreshold";
Contributor

This should always be on. We may introduce a config to turn it off if we are not confident about this new logic, but if it is not very complicated we can remove this config.

Contributor Author

Considering the new implementation, where we also keep a conservative size-estimation check, I guess it makes sense to enable this change only behind a flag?


// Canned responses
public static final Response RESP_NOT_LEADER =
Original file line number Diff line number Diff line change
@@ -309,6 +309,7 @@ public void deleteSegmentFile() {
private String _stopReason = null;
private final Semaphore _segBuildSemaphore;
private final boolean _isOffHeap;
private final boolean _thresholdForNumOfColValuesEnabled;
/**
* Whether null handling is enabled by default. This value is only used if
* {@link Schema#isEnableColumnBasedNullHandling()} is false.
@@ -362,6 +363,13 @@ private boolean endCriteriaReached() {
_numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
return true;
} else if (_thresholdForNumOfColValuesEnabled && _realtimeSegment.isNumOfColValuesAboveThreshold()) {
Contributor

Nice, so it is fairly easy to stop consumption and commit

_segmentLogger.info(
"Stopping consumption as num of values for a column is above threshold - numRowsConsumed={} "
+ "numRowsIndexed={}",
_numRowsConsumed, _numRowsIndexed);
_stopReason = SegmentCompletionProtocol.REASON_NUM_OF_COL_VALUES_ABOVE_THRESHOLD;
return true;
}
return false;
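
The new branch relies on the segment exposing a sticky "limit breached" flag that the consumer polls between rows. A minimal, self-contained sketch of that control flow follows; `ToySegment`, `consume`, and the tiny threshold are hypothetical stand-ins for illustration, not Pinot classes.

```java
// Hypothetical sketch of the "sticky flag polled by the consumer" pattern.
final class StopOnBreachSketch {

  /** Toy segment that counts values for one column and remembers a breach. */
  static final class ToySegment {
    private final int _threshold;
    private int _numValues;
    private boolean _limitBreached;

    ToySegment(int threshold) {
      _threshold = threshold;
    }

    /** Indexes one multi-value row; the row that crosses the threshold is still indexed. */
    void index(int[] values) {
      long newCount = (long) _numValues + values.length;
      if (newCount > _threshold) {
        _limitBreached = true; // sticky: never reset for this segment
      }
      _numValues += values.length;
    }

    boolean isAboveThreshold() {
      return _limitBreached;
    }
  }

  /** Consumes rows until the segment reports a breach; returns the number of rows indexed. */
  static int consume(ToySegment segment, int[][] rows) {
    int indexed = 0;
    for (int[] row : rows) {
      if (segment.isAboveThreshold()) {
        break; // end criteria reached: stop consuming so the segment can be committed
      }
      segment.index(row);
      indexed++;
    }
    return indexed;
  }

  public static void main(String[] args) {
    int[][] rows = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}};
    // With a threshold of 5 values, the second row breaches and the loop stops before the third.
    System.out.println(consume(new ToySegment(5), rows));
  }
}
```

In this sketch the breaching row is accepted and consumption stops before the next one, which mirrors the ordering in the diff (flag set in `addNewRow`, checked later in `endCriteriaReached`).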

@@ -1233,6 +1241,7 @@ private static class ConsumptionStopIndicator {
final Logger _logger;
final ServerSegmentCompletionProtocolHandler _protocolHandler;
final String _reason;

private ConsumptionStopIndicator(StreamPartitionMsgOffset offset, String segmentName, String instanceId,
ServerSegmentCompletionProtocolHandler protocolHandler, String reason, Logger logger) {
_offset = offset;
@@ -1529,6 +1538,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf

_isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
_defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
_thresholdForNumOfColValuesEnabled = tableConfig.getValidationConfig().isThresholdForNumOfColValuesEnabled();

// Start new realtime segment
String consumerDir = realtimeTableDataManager.getConsumerDir();
@@ -1552,7 +1562,8 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
.setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setDedupTimeColumn(tableConfig.getDedupTimeColumn())
.setFieldConfigList(tableConfig.getFieldConfigList());
.setFieldConfigList(tableConfig.getFieldConfigList())
.setThresholdForNumOfColValuesEnabled(_thresholdForNumOfColValuesEnabled);

// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
@@ -1625,7 +1636,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
"Failed to initialize segment data manager", e));
_segmentLogger.warn(
"Scheduling task to call controller to mark the segment as OFFLINE in Ideal State due"
+ " to initialization error: '{}'",
+ " to initialization error: '{}'",
e.getMessage());
// Since we are going to throw exception from this thread (helix execution thread), the externalview
// entry for this segment will be ERROR. We allow time for Helix to make this transition, and then
Original file line number Diff line number Diff line change
@@ -128,6 +128,7 @@ public class MutableSegmentImpl implements MutableSegment {
private static final int EXPECTED_COMPRESSION = 1000;
private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of recordIdMap for updatable metrics.
private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min overflow map size for updatable metrics.
private final static int DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN = 2_000_000_000;

private final Logger _logger;
private final long _startTimeMillis = System.currentTimeMillis();
@@ -147,8 +148,10 @@ public class MutableSegmentImpl implements MutableSegment {
private final int _mainPartitionId; // partition id designated for this consuming segment
private final boolean _defaultNullHandlingEnabled;
private final File _consumerDir;
private final boolean _thresholdForNumOfColValuesEnabled;

private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>();
private boolean _numOfColValuesLimitBreached = false;

private final IdMap<FixedIntArray> _recordIdMap;

@@ -225,6 +228,7 @@ public boolean isMutableSegment() {
_mainPartitionId = config.getPartitionId();
_defaultNullHandlingEnabled = config.isNullHandlingEnabled();
_consumerDir = new File(config.getConsumerDir());
_thresholdForNumOfColValuesEnabled = config.isThresholdForNumOfColValuesEnabled();

Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -797,6 +801,18 @@ private void addNewRow(int docId, GenericRow row) {
recordIndexingError(indexEntry.getKey(), e);
}
}

if (_thresholdForNumOfColValuesEnabled) {
int prevCount = indexContainer._valuesInfo.getNumValues();
long newCount = prevCount + 1L + values.length;
Contributor

The total value count alone is not enough. We should perform a per-index check (add an API to MutableIndex that returns whether it can take more values).
E.g. for an MV forward index, if we get 1B values but each value takes more than 2 bytes, we will run into the same exception.
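
To make the reviewer's example concrete, here is a minimal sketch of what a per-index capacity check could look like. The `CapacityAwareIndex` interface and `FixedWidthIndex` class are hypothetical names invented for this illustration and are not part of Pinot's `MutableIndex` API; the sketch also assumes the limit in question is a single buffer of at most `Integer.MAX_VALUE` bytes.

```java
// Hypothetical sketch: illustrative names only, not Pinot's MutableIndex API.
final class PerIndexCapacitySketch {
  // Assumption: the index is backed by one buffer addressed with an int offset.
  static final long MAX_BUFFER_BYTES = Integer.MAX_VALUE;

  /** Hypothetical per-index hook: each index reports whether it can take more values. */
  interface CapacityAwareIndex {
    boolean canAddMore(int numValues);

    void add(int numValues);
  }

  /** Toy forward index that stores every value in a fixed number of bytes. */
  static final class FixedWidthIndex implements CapacityAwareIndex {
    private final int _bytesPerValue;
    private long _bytesUsed;

    FixedWidthIndex(int bytesPerValue) {
      _bytesPerValue = bytesPerValue;
    }

    @Override
    public boolean canAddMore(int numValues) {
      // long arithmetic so the projected size itself cannot overflow
      return _bytesUsed + (long) numValues * _bytesPerValue <= MAX_BUFFER_BYTES;
    }

    @Override
    public void add(int numValues) {
      if (!canAddMore(numValues)) {
        throw new IllegalStateException("Index buffer would exceed " + MAX_BUFFER_BYTES + " bytes");
      }
      _bytesUsed += (long) numValues * _bytesPerValue;
    }
  }

  public static void main(String[] args) {
    // 1B values at 2 bytes each is 2,000,000,000 bytes, which still fits.
    System.out.println(new FixedWidthIndex(2).canAddMore(1_000_000_000));
    // 1B values at 3 bytes each is 3,000,000,000 bytes, which does not.
    System.out.println(new FixedWidthIndex(3).canAddMore(1_000_000_000));
  }
}
```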

Contributor Author

The mutable index is unbounded, right (no code-enforced limit)? From the code, I see that the realtime mutable index is always created with a dictionary (even for an MV var-byte column with noDict enabled in the config).

Hence, from numOfValues we know that the size of the mutable index is approximately numOfValues * 4 bytes.

But we are more interested in the size of the immutable index, since that is where the exception is thrown. However, the immutable index can be larger or even smaller than the mutable index, as the implementation is completely different. So while building the mutable index we would need to keep some state to estimate the approximate size of the immutable version of the index (for example, while building the mutable forward index, we would need to keep an estimate of the bitmap, numBitsPerValue, header size, etc.).

> We should perform a per-index check (add an api to the MutableIndex and let it return if it can take more values).

So this might add too much complexity to every mutable index, since after every row consumed we would need to update the estimated size of the corresponding immutable index, and it tightly couples the code with the immutable index logic.
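
As a rough illustration of why the two sizes diverge, the sketch below compares the 4-bytes-per-value estimate for the mutable side with a bit-packed estimate for the immutable side. It assumes the immutable forward index stores dictionary ids packed at `numBitsPerValue` bits each and ignores headers and bitmaps; the class and method names are hypothetical, and a raw var-byte index (which can be larger than the mutable one) is not modeled.

```java
// Hypothetical estimate helpers; headers, bitmaps, and raw var-byte layouts are ignored.
final class IndexSizeEstimateSketch {

  /** Mutable side: one 4-byte dictionary id per value. */
  static long mutableBytes(long numValues) {
    return numValues * Integer.BYTES;
  }

  /** Bits needed to encode dictionary ids in [0, cardinality), with a minimum of 1 bit. */
  static int bitsPerValue(int cardinality) {
    return Math.max(1, 32 - Integer.numberOfLeadingZeros(cardinality - 1));
  }

  /** Immutable side, assuming bit-packed dictionary ids rounded up to whole bytes. */
  static long bitPackedBytes(long numValues, int cardinality) {
    return (numValues * bitsPerValue(cardinality) + 7) / 8;
  }

  public static void main(String[] args) {
    long numValues = 1_000_000_000L;
    int cardinality = 1000; // needs 10 bits per dictionary id
    System.out.println("mutable estimate:    " + mutableBytes(numValues) + " bytes");
    System.out.println("bit-packed estimate: " + bitPackedBytes(numValues, cardinality) + " bytes");
  }
}
```

The point of the sketch is only that the immutable size depends on inputs (cardinality, encoding) that the value count does not capture, which is the coupling the comment is worried about.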

Contributor

Essentially, what we want to ensure is that whatever is accepted in the mutable segment won't cause a problem when the mutable segment is sealed.
I'm okay if we only limit total values here, but we need to enhance the immutable index so that it can hold 2B values regardless of the bit length per value.

Contributor Author

Hmm. Okay, let me figure out whether we can even do this on the ingestion side, considering we have 5 different implementations of the immutable forward index for MV columns, each with a different size limit.

Contributor Author
@noob-se7en Nov 19, 2024

We can keep the threshold near 500 million for numOfValues, which will be good enough for a few immutable indexes but not for the var-byte forward index with its 4GB limit (2GB wasted). However, this will ensure we never encounter a numOfValues overflow or an index-size-limit-reached exception.
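
The 500 million figure can be sanity-checked with a little arithmetic. The sketch below assumes 4 bytes per value (the dictionary-id estimate used earlier in this thread) and treats the "2GB" and "4GB" limits as `Integer.MAX_VALUE` and 4 GiB respectively; both are assumptions made for illustration, and `maxValues` is a hypothetical helper.

```java
// Hypothetical arithmetic check for the proposed threshold.
final class ConservativeThresholdSketch {
  static final long TWO_GB_LIMIT = Integer.MAX_VALUE;        // 2,147,483,647 bytes
  static final long FOUR_GB_LIMIT = 4L * 1024 * 1024 * 1024; // 4,294,967,296 bytes
  static final long PROPOSED_THRESHOLD = 500_000_000L;

  /** Largest number of fixed-width values that fits under a buffer limit. */
  static long maxValues(long bufferLimitBytes, int bytesPerValue) {
    return bufferLimitBytes / bytesPerValue;
  }

  public static void main(String[] args) {
    int bytesPerValue = 4; // assumption: one int per value
    System.out.println("max values under 2GB: " + maxValues(TWO_GB_LIMIT, bytesPerValue));
    System.out.println("max values under 4GB: " + maxValues(FOUR_GB_LIMIT, bytesPerValue));
    // Bytes left unused in a 4GB-capable index when stopping at the proposed threshold.
    System.out.println("unused under 4GB: " + (FOUR_GB_LIMIT - PROPOSED_THRESHOLD * bytesPerValue));
  }
}
```

Under these assumptions the 2GB limit allows about 536 million values, so 500 million is a round number safely below it, while an index with a 4GB limit would stop with roughly half of its capacity unused.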

Contributor

For now we can figure out a conservative value that will never cause a buffer over 2GB. Once we support larger indexes, we can increase this value.

Contributor Author

Sure. I will pick up the task of making the indexes unbounded after this.


if (newCount > DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN) {
_logger.warn("Number of total values for column {} is {} and has breached the threshold limit {}",
column, newCount, DEFAULT_THRESHOLD_FOR_NUM_OF_VALUES_PER_COLUMN);
_numOfColValuesLimitBreached = true;
}
}

indexContainer._valuesInfo.updateMVNumValues(values.length);
}
}
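
The `1L` in `prevCount + 1L + values.length` forces the sum into `long` arithmetic, which matters because the threshold sits close to `Integer.MAX_VALUE`. A minimal standalone sketch of the difference follows; the class and method names are hypothetical, and the sketch only adds the two counts rather than reproducing the exact expression in the diff.

```java
// Hypothetical sketch contrasting int and long arithmetic near the threshold.
final class OverflowSafeCountSketch {
  static final int THRESHOLD = 2_000_000_000;

  /** Widens to long before adding, so the sum cannot wrap. */
  static boolean breachesWithLong(int prevCount, int added) {
    long newCount = (long) prevCount + added;
    return newCount > THRESHOLD;
  }

  /** Pure int addition: the sum can wrap to a negative number and hide the breach. */
  static boolean breachesWithInt(int prevCount, int added) {
    int newCount = prevCount + added;
    return newCount > THRESHOLD;
  }

  public static void main(String[] args) {
    int prevCount = 2_100_000_000;
    int added = 100_000_000;
    System.out.println(breachesWithLong(prevCount, added)); // sum is 2,200,000,000
    System.out.println(breachesWithInt(prevCount, added));  // sum wraps below zero
  }
}
```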
@@ -1229,6 +1245,10 @@ private boolean isAggregateMetricsEnabled() {
return _recordIdMap != null;
}

public boolean isNumOfColValuesAboveThreshold() {
return _numOfColValuesLimitBreached;
}

// NOTE: Okay for single-writer
@SuppressWarnings("NonAtomicOperationOnVolatileField")
private static class ValuesInfo {
@@ -1285,6 +1305,10 @@ void updateVarByteMVMaxRowLengthInBytes(Object entry, DataType dataType) {
throw new IllegalStateException("Invalid type=" + dataType);
}
}

int getNumValues() {
return _numValues;
}
}

private static Map<String, Pair<String, ValueAggregator>> getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
Original file line number Diff line number Diff line change
@@ -73,6 +73,7 @@ public class RealtimeSegmentConfig {
private final String _consumerDir;
private final List<FieldConfig> _fieldConfigList;
private final List<AggregationConfig> _ingestionAggregationConfigs;
private final boolean _thresholdForNumOfColValuesEnabled;

// TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.

@@ -89,7 +90,8 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
List<String> upsertComparisonColumns, String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn,
boolean upsertDropOutOfOrderRecord, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
String dedupTimeColumn, PartitionDedupMetadataManager partitionDedupMetadataManager,
List<FieldConfig> fieldConfigList, List<AggregationConfig> ingestionAggregationConfigs) {
List<FieldConfig> fieldConfigList, List<AggregationConfig> ingestionAggregationConfigs,
boolean enableThresholdForNumOfValues) {
_tableNameWithType = tableNameWithType;
_segmentName = segmentName;
_streamName = streamName;
@@ -119,6 +121,7 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_fieldConfigList = fieldConfigList;
_ingestionAggregationConfigs = ingestionAggregationConfigs;
_thresholdForNumOfColValuesEnabled = enableThresholdForNumOfValues;
}

public String getTableNameWithType() {
@@ -241,6 +244,10 @@ public List<AggregationConfig> getIngestionAggregationConfigs() {
return _ingestionAggregationConfigs;
}

public boolean isThresholdForNumOfColValuesEnabled() {
return _thresholdForNumOfColValuesEnabled;
}

public static class Builder {
private String _tableNameWithType;
private String _segmentName;
@@ -275,6 +282,7 @@ public static class Builder {
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
private List<FieldConfig> _fieldConfigList;
private List<AggregationConfig> _ingestionAggregationConfigs;
private boolean _thresholdForNumOfColValuesEnabled = false;

public Builder() {
_indexConfigByCol = new HashMap<>();
@@ -475,6 +483,11 @@ public Builder setIngestionAggregationConfigs(List<AggregationConfig> ingestionA
return this;
}

public Builder setThresholdForNumOfColValuesEnabled(boolean thresholdForNumOfColValuesEnabled) {
_thresholdForNumOfColValuesEnabled = thresholdForNumOfColValuesEnabled;
return this;
}

public RealtimeSegmentConfig build() {
Map<String, FieldIndexConfigs> indexConfigByCol = Maps.newHashMapWithExpectedSize(_indexConfigByCol.size());
for (Map.Entry<String, FieldIndexConfigs.Builder> entry : _indexConfigByCol.entrySet()) {
@@ -487,7 +500,7 @@ public RealtimeSegmentConfig build() {
_defaultNullHandlingEnabled, _consumerDir, _upsertMode, _upsertConsistencyMode, _upsertComparisonColumns,
_upsertDeleteRecordColumn, _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
_partitionUpsertMetadataManager, _dedupTimeColumn, _partitionDedupMetadataManager, _fieldConfigList,
_ingestionAggregationConfigs);
_ingestionAggregationConfigs, _thresholdForNumOfColValuesEnabled);
}
}
}
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
// using the specified download scheme. Both realtime tables and offline tables can set this field.
// For more usage of this field, please refer to this design doc: https://tinyurl.com/f63ru4sb
private String _peerSegmentDownloadScheme;
private boolean _thresholdForNumOfColValuesEnabled;

@Deprecated
public String getSegmentAssignmentStrategy() {
@@ -218,6 +219,14 @@ public void setPeerSegmentDownloadScheme(String peerSegmentDownloadScheme) {
_peerSegmentDownloadScheme = peerSegmentDownloadScheme;
}

public boolean isThresholdForNumOfColValuesEnabled() {
return _thresholdForNumOfColValuesEnabled;
}

public void setThresholdForNumOfColValuesEnabled(boolean thresholdForNumOfColValuesEnabled) {
_thresholdForNumOfColValuesEnabled = thresholdForNumOfColValuesEnabled;
}

public String getCrypterClassName() {
return _crypterClassName;
}
Original file line number Diff line number Diff line change
@@ -81,6 +81,7 @@ public class TableConfigBuilder {
private ReplicaGroupStrategyConfig _replicaGroupStrategyConfig;
private CompletionConfig _completionConfig;
private String _crypterClassName;
private boolean _thresholdForNumOfColValuesEnabled = false;

// Tenant config related
private String _brokerTenant;
@@ -439,6 +440,7 @@ public TableConfig build() {
validationConfig.setReplication(_numReplicas);
validationConfig.setPeerSegmentDownloadScheme(_peerSegmentDownloadScheme);
validationConfig.setCrypterClassName(_crypterClassName);
validationConfig.setThresholdForNumOfColValuesEnabled(_thresholdForNumOfColValuesEnabled);

// Tenant config
TenantConfig tenantConfig = new TenantConfig(_brokerTenant, _serverTenant, _tagOverrideConfig);