Skip to content

Commit

Permalink
Merge pull request #659 from keith-turner/fluo-655
Browse files Browse the repository at this point in the history
fixes #655 made GCIter drop rolled back data
  • Loading branch information
mikewalch committed May 9, 2016
2 parents 9f813e1 + 359250f commit 59d2419
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public KeyValue(Key key, byte[] value) {
private ArrayList<KeyValue> keys = new ArrayList<>();
private ArrayList<KeyValue> keysFiltered = new ArrayList<>();
private HashSet<Long> completeTxs = new HashSet<>();
private HashSet<Long> rolledback = new HashSet<>();
private Key curCol = new Key();
private long truncationTime;
private int position = 0;
Expand Down Expand Up @@ -139,7 +140,7 @@ private boolean consumeData() throws IOException {
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

if (colType == ColumnConstants.DATA_PREFIX) {
if (ts >= truncationTime) {
if (ts >= truncationTime && !rolledback.contains(ts)) {
return false;
}
} else {
Expand Down Expand Up @@ -174,6 +175,7 @@ private void readColMetadata() throws IOException {
keys.clear();
keysFiltered.clear();
completeTxs.clear();
rolledback.clear();

curCol.set(source.getTopKey());

Expand Down Expand Up @@ -230,13 +232,19 @@ private void readColMetadata() throws IOException {
long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
boolean complete = completeTxs.contains(txDoneTs);

if (!complete && DelLockValue.isPrimary(source.getTopValue().get())) {
byte[] val = source.getTopValue().get();

if (!complete && DelLockValue.isPrimary(val)) {
keep = true;
}

if (DelLockValue.isRollback(val)) {
rolledback.add(ts);
keep |= !isFullMajc;
}

if (ts > invalidationTime) {
invalidationTime = ts;
keep = true;
}

if (keep) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,32 +264,52 @@ public void testLock() {
Assert.assertEquals(expected, output);
}

@Test
public void testDelLockPartialCompaction() {
// on partial compaction can delete data that del lock points to... but should keep del lock
// until full major compaction

TestData input = new TestData();

input.add("0 f q DEL_LOCK 19", "0 ROLLBACK");
input.add("0 f q LOCK 19", "1 f q");
input.add("0 f q DEL_LOCK 11", "0 ROLLBACK");
input.add("0 f q LOCK 11", "1 f q");
input.add("0 f q DATA 19", "19");
input.add("0 f q DATA 11", "15");

TestData output = new TestData(newGCI(input, 23, false));

TestData expected = new TestData();
expected.add("0 f q DEL_LOCK 19", "0 ROLLBACK");
expected.add("0 f q DEL_LOCK 11", "0 ROLLBACK");

Assert.assertEquals(expected, output);
}

@Test
public void testDelLock() {
TestData input = new TestData();

input.add("0 f q DEL_LOCK 19", "19");
input.add("0 f q DEL_LOCK 19", "0 ROLLBACK");
input.add("0 f q LOCK 19", "1 f q");
input.add("0 f q DEL_LOCK 11", "11");
input.add("0 f q DEL_LOCK 11", "0 ROLLBACK");
input.add("0 f q LOCK 11", "1 f q");
input.add("0 f q DATA 19", "19");
input.add("0 f q DATA 11", "15");

TestData output = new TestData(newGCI(input, 23));

TestData expected = new TestData();
expected.add("0 f q DEL_LOCK 19", "19");
expected.add("0 f q DATA 19", "19");
expected.add("0 f q DATA 11", "15");

Assert.assertEquals(expected, output);

// test write that supercedes a del lock
input = new TestData();
input.add("0 f q WRITE 22", "21");
input.add("0 f q DEL_LOCK 19", "19");
input.add("0 f q DEL_LOCK 19", "0 ROLLBACK");
input.add("0 f q LOCK 19", "1 f q");
input.add("0 f q DEL_LOCK 11", "11");
input.add("0 f q DEL_LOCK 11", "0 ROLLBACK");
input.add("0 f q LOCK 11", "1 f q");
input.add("0 f q DATA 21", "17");
input.add("0 f q DATA 19", "19");
Expand All @@ -303,9 +323,9 @@ public void testDelLock() {

Assert.assertEquals(expected, output);

// test del_lock followed by write... should keep del_lock
// test del_lock followed by write...
input = new TestData();
input.add("0 f q DEL_LOCK 19", "19");
input.add("0 f q DEL_LOCK 19", "0 ROLLBACK");
input.add("0 f q LOCK 19", "1 f q");
input.add("0 f q WRITE 15", "11");
input.add("0 f q DATA 19", "19");
Expand All @@ -314,9 +334,7 @@ public void testDelLock() {
output = new TestData(newGCI(input, 23));

expected = new TestData();
expected.add("0 f q DEL_LOCK 19", "19");
expected.add("0 f q WRITE 15", "11");
expected.add("0 f q DATA 19", "19");
expected.add("0 f q DATA 11", "15");

Assert.assertEquals(expected, output);
Expand Down Expand Up @@ -541,7 +559,9 @@ public void testMultiColumn() {

TestData input = new TestData();

input.add("0 f a DEL_LOCK 19", "19");
// important that del lock has same timestamp as lock in another column... should not delete the
// lock or data in other column
input.add("0 f a DEL_LOCK 19", "0 ROLLBACK");
input.add("0 f a LOCK 19", "1 f q");
input.add("0 f a DATA 19", "15");

Expand All @@ -562,9 +582,6 @@ public void testMultiColumn() {

TestData expected = new TestData();

expected.add("0 f a DEL_LOCK 19", "19");
expected.add("0 f a DATA 19", "15");

expected.add("0 f b LOCK 19", "1 f q");
expected.add("0 f b DATA 19", "16");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
import java.util.Map.Entry;

import com.google.common.collect.Iterables;
import io.fluo.accumulo.format.FluoFormatter;
import io.fluo.accumulo.util.ColumnConstants;
import io.fluo.accumulo.util.ZookeeperPath;
import io.fluo.accumulo.util.ZookeeperUtil;
import io.fluo.api.data.Column;
import io.fluo.core.impl.TransactionImpl.CommitData;
import io.fluo.core.impl.TransactorNode;
import io.fluo.integration.BankUtil;
import io.fluo.integration.ITBaseImpl;
import io.fluo.integration.TestTransaction;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
Expand Down Expand Up @@ -118,6 +123,65 @@ public void testDeletedDataIsDropped() throws Exception {
tx4.done();
}

@Test(timeout = 60000)
public void testRolledBackDataIsDropped() throws Exception {

Column col1 = new Column("fam1", "q1");
Column col2 = new Column("fam1", "q2");

TransactorNode t2 = new TransactorNode(env);
TestTransaction tx2 = new TestTransaction(env, t2);

for (int r = 0; r < 10; r++) {
tx2.mutate().row(r + "").col(col1).set("1" + r + "0");
tx2.mutate().row(r + "").col(col2).set("1" + r + "1");
}

CommitData cd = tx2.createCommitData();
Assert.assertTrue(tx2.preCommit(cd));

t2.close();

// rollback data
TestTransaction tx3 = new TestTransaction(env, t2);
for (int r = 0; r < 10; r++) {
tx3.gets(r + "", col1);
tx3.gets(r + "", col2);
}
tx3.done();

Assert.assertEquals(20, countInTable("-LOCK"));
Assert.assertEquals(20, countInTable("-DEL_LOCK"));
Assert.assertEquals(20, countInTable("-DATA"));

// flush should drop locks and data
conn.tableOperations().flush(table, null, null, true);

Assert.assertEquals(0, countInTable("-LOCK"));
Assert.assertEquals(20, countInTable("-DEL_LOCK"));
Assert.assertEquals(0, countInTable("-DATA"));

// compact should drop all del locks except for primary
conn.tableOperations().compact(table, new CompactionConfig().setWait(true));

Assert.assertEquals(0, countInTable("-LOCK"));
Assert.assertEquals(1, countInTable("-DEL_LOCK"));
Assert.assertEquals(0, countInTable("-DATA"));
}

private int countInTable(String str) throws TableNotFoundException {
int count = 0;
FluoFormatter ff = new FluoFormatter();
Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
for (String e : Iterables.transform(scanner, ff)) {
if (e.contains(str)) {
count++;
}
}

return count;
}

@Test
public void testGetOldestTimestamp() throws Exception {
// we are expecting an error in this test
Expand Down

0 comments on commit 59d2419

Please sign in to comment.