Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/db/LivenessInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ public int dataSize()
* supersedes, ie. tombstone supersedes.
*
* If timestamps are the same and both of them are expired livenessInfo(Ideally it shouldn't happen),
* greater localDeletionTime wins.
* greater localDeletionTime wins. If the localDeletion times are the same, prefer the
* lower TTL to make the merge deterministic (it is likely that the row has been rewritten with
* USING TTL/TIMESTAMP with an updated TTL that computes to the same local deletion time -- perhaps
* from rerunning a process to migrate user data between clusters or tables).
*
* @param other
* the {@code LivenessInfo} to compare this info to.
Expand All @@ -220,7 +223,11 @@ public boolean supersedes(LivenessInfo other)
if (isExpired() ^ other.isExpired())
return isExpired();
if (isExpiring() == other.isExpiring())
return localExpirationTime() > other.localExpirationTime();
{
return localExpirationTime() > other.localExpirationTime() ||
(localExpirationTime() == other.localExpirationTime() && ttl() < other.ttl());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; maybe update the comment to this method about the ttls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Updated.

}

return isExpiring();
}

Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/db/rows/Cells.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ private static Cell<?> resolveRegular(Cell<?> left, Cell<?> right)
// would otherwise always win (unless it had an empty value), until it expired and was translated to a tombstone
if (leftLocalDeletionTime != rightLocalDeletionTime)
return leftLocalDeletionTime > rightLocalDeletionTime ? left : right;

// Both cells are either tombstones or expiring at the same timestamp. If expiring and the
// TTLs differ, write the lower one -- the write is probably from a more recent
// UPDATE USING TTL AND TIMESTAMP, so select the most recent one to be deterministic and be
// closest to client intent.
if (!leftIsTombstone && left.ttl() != right.ttl())
{
assert !rightIsTombstone;
return left.ttl() < right.ttl() ? left : right;
}
}

return compareValues(left, right) >= 0 ? left : right;
Expand Down
84 changes: 84 additions & 0 deletions test/unit/org/apache/cassandra/db/CellTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;

public class CellTest
{
Expand All @@ -74,6 +76,8 @@ public class CellTest
.addRegularColumn("v", IntegerType.instance)
.addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
.build();
public static final ByteBuffer TEST_VALUE = ByteBufferUtil.bytes("a");


@BeforeClass
public static void defineSchema() throws ConfigurationException
Expand Down Expand Up @@ -284,6 +288,86 @@ public void testExpiringCellReconile()
Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 1));
}


public static void assertCellsEqual(Cell<?> cellA, Cell<?> cellB)
{
assertEquals(cellA.timestamp(), cellB.timestamp());
assertEquals(cellA.ttl(), cellB.ttl());
assertEquals(cellA.localDeletionTime(), cellB.localDeletionTime());
assertEquals(cellA.buffer(), cellB.buffer());
}

static void checkCommutes(ColumnMetadata cmd, long timestamp, long tsDiff, int ttl, int ttlDiff, long nowInSeconds, int nowDiff)
{
long timestampA = timestamp;
long timestampB = timestampA + tsDiff;
int ttlA = ttl;
int ttlB = ttl + ttlDiff;
long nowInSecsA = nowInSeconds;
long nowInSecsB = nowInSecsA + nowDiff;
if (nowInSecsA < 0 || nowInSecsB < 0)
return;

Cell<?> cellA = ttlA == 0 ? BufferCell.tombstone(cmd, timestampA, nowInSecsA) :
ttlA < 0 ? BufferCell.live(cmd, timestampA, TEST_VALUE) :
BufferCell.expiring(cmd, timestampA, ttlA, nowInSecsA, TEST_VALUE);
Cell<?> cellB = ttlB == 0 ? BufferCell.tombstone(cmd, timestampB, nowInSecsB) :
ttlB < 0 ? BufferCell.live(cmd, timestampB, TEST_VALUE) :
BufferCell.expiring(cmd, timestampB, ttlB, nowInSecsB, TEST_VALUE);

Cell<?> cellAB = Cells.reconcile(cellA, cellB);
Cell<?> cellBA = Cells.reconcile(cellB, cellA);

assertCellsEqual(cellAB, cellBA);
}

@Test
public void checkSameValueDifferentLivenessCommutes()
{
ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance);
long[] tsDiffs = new long[] {0L,
1L, // microsecond
1000L, // millisecond
1000000L, // second
60000000L}; // minute
int[] ttls = new int[] { -1, 0, 1, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };
int[] ttlDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };

