Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[ASTS] Fix: Update getTimestampRangeRecord to return empty when record not present #7439

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import org.immutables.value.Value;
Expand Down Expand Up @@ -120,17 +119,16 @@ private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy
throw new SafeIllegalStateException("Didn't expect to get here");
}

// TODO(mdaudali): This method is still incorrect (a record does not exist for an open bucket, not just pre-init
// bucket 0). A follow up PR will address this.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

private TimestampRange getTimestampRangeRecord(long queriedBucket) {
try {
return recordsTable.getTimestampRangeRecord(queriedBucket);
} catch (NoSuchElementException exception) {
throw new SafeIllegalStateException(
"Timestamp range record not found. If this has happened for bucket 0, this is possible when"
+ " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of a"
+ " bug in auto-scaling sweep. In either case, we will retry.",
exception,
SafeArg.of("queriedBucket", queriedBucket));
}
return recordsTable
.getTimestampRangeRecord(queriedBucket)
.orElseThrow(() -> new SafeIllegalStateException(
"Timestamp range record not found. If this has happened for bucket 0, this is possible when"
+ " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of"
+ " a bug in auto-scaling sweep. In either case, we will retry.",
SafeArg.of("queriedBucket", queriedBucket)));
}

private long getStrictUpperBoundForSweptBuckets(ShardAndStrategy shardAndStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -239,10 +238,9 @@ public void deleteBucketEntry(Bucket bucket) {
}

@Override
public TimestampRange getTimestampRangeRecord(long bucketIdentifier) {
public Optional<TimestampRange> getTimestampRangeRecord(long bucketIdentifier) {
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketRecordsCell(bucketIdentifier);
return readCell(cell, timestampRangePersister::tryDeserialize)
.orElseThrow(() -> new NoSuchElementException("No timestamp range record found for bucket identifier"));
return readCell(cell, timestampRangePersister::tryDeserialize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.sweep.asts.TimestampRange;
import java.util.Optional;

public interface SweepBucketRecordsTable {
/**
* Returns the {@link TimestampRange} for the given bucket identifier, throwing a
* {@link java.util.NoSuchElementException} if one is not present.
* Returns a {@link TimestampRange} for the given bucket identifier, if one exists. If the record is present, then
* the bucket is definitely closed. If the record is not present, the bucket is either open or closed (the record
* may simply not have been written yet).
*/
TimestampRange getTimestampRangeRecord(long bucketIdentifier);
Optional<TimestampRange> getTimestampRangeRecord(long bucketIdentifier);

void putTimestampRangeRecord(long bucketIdentifier, TimestampRange timestampRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -74,19 +73,17 @@ public void setUp() {

@ParameterizedTest
@MethodSource("buckets")
public void wrapsAndRethrowsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) {
public void throwsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) {
when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy())))
.thenReturn(ImmutableSet.of(bucket));
NoSuchElementException underlyingException = new NoSuchElementException();
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(underlyingException);
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.empty());

assertThatLoggableExceptionThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy()))
.isInstanceOf(SafeIllegalStateException.class)
.hasLogMessage("Timestamp range record not found. If this has happened for bucket 0, this is possible"
+ " when autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of"
+ " a bug in auto-scaling sweep. In either case, we will retry.")
.hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier()))
.hasCause(underlyingException);
.hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier()));

verify(sweepBucketPointerTable, never()).updateStartingBucketForShardAndStrategy(bucket);
verify(sweepQueueProgressUpdater, never()).progressTo(eq(bucket.shardAndStrategy()), anyLong());
Expand All @@ -100,8 +97,8 @@ public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) {
.thenReturn(ImmutableSet.of(bucket));
when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty());
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier()))
.thenReturn(TimestampRange.of(
SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8)));
.thenReturn(Optional.of(TimestampRange.of(
SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8))));

shardProgressUpdater.updateProgress(bucket.shardAndStrategy());

Expand All @@ -120,7 +117,7 @@ public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepa
when(bucketProgressStore.getBucketProgress(bucket))
.thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L)));
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier()))
.thenReturn(sweepableBucket.timestampRange());
.thenReturn(Optional.of(sweepableBucket.timestampRange()));

shardProgressUpdater.updateProgress(bucket.shardAndStrategy());

Expand Down Expand Up @@ -154,7 +151,8 @@ public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly(
TimestampRange finalBucketTimestampRange = TimestampRange.of(
lastCompleteBucketTimestampRange.endExclusive(),
lastCompleteBucketTimestampRange.endExclusive() + SweepQueueUtils.TS_COARSE_GRANULARITY);
when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)).thenReturn(finalBucketTimestampRange);
when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier))
.thenReturn(Optional.of(finalBucketTimestampRange));

shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy());

Expand Down Expand Up @@ -204,7 +202,7 @@ private static List<SweepableBucket> getSucceedingBuckets(SweepableBucket bucket

private void setupBucketRecord(SweepableBucket sweepableBucket) {
when(recordsTable.getTimestampRangeRecord(sweepableBucket.bucket().bucketIdentifier()))
.thenReturn(sweepableBucket.timestampRange());
.thenReturn(Optional.of(sweepableBucket.timestampRange()));
}

static Stream<Bucket> buckets() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -293,17 +292,15 @@ public void deleteBucketEntryDeletesBucket() {
}

@Test
public void getTimestampRangeRecordThrowsIfRecordNotPresent() {
assertThatThrownBy(() -> store.getTimestampRangeRecord(1))
.isInstanceOf(NoSuchElementException.class)
.hasMessage("No timestamp range record found for bucket identifier");
public void getTimestampRangeRecordReturnsEmptyIfRecordDoesNotExist() {
assertThat(store.getTimestampRangeRecord(1)).isEmpty();
}

@Test
public void putTimestampRangeRecordPutsRecord() {
TimestampRange timestampRange = TimestampRange.of(1, 2);
store.putTimestampRangeRecord(1, timestampRange);
assertThat(store.getTimestampRangeRecord(1)).isEqualTo(timestampRange);
assertThat(store.getTimestampRangeRecord(1)).hasValue(timestampRange);
}

@Test
Expand All @@ -323,11 +320,9 @@ public void deleteTimestampRangeRecordDoesNotThrowIfRecordNotPresent() {
public void deleteTimestampRangeRecordDeletesRecord() {
TimestampRange timestampRange = TimestampRange.of(1, 2);
store.putTimestampRangeRecord(1, timestampRange);
assertThat(store.getTimestampRangeRecord(1)).isEqualTo(timestampRange);
assertThat(store.getTimestampRangeRecord(1)).hasValue(timestampRange);

store.deleteTimestampRangeRecord(1);
assertThatThrownBy(() -> store.getTimestampRangeRecord(1))
.isInstanceOf(NoSuchElementException.class)
.hasMessage("No timestamp range record found for bucket identifier");
assertThat(store.getTimestampRangeRecord(1)).isEmpty();
}
}