diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index 987c5fdf90fd..168473add552 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -206,7 +206,10 @@ public int dataSize() * supersedes, ie. tombstone supersedes. * * If timestamps are the same and both of them are expired livenessInfo(Ideally it shouldn't happen), - * greater localDeletionTime wins. + * greater localDeletionTime wins. If the localDeletion times are the same, prefer the + * lower TTL to make the merge deterministic (it is likely that the row has been rewritten with + * USING TTL/TIMESTAMP with an updated TTL that computes to the same local deletion time -- perhaps + * from rerunning a process to migrate user data between clusters or tables). * * @param other * the {@code LivenessInfo} to compare this info to. @@ -220,7 +223,11 @@ public boolean supersedes(LivenessInfo other) if (isExpired() ^ other.isExpired()) return isExpired(); if (isExpiring() == other.isExpiring()) - return localExpirationTime() > other.localExpirationTime(); + { + return localExpirationTime() > other.localExpirationTime() || + (localExpirationTime() == other.localExpirationTime() && ttl() < other.ttl()); + } + return isExpiring(); } diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java index 39690d50b7b8..48331a73a655 100644 --- a/src/java/org/apache/cassandra/db/rows/Cells.java +++ b/src/java/org/apache/cassandra/db/rows/Cells.java @@ -113,6 +113,16 @@ private static Cell resolveRegular(Cell left, Cell right) // would otherwise always win (unless it had an empty value), until it expired and was translated to a tombstone if (leftLocalDeletionTime != rightLocalDeletionTime) return leftLocalDeletionTime > rightLocalDeletionTime ? left : right; + + // Both cells are either tombstones or expiring at the same timestamp. If expiring and the + // TTLs differ, write the lower one -- the write is probably from a more recent + // UPDATE USING TTL AND TIMESTAMP, so select the most recent one to be deterministic and be + // closest to client intent. + if (!leftIsTombstone && left.ttl() != right.ttl()) + { + assert !rightIsTombstone; + return left.ttl() < right.ttl() ? left : right; + } } return compareValues(left, right) >= 0 ? left : right; diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java index 9540c8de7456..4efcec654857 100644 --- a/test/unit/org/apache/cassandra/db/CellTest.java +++ b/test/unit/org/apache/cassandra/db/CellTest.java @@ -50,10 +50,12 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.assertj.core.api.Assertions; import org.assertj.core.api.ThrowableAssert; import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; public class CellTest { @@ -74,6 +76,8 @@ public class CellTest .addRegularColumn("v", IntegerType.instance) .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)) .build(); + public static final ByteBuffer TEST_VALUE = ByteBufferUtil.bytes("a"); + @BeforeClass public static void defineSchema() throws ConfigurationException @@ -284,6 +288,86 @@ public void testExpiringCellReconile() Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 1)); } + + public static void assertCellsEqual(Cell cellA, Cell cellB) + { + assertEquals(cellA.timestamp(), cellB.timestamp()); + assertEquals(cellA.ttl(), cellB.ttl()); + assertEquals(cellA.localDeletionTime(), cellB.localDeletionTime()); + assertEquals(cellA.buffer(), cellB.buffer()); + } + + static void checkCommutes(ColumnMetadata cmd, long timestamp, long tsDiff, int ttl, int ttlDiff, long nowInSeconds, int nowDiff) + { + long timestampA = timestamp; + long timestampB = timestampA + tsDiff; + int ttlA = ttl; + int ttlB = ttl + ttlDiff; + long nowInSecsA = nowInSeconds; + long nowInSecsB = nowInSecsA + nowDiff; + if (nowInSecsA < 0 || nowInSecsB < 0) + return; + + Cell cellA = ttlA == 0 ? BufferCell.tombstone(cmd, timestampA, nowInSecsA) : + ttlA < 0 ? BufferCell.live(cmd, timestampA, TEST_VALUE) : + BufferCell.expiring(cmd, timestampA, ttlA, nowInSecsA, TEST_VALUE); + Cell cellB = ttlB == 0 ? BufferCell.tombstone(cmd, timestampB, nowInSecsB) : + ttlB < 0 ? BufferCell.live(cmd, timestampB, TEST_VALUE) : + BufferCell.expiring(cmd, timestampB, ttlB, nowInSecsB, TEST_VALUE); + + Cell cellAB = Cells.reconcile(cellA, cellB); + Cell cellBA = Cells.reconcile(cellB, cellA); + + assertCellsEqual(cellAB, cellBA); + } + + @Test + public void checkSameValueDifferentLivenessCommutes() + { + ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance); + long[] tsDiffs = new long[] {0L, + 1L, // microsecond + 1000L, // millisecond + 1000000L, // second + 60000000L}; // minute + int[] ttls = new int[] { -1, 0, 1, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 }; + int[] ttlDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 }; + + long nowInSeconds = FBUtilities.nowInSeconds(); + long timestamp = FBUtilities.timestampMicros(); + + for (long tsDiff: tsDiffs) + { + for (int ttl: ttls) + { + for (int ttlDiff : ttlDiffs) + { + for (Integer nowDiff : ttlDiffs) + checkCommutes(cmd, timestamp, tsDiff, ttl, ttlDiff, nowInSeconds, nowDiff); + } + } + } + } + + // Checks that reconciling a cell with a smaller TTL reconcile commutatively + // Similar to rewriting data retrieved with SELECT v, TTL(v), WRITETIMESTAMP(v) with + // INSERT SET v=? USING TTL ? AND TIMESTAMP ? + @Test + public void rewriteCellWithSmallerTTL() + { + ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance); + int[] nowDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 }; + long timestamp = FBUtilities.timestampMicros(); + long nowInSeconds = FBUtilities.nowInSeconds(); + int ttl = 3600; + + for (Integer nowDiff : nowDiffs) + { + checkCommutes(cmd, timestamp, 0L, ttl, -nowDiff, nowInSeconds, nowDiff); + } + } + + class SimplePurger implements DeletionPurger { private final long gcBefore; diff --git a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java index 193649727865..124231810f54 100644 --- a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java +++ b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java @@ -76,6 +76,11 @@ public void testSupersedes() first = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds + 1); second = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds); assertSupersedes(first, second); + + // rewritten expiring with the same expiration time and a lower TTL, take the lower TTL as likely to be more recent + first = LivenessInfo.withExpirationTime(100, 4, nowInSeconds); + second = LivenessInfo.withExpirationTime(100, 5, nowInSeconds); + assertSupersedes(first, second); } @Test diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java index a4436da88a7a..865d33a8b6a1 100644 --- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@ -45,6 +45,8 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.db.CellTest.assertCellsEqual; + public class RowsTest { private static final String KEYSPACE = "rows_test"; @@ -526,6 +528,58 @@ public void mergeRowDeletionSupercedesLiveness() Assert.assertEquals(0, merged.columns().size()); } + + public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, long localDeletionTime, ByteBuffer value) + { + return expiringWithExpirationTime(column, timestamp, ttl, localDeletionTime, value, null); + } + + public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, long localDeletionTime, ByteBuffer value, CellPath path) + { + assert ttl != Cell.NO_TTL; + return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); + } + + @Test + public void mergeRowsWithSameExpiryDifferentTTLCommutesLiveness() + { + long now1 = FBUtilities.nowInSeconds(); + long ts1 = secondToTs(now1); + long ldt = now1 + 1000; + + Row.Builder r1Builder = BTreeRow.unsortedBuilder(); + r1Builder.newRow(c1); + LivenessInfo originalLiveness = LivenessInfo.withExpirationTime(ts1, 100, ldt); + r1Builder.addPrimaryKeyLivenessInfo(originalLiveness); + + Row.Builder r2Builder = BTreeRow.unsortedBuilder(); + r2Builder.newRow(c1); + LivenessInfo loweredTTL = LivenessInfo.withExpirationTime(ts1, 50, ldt); + r2Builder.addPrimaryKeyLivenessInfo(loweredTTL); + + Cell r2v = expiringWithExpirationTime(v, ts1, 75, ldt, BB1); + Cell r2m2 = expiringWithExpirationTime(m, ts1, 50, ldt, BB1, CellPath.create(BB2)); + Cell r2m3 = expiringWithExpirationTime(m, ts1, 75, ldt, BB2, CellPath.create(BB3)); + Cell r2m4 = expiringWithExpirationTime(m, ts1, 100, ldt, BB3, CellPath.create(BB4)); + List> expectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4); + + expectedCells.forEach(r1Builder::addCell); + expectedCells.forEach(r2Builder::addCell); + + Row r1 = r1Builder.build(); + Row r2 = r2Builder.build(); + + Row r1r2 = Rows.merge(r1, r2); + Row r2r1 = Rows.merge(r2, r1); + + DiffListener mergedListener = new DiffListener(); + Rows.diff(mergedListener, r1r2, r2r1); + + mergedListener.liveness.forEach(pair -> Assert.assertEquals(pair.merged, pair.original)); + mergedListener.cells.forEach(pair -> assertCellsEqual(pair.merged, pair.original)); + } + + // Creates a dummy cell for a (regular) column for the provided name and without a cellPath. private static Cell liveCell(ColumnMetadata name) {