long nowInSeconds = FBUtilities.nowInSeconds();
long timestamp = FBUtilities.timestampMicros();

for (long tsDiff: tsDiffs)
{
for (int ttl: ttls)
{
for (int ttlDiff : ttlDiffs)
{
for (Integer nowDiff : ttlDiffs)
checkCommutes(cmd, timestamp, tsDiff, ttl, ttlDiff, nowInSeconds, nowDiff);
}
}
}
}

// Checks that reconciling a cell with a smaller TTL reconcile commutatively
// Similar to rewriting data retrieved with SELECT v, TTL(v), WRITETIMESTAMP(v) with
// INSERT SET v=? USING TTL ? AND TIMESTAMP ?
@Test
public void rewriteCellWithSmallerTTL()
{
ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance);
int[] nowDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };
long timestamp = FBUtilities.timestampMicros();
long nowInSeconds = FBUtilities.nowInSeconds();
int ttl = 3600;

for (Integer nowDiff : nowDiffs)
{
checkCommutes(cmd, timestamp, 0L, ttl, -nowDiff, nowInSeconds, nowDiff);
}
}


class SimplePurger implements DeletionPurger
{
private final long gcBefore;
Expand Down
5 changes: 5 additions & 0 deletions test/unit/org/apache/cassandra/db/LivenessInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void testSupersedes()
first = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds + 1);
second = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
assertSupersedes(first, second);

// rewritten expiring with the same expiration time and a lower TTL, take the lower TTL as likely to be more recent
first = LivenessInfo.withExpirationTime(100, 4, nowInSeconds);
second = LivenessInfo.withExpirationTime(100, 5, nowInSeconds);
assertSupersedes(first, second);
}

@Test
Expand Down
54 changes: 54 additions & 0 deletions test/unit/org/apache/cassandra/db/rows/RowsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.db.CellTest.assertCellsEqual;

public class RowsTest
{
private static final String KEYSPACE = "rows_test";
Expand Down Expand Up @@ -526,6 +528,58 @@ public void mergeRowDeletionSupercedesLiveness()
Assert.assertEquals(0, merged.columns().size());
}


public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, long localDeletionTime, ByteBuffer value)
{
return expiringWithExpirationTime(column, timestamp, ttl, localDeletionTime, value, null);
}

public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, long localDeletionTime, ByteBuffer value, CellPath path)
{
assert ttl != Cell.NO_TTL;
return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
}

@Test
public void mergeRowsWithSameExpiryDifferentTTLCommutesLiveness()
{
long now1 = FBUtilities.nowInSeconds();
long ts1 = secondToTs(now1);
long ldt = now1 + 1000;

Row.Builder r1Builder = BTreeRow.unsortedBuilder();
r1Builder.newRow(c1);
LivenessInfo originalLiveness = LivenessInfo.withExpirationTime(ts1, 100, ldt);
r1Builder.addPrimaryKeyLivenessInfo(originalLiveness);

Row.Builder r2Builder = BTreeRow.unsortedBuilder();
r2Builder.newRow(c1);
LivenessInfo loweredTTL = LivenessInfo.withExpirationTime(ts1, 50, ldt);
r2Builder.addPrimaryKeyLivenessInfo(loweredTTL);

Cell<?> r2v = expiringWithExpirationTime(v, ts1, 75, ldt, BB1);
Cell<?> r2m2 = expiringWithExpirationTime(m, ts1, 50, ldt, BB1, CellPath.create(BB2));
Cell<?> r2m3 = expiringWithExpirationTime(m, ts1, 75, ldt, BB2, CellPath.create(BB3));
Cell<?> r2m4 = expiringWithExpirationTime(m, ts1, 100, ldt, BB3, CellPath.create(BB4));
List<Cell<?>> expectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);

expectedCells.forEach(r1Builder::addCell);
expectedCells.forEach(r2Builder::addCell);

Row r1 = r1Builder.build();
Row r2 = r2Builder.build();

Row r1r2 = Rows.merge(r1, r2);
Row r2r1 = Rows.merge(r2, r1);

DiffListener mergedListener = new DiffListener();
Rows.diff(mergedListener, r1r2, r2r1);

mergedListener.liveness.forEach(pair -> Assert.assertEquals(pair.merged, pair.original));
mergedListener.cells.forEach(pair -> assertCellsEqual(pair.merged, pair.original));
}


// Creates a dummy cell for a (regular) column for the provided name and without a cellPath.
private static Cell<?> liveCell(ColumnMetadata name)
{
Expand Down