Skip to content

Commit

Permalink
fixed issue with self rollback seen when looking into apache#660
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed May 4, 2016
1 parent 10b6920 commit 9720670
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fluo.accumulo.values.LockValue;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.core.util.Hex;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

Expand Down Expand Up @@ -68,6 +69,6 @@ public int hashCode() {

@Override
public String toString() {
return prow + " " + pcol + " " + startTs;
return Hex.encNonAscii(prow) + " " + Hex.encNonAscii(pcol) + " " + startTs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ private void rollbackPrimaryLock(CommitData cd) throws Exception {
Flutation m = new Flutation(env, cd.prow);

m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
DelLockValue.encodeRollback(startTs, false, true));
DelLockValue.encodeRollback(startTs, true, true));
m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);

ListenableFuture<Void> future =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.Map.Entry;
import java.util.Random;

import io.fluo.accumulo.util.ColumnConstants;
import io.fluo.accumulo.util.LongUtil;
import io.fluo.accumulo.util.ZookeeperUtil;
import io.fluo.accumulo.values.DelLockValue;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.config.ObserverConfiguration;
import io.fluo.api.data.Bytes;
Expand All @@ -42,6 +44,7 @@
import io.fluo.integration.ITBaseImpl;
import io.fluo.integration.TestTransaction;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -303,6 +306,10 @@ private void rollbackTest(boolean killTransactor) throws Exception {
int bobFinal = Integer.parseInt(tx3.get(Bytes.of("bob"), BALANCE).toString());
Assert.assertEquals(10, bobFinal);

long startTs = tx2.getStartTimestamp();
// one and only one of the rolled back locks should be marked as primary
Assert.assertTrue(wasRolledBackPrimary(startTs, "bob") ^ wasRolledBackPrimary(startTs, "joe"));

if (killTransactor) {
Assert.assertEquals(1, tx3.getStats().getDeadLocks());
Assert.assertEquals(0, tx3.getStats().getTimedOutLocks());
Expand Down Expand Up @@ -580,4 +587,71 @@ public void testCommitBug1() throws Exception {
Assert.assertNull(tx4.get().row("joe").col(BALANCE).toInteger());
Assert.assertEquals(61, tx4.get().row("jill").col(BALANCE).toInteger(0));
}

@Test
public void testRollbackSelf() throws Exception {
// test for #660... ensure when transaction rolls itself back that it properly sets the primary
// flag

TestTransaction tx1 = new TestTransaction(env);

tx1.mutate().row("bob").col(BALANCE).set(10);
tx1.mutate().row("joe").col(BALANCE).set(20);
tx1.mutate().row("jill").col(BALANCE).set(60);

tx1.done();


TestTransaction tx2 = new TestTransaction(env, "jill", BALANCE, 1);

TestTransaction tx3 = new TestTransaction(env);
tx3.mutate().row("bob").col(BALANCE).increment(5);
tx3.mutate().row("joe").col(BALANCE).increment(-5);
tx3.done();

tx2.mutate().row("bob").col(BALANCE).increment(5);
tx2.mutate().row("jill").col(BALANCE).increment(-5);


// should be able to successfully lock the primary column jill... but then should fail to lock
// bob and have to rollback
try {
tx2.commit();
Assert.fail("Expected commit exception");
} catch (CommitException ce) {

}


boolean sawExpected = wasRolledBackPrimary(tx2.getStartTimestamp(), "jill");

Assert.assertTrue(sawExpected);

TestTransaction tx4 = new TestTransaction(env);
Assert.assertEquals("15", tx4.gets("bob", BALANCE));
Assert.assertEquals("15", tx4.gets("joe", BALANCE));
Assert.assertEquals("60", tx4.gets("jill", BALANCE));
tx4.close();

}

private boolean wasRolledBackPrimary(long startTs, String rolledBackRow)
throws TableNotFoundException {
boolean sawExpected = false;
Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY);

for (Entry<Key, Value> entry : scanner) {
long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK;
long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;
String row = entry.getKey().getRowData().toString();
byte[] val = entry.getValue().get();

if (row.equals(rolledBackRow) && colType == ColumnConstants.DEL_LOCK_PREFIX && ts == startTs
&& DelLockValue.isPrimary(val)) {
sawExpected = true;
}
}
return sawExpected;
}

}

0 comments on commit 9720670

Please sign in to comment.