From 9720670041c4fa1d401cbccbc1c7d32ecea35952 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 4 May 2016 19:06:47 -0400 Subject: [PATCH] fixed issue with self rollback seen when looking into #660 --- .../io/fluo/core/impl/PrimaryRowColumn.java | 3 +- .../io/fluo/core/impl/TransactionImpl.java | 2 +- .../io/fluo/integration/impl/FailureIT.java | 74 +++++++++++++++++++ 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/io/fluo/core/impl/PrimaryRowColumn.java b/modules/core/src/main/java/io/fluo/core/impl/PrimaryRowColumn.java index 9d4d998bf..fc752eda8 100644 --- a/modules/core/src/main/java/io/fluo/core/impl/PrimaryRowColumn.java +++ b/modules/core/src/main/java/io/fluo/core/impl/PrimaryRowColumn.java @@ -20,6 +20,7 @@ import io.fluo.accumulo.values.LockValue; import io.fluo.api.data.Bytes; import io.fluo.api.data.Column; +import io.fluo.core.util.Hex; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -68,6 +69,6 @@ public int hashCode() { @Override public String toString() { - return prow + " " + pcol + " " + startTs; + return Hex.encNonAscii(prow) + " " + Hex.encNonAscii(pcol) + " " + startTs; } } diff --git a/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java index e463dbe9f..1d374604f 100644 --- a/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/io/fluo/core/impl/TransactionImpl.java @@ -921,7 +921,7 @@ private void rollbackPrimaryLock(CommitData cd) throws Exception { Flutation m = new Flutation(env, cd.prow); m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs, - DelLockValue.encodeRollback(startTs, false, true)); + DelLockValue.encodeRollback(startTs, true, true)); m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY); ListenableFuture future = diff --git a/modules/integration/src/test/java/io/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/io/fluo/integration/impl/FailureIT.java index 2c70a7fce..92bdb64db 100644 --- a/modules/integration/src/test/java/io/fluo/integration/impl/FailureIT.java +++ b/modules/integration/src/test/java/io/fluo/integration/impl/FailureIT.java @@ -20,8 +20,10 @@ import java.util.Map.Entry; import java.util.Random; +import io.fluo.accumulo.util.ColumnConstants; import io.fluo.accumulo.util.LongUtil; import io.fluo.accumulo.util.ZookeeperUtil; +import io.fluo.accumulo.values.DelLockValue; import io.fluo.api.client.TransactionBase; import io.fluo.api.config.ObserverConfiguration; import io.fluo.api.data.Bytes; @@ -42,6 +44,7 @@ import io.fluo.integration.ITBaseImpl; import io.fluo.integration.TestTransaction; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -303,6 +306,10 @@ private void rollbackTest(boolean killTransactor) throws Exception { int bobFinal = Integer.parseInt(tx3.get(Bytes.of("bob"), BALANCE).toString()); Assert.assertEquals(10, bobFinal); + long startTs = tx2.getStartTimestamp(); + // one and only one of the rolled back locks should be marked as primary + Assert.assertTrue(wasRolledBackPrimary(startTs, "bob") ^ wasRolledBackPrimary(startTs, "joe")); + if (killTransactor) { Assert.assertEquals(1, tx3.getStats().getDeadLocks()); Assert.assertEquals(0, tx3.getStats().getTimedOutLocks()); @@ -580,4 +587,71 @@ public void testCommitBug1() throws Exception { Assert.assertNull(tx4.get().row("joe").col(BALANCE).toInteger()); Assert.assertEquals(61, tx4.get().row("jill").col(BALANCE).toInteger(0)); } + + @Test + public void testRollbackSelf() throws Exception { + // test for #660... ensure when transaction rolls itself back that it properly sets the primary + // flag + + TestTransaction tx1 = new TestTransaction(env); + + tx1.mutate().row("bob").col(BALANCE).set(10); + tx1.mutate().row("joe").col(BALANCE).set(20); + tx1.mutate().row("jill").col(BALANCE).set(60); + + tx1.done(); + + + TestTransaction tx2 = new TestTransaction(env, "jill", BALANCE, 1); + + TestTransaction tx3 = new TestTransaction(env); + tx3.mutate().row("bob").col(BALANCE).increment(5); + tx3.mutate().row("joe").col(BALANCE).increment(-5); + tx3.done(); + + tx2.mutate().row("bob").col(BALANCE).increment(5); + tx2.mutate().row("jill").col(BALANCE).increment(-5); + + + // should be able to successfully lock the primary column jill... but then should fail to lock + // bob and have to rollback + try { + tx2.commit(); + Assert.fail("Expected commit exception"); + } catch (CommitException ce) { + + } + + + boolean sawExpected = wasRolledBackPrimary(tx2.getStartTimestamp(), "jill"); + + Assert.assertTrue(sawExpected); + + TestTransaction tx4 = new TestTransaction(env); + Assert.assertEquals("15", tx4.gets("bob", BALANCE)); + Assert.assertEquals("15", tx4.gets("joe", BALANCE)); + Assert.assertEquals("60", tx4.gets("jill", BALANCE)); + tx4.close(); + + } + + private boolean wasRolledBackPrimary(long startTs, String rolledBackRow) + throws TableNotFoundException { + boolean sawExpected = false; + Scanner scanner = conn.createScanner(getCurTableName(), Authorizations.EMPTY); + + for (Entry entry : scanner) { + long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + long ts = entry.getKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; + String row = entry.getKey().getRowData().toString(); + byte[] val = entry.getValue().get(); + + if (row.equals(rolledBackRow) && colType == ColumnConstants.DEL_LOCK_PREFIX && ts == startTs + && DelLockValue.isPrimary(val)) { + sawExpected = true; + } + } + return sawExpected; + } + }