Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[ASTS] Fix: Update getTimestampRangeRecord to return empty when recor…
Browse files Browse the repository at this point in the history
…d not present
  • Loading branch information
mdaudali committed Nov 13, 2024
1 parent 39c2be9 commit f7f270b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import org.immutables.value.Value;
Expand Down Expand Up @@ -120,17 +119,16 @@ private BucketProbeResult findCompletedBuckets(ShardAndStrategy shardAndStrategy
throw new SafeIllegalStateException("Didn't expect to get here");
}

// TODO(mdaudali): This method is still incorrect (a record does not exist for an open bucket, not just pre-init
// bucket 0). A follow up PR will address this.
private TimestampRange getTimestampRangeRecord(long queriedBucket) {
try {
return recordsTable.getTimestampRangeRecord(queriedBucket);
} catch (NoSuchElementException exception) {
throw new SafeIllegalStateException(
"Timestamp range record not found. If this has happened for bucket 0, this is possible when"
+ " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of a"
+ " bug in auto-scaling sweep. In either case, we will retry.",
exception,
SafeArg.of("queriedBucket", queriedBucket));
}
return recordsTable
.getTimestampRangeRecord(queriedBucket)
.orElseThrow(() -> new SafeIllegalStateException(
"Timestamp range record not found. If this has happened for bucket 0, this is possible when"
+ " autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of"
+ " a bug in auto-scaling sweep. In either case, we will retry.",
SafeArg.of("queriedBucket", queriedBucket)));
}

