Skip to content

Commit

Permalink
Misc fixes for upsert metadata manager (apache#12319)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Jan 24, 2024
1 parent f43664d commit 9c1bb02
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -91,7 +92,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps

// Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records.
// If upsertTTL is enabled, we keep track of largestSeenComparisonValue to compute expired segments.
protected volatile double _largestSeenComparisonValue;
protected final AtomicDouble _largestSeenComparisonValue;

// The following variables are always accessed within synchronized block
private boolean _stopped;
Expand All @@ -116,9 +117,9 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
_serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
_largestSeenComparisonValue = loadWatermark();
_largestSeenComparisonValue = new AtomicDouble(loadWatermark());
} else {
_largestSeenComparisonValue = Double.MIN_VALUE;
_largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
deleteWatermark();
}
}
Expand Down Expand Up @@ -166,17 +167,17 @@ public void addSegment(ImmutableSegment segment) {
double maxComparisonValue =
((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
.getMaxValue()).doubleValue();
_largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, maxComparisonValue);
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
}

// Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) {
_logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
if (validDocIdsSnapshot != null) {
Expand Down Expand Up @@ -245,11 +246,20 @@ protected void doAddSegment(ImmutableSegmentImpl segment) {

// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}

/**
 * Returns the primary key count that this class logs as "current primary key count" and publishes to the
 * {@code UPSERT_PRIMARY_KEYS_COUNT} gauge. The concrete counting logic is supplied by subclasses.
 */
protected abstract long getNumPrimaryKeys();

/**
 * Sets the {@code UPSERT_PRIMARY_KEYS_COUNT} partition gauge for this manager's table and partition to the given
 * value.
 *
 * @param numPrimaryKeys the value to publish to the gauge
 */
protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);
}

_logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
/**
 * Publishes the value returned by {@link #getNumPrimaryKeys()} to the primary key gauge by delegating to
 * {@link #updatePrimaryKeyGauge(long)}.
 */
protected void updatePrimaryKeyGauge() {
updatePrimaryKeyGauge(getNumPrimaryKeys());
}

@Override
Expand All @@ -275,7 +285,7 @@ public void preloadSegment(ImmutableSegment segment) {
}
}

private void doPreloadSegment(ImmutableSegmentImpl segment) {
protected void doPreloadSegment(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Preloading segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
long startTimeMs = System.currentTimeMillis();
Expand All @@ -301,8 +311,7 @@ private void doPreloadSegment(ImmutableSegmentImpl segment) {

// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);
updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
Expand Down Expand Up @@ -347,8 +356,6 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
}
}

protected abstract long getNumPrimaryKeys();

protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
@Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
Expand Down Expand Up @@ -378,8 +385,8 @@ public boolean addRecord(MutableSegment segment, RecordInfo recordInfo) {
}

/**
Returns {@code true} when the record is added to the upsert metadata manager,
{@code false} when the record is out-of-order thus not added.
* Returns {@code true} when the record is added to the upsert metadata manager, {@code false} when the record is
* out-of-order thus not added.
*/
protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo);

Expand Down Expand Up @@ -433,9 +440,7 @@ protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegmen

// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);

updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
Expand Down Expand Up @@ -506,10 +511,10 @@ public void removeSegment(IndexSegment segment) {
return;
}
// Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) {
_logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
return;
}
Expand Down Expand Up @@ -556,9 +561,7 @@ protected void doRemoveSegment(IndexSegment segment) {

// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);

updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
Expand Down Expand Up @@ -793,8 +796,7 @@ public synchronized void close()
// We don't remove the segment from the metadata manager when
// it's closed. This was done to make table deletion faster. Since we don't remove the segment, we never decrease
// the primary key count. So, we set the primary key count to 0 here.
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
0L);
updatePrimaryKeyGauge(0);
_logger.info("Closed the metadata manager");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
Expand Down Expand Up @@ -223,22 +222,22 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc

@Override
public void doRemoveExpiredPrimaryKeys() {
AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
double largestSeenComparisonValue = _largestSeenComparisonValue.get();
double metadataTTLKeysThreshold;
if (_metadataTTL > 0) {
metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL;
} else {
metadataTTLKeysThreshold = Double.MIN_VALUE;
}

double deletedKeysThreshold;

if (_deletedKeysTTL > 0) {
deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
} else {
deletedKeysThreshold = Double.MIN_VALUE;
}

_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue();
if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
Expand All @@ -255,29 +254,25 @@ public void doRemoveExpiredPrimaryKeys() {
}
});
if (_metadataTTL > 0) {
persistWatermark(_largestSeenComparisonValue);
persistWatermark(largestSeenComparisonValue);
}

int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
if (numDeletedTTLKeys > 0) {
_logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}", numDeletedTTLKeys,
_tableNameWithType);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
numDeletedTTLKeys);
}
// Update metrics
updatePrimaryKeyGauge();
int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
if (numMetadataTTLKeys > 0) {
_logger.info("Deleted {} primary keys based on metadataTTL in the table {}", numMetadataTTLKeys,
_tableNameWithType);
_logger.info("Deleted {} primary keys based on metadataTTL", numMetadataTTLKeys);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
numMetadataTTLKeys);
}
int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
if (numDeletedTTLKeys > 0) {
_logger.info("Deleted {} primary keys based on deletedKeysTTL", numDeletedTTLKeys);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
numDeletedTTLKeys);
}
}

/**
Returns {@code true} when the record is added to the upsert metadata manager,
{@code false} when the record is out-of-order thus not added.
*/
@Override
protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
Expand All @@ -289,7 +284,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
// When TTL is enabled, update largestSeenComparisonValue when adding new record
if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
double comparisonValue = ((Number) newComparisonValue).doubleValue();
_largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, comparisonValue));
}

_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
Expand All @@ -310,8 +305,8 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
}
return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
// Out-of-order record
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
// This is an out-of-order record, so set the flag to true; the flag indicates whether the record is out-of-order
isOutOfOrderRecord.set(true);
return currentRecordLocation;
}
Expand All @@ -322,9 +317,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
}
});

// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
updatePrimaryKeyGauge();
return !isOutOfOrderRecord.get();
}

Expand Down

0 comments on commit 9c1bb02

Please sign in to comment.