private long getStrictUpperBoundForSweptBuckets(ShardAndStrategy shardAndStrategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -239,10 +238,9 @@ public void deleteBucketEntry(Bucket bucket) {
}

@Override
public TimestampRange getTimestampRangeRecord(long bucketIdentifier) {
public Optional<TimestampRange> getTimestampRangeRecord(long bucketIdentifier) {
Cell cell = SweepAssignedBucketStoreKeyPersister.INSTANCE.sweepBucketRecordsCell(bucketIdentifier);
return readCell(cell, timestampRangePersister::tryDeserialize)
.orElseThrow(() -> new NoSuchElementException("No timestamp range record found for bucket identifier"));
return readCell(cell, timestampRangePersister::tryDeserialize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.palantir.atlasdb.sweep.asts.TimestampRange;
import java.util.Optional;

public interface SweepBucketRecordsTable {
/**
* Returns the {@link TimestampRange} for the given bucket identifier, throwing a
* {@link java.util.NoSuchElementException} if one is not present.
* Returns a {@link TimestampRange} for the given bucket identifier, if one exists. If the record is present, then
* the bucket is definitely closed. If the record is not present, the bucket is either open or closed (the record
* may simply not have been written yet).
*/
TimestampRange getTimestampRangeRecord(long bucketIdentifier);
Optional<TimestampRange> getTimestampRangeRecord(long bucketIdentifier);

void putTimestampRangeRecord(long bucketIdentifier, TimestampRange timestampRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -74,19 +73,17 @@ public void setUp() {

@ParameterizedTest
@MethodSource("buckets")
public void wrapsAndRethrowsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) {
public void throwsExceptionOnAbsenceOfTimestampRangeRecords(Bucket bucket) {
when(sweepBucketPointerTable.getStartingBucketsForShards(ImmutableSet.of(bucket.shardAndStrategy())))
.thenReturn(ImmutableSet.of(bucket));
NoSuchElementException underlyingException = new NoSuchElementException();
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenThrow(underlyingException);
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier())).thenReturn(Optional.empty());

assertThatLoggableExceptionThrownBy(() -> shardProgressUpdater.updateProgress(bucket.shardAndStrategy()))
.isInstanceOf(SafeIllegalStateException.class)
.hasLogMessage("Timestamp range record not found. If this has happened for bucket 0, this is possible"
+ " when autoscaling sweep is initializing itself. Otherwise, this is potentially indicative of"
+ " a bug in auto-scaling sweep. In either case, we will retry.")
.hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier()))
.hasCause(underlyingException);
.hasExactlyArgs(SafeArg.of("queriedBucket", bucket.bucketIdentifier()));

verify(sweepBucketPointerTable, never()).updateStartingBucketForShardAndStrategy(bucket);
verify(sweepQueueProgressUpdater, never()).progressTo(eq(bucket.shardAndStrategy()), anyLong());
Expand All @@ -100,8 +97,8 @@ public void doesNotUpdateProgressOnUnstartedBucket(Bucket bucket) {
.thenReturn(ImmutableSet.of(bucket));
when(bucketProgressStore.getBucketProgress(bucket)).thenReturn(Optional.empty());
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier()))
.thenReturn(TimestampRange.of(
SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8)));
.thenReturn(Optional.of(TimestampRange.of(
SweepQueueUtils.minTsForCoarsePartition(3), SweepQueueUtils.minTsForCoarsePartition(8))));

shardProgressUpdater.updateProgress(bucket.shardAndStrategy());

Expand All @@ -120,7 +117,7 @@ public void updatesProgressOnStartedButNotCompletedBucket(SweepableBucket sweepa
when(bucketProgressStore.getBucketProgress(bucket))
.thenReturn(Optional.of(BucketProgress.createForTimestampProgress(1_234_567L)));
when(recordsTable.getTimestampRangeRecord(bucket.bucketIdentifier()))
.thenReturn(sweepableBucket.timestampRange());
.thenReturn(Optional.of(sweepableBucket.timestampRange()));

shardProgressUpdater.updateProgress(bucket.shardAndStrategy());

Expand Down Expand Up @@ -154,7 +151,8 @@ public void progressesPastOneOrMoreCompletedBucketsAndStopsCorrectly(
TimestampRange finalBucketTimestampRange = TimestampRange.of(
lastCompleteBucketTimestampRange.endExclusive(),
lastCompleteBucketTimestampRange.endExclusive() + SweepQueueUtils.TS_COARSE_GRANULARITY);
when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier)).thenReturn(finalBucketTimestampRange);
when(recordsTable.getTimestampRangeRecord(finalBucketIdentifier))
.thenReturn(Optional.of(finalBucketTimestampRange));

shardProgressUpdater.updateProgress(firstRawBucket.shardAndStrategy());

Expand Down Expand Up @@ -204,7 +202,7 @@ private static List<SweepableBucket> getSucceedingBuckets(SweepableBucket bucket

private void setupBucketRecord(SweepableBucket sweepableBucket) {
when(recordsTable.getTimestampRangeRecord(sweepableBucket.bucket().bucketIdentifier()))
.thenReturn(sweepableBucket.timestampRange());
.thenReturn(Optional.of(sweepableBucket.timestampRange()));
}

static Stream<Bucket> buckets() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -293,17 +292,15 @@ public void deleteBucketEntryDeletesBucket() {
}

@Test
public void getTimestampRangeRecordThrowsIfRecordNotPresent() {
assertThatThrownBy(() -> store.getTimestampRangeRecord(1))
.isInstanceOf(NoSuchElementException.class)
.hasMessage("No timestamp range record found for bucket identifier");
public void getTimestampRangeRecordReturnsEmptyIfRecordDoesNotExist() {
assertThat(store.getTimestampRangeRecord(1)).isEmpty();
}

@Test
public void putTimestampRangeRecordPutsRecord() {
TimestampRange timestampRange = TimestampRange.of(1, 2);
store.putTimestampRangeRecord(1, timestampRange);
assertThat(store.getTimestampRangeRecord(1)).isEqualTo(timestampRange);
assertThat(store.getTimestampRangeRecord(1)).hasValue(timestampRange);
}

@Test
Expand All @@ -323,11 +320,9 @@ public void deleteTimestampRangeRecordDoesNotThrowIfRecordNotPresent() {
public void deleteTimestampRangeRecordDeletesRecord() {
TimestampRange timestampRange = TimestampRange.of(1, 2);
store.putTimestampRangeRecord(1, timestampRange);
assertThat(store.getTimestampRangeRecord(1)).isEqualTo(timestampRange);
assertThat(store.getTimestampRangeRecord(1)).hasValue(timestampRange);

store.deleteTimestampRangeRecord(1);
assertThatThrownBy(() -> store.getTimestampRangeRecord(1))
.isInstanceOf(NoSuchElementException.class)
.hasMessage("No timestamp range record found for bucket identifier");
assertThat(store.getTimestampRangeRecord(1)).isEmpty();
}
}

0 comments on commit f7f270b

Please sign in to comment.