diff --git a/src/frontend/edu/brown/hstore/HStoreCoordinator.java b/src/frontend/edu/brown/hstore/HStoreCoordinator.java index 6779632421..497ce6b5c6 100644 --- a/src/frontend/edu/brown/hstore/HStoreCoordinator.java +++ b/src/frontend/edu/brown/hstore/HStoreCoordinator.java @@ -774,6 +774,11 @@ public void transactionInit(LocalTransaction ts, RpcCallback fragmentBuilders = allFragmentBuilders; + + // Make sure our txn is in our DependencyTracker + if (debug.val) + LOG.debug(String.format("%s - Added transaction to %s", + ts, this.depTracker.getClass().getSimpleName())); + this.depTracker.addTransaction(ts); // Figure out whether the txn will always be read-only at this partition for (WorkFragment.Builder fragmentBuilder : allFragmentBuilders) { @@ -3531,7 +3540,6 @@ public VoltTable[] dispatchWorkFragments(final LocalTransaction ts, } // FOR long undoToken = this.calculateNextUndoToken(ts, is_localReadOnly); ts.initFirstRound(undoToken, batchSize); - final ExecutionState execState = ts.getExecutionState(); final boolean prefetch = ts.hasPrefetchQueries(); final boolean predict_singlePartition = ts.isPredictSinglePartition(); diff --git a/src/frontend/edu/brown/hstore/txns/DependencyTracker.java b/src/frontend/edu/brown/hstore/txns/DependencyTracker.java index 5da2ad3e01..8f0a14d67d 100644 --- a/src/frontend/edu/brown/hstore/txns/DependencyTracker.java +++ b/src/frontend/edu/brown/hstore/txns/DependencyTracker.java @@ -4,7 +4,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -168,7 +167,7 @@ public void clear() { } private final PartitionExecutor executor; - private final Map txnStates = new IdentityHashMap<>(); + private final Map txnStates = new HashMap(); // ---------------------------------------------------------------------------- // INITIALIZATION @@ -179,14 +178,31 @@ public DependencyTracker(PartitionExecutor executor) { } public void addTransaction(LocalTransaction ts) { + if (this.txnStates.containsKey(ts.getTransactionId())) { + return; + } + // FIXME TransactionState state = new TransactionState(ts); - this.txnStates.put(ts, state); + this.txnStates.put(ts.getTransactionId(), state); + if (debug.val) + LOG.debug(String.format("Added %s to %s", ts, this)); + } + + private TransactionState getState(LocalTransaction ts) { + TransactionState state = this.txnStates.get(ts.getTransactionId()); + assert(state != null) : + String.format("Unexpected null %s handle for %s at %s", + TransactionState.class.getSimpleName(), ts, this); + return (state); } public void removeTransaction(LocalTransaction ts) { // FIXME - this.txnStates.remove(ts); + TransactionState state = this.txnStates.remove(ts.getTransactionId()); + if (debug.val && state != null) { + LOG.debug(String.format("Removed %s from %s", ts, this)); + } } // ---------------------------------------------------------------------------- @@ -194,11 +210,7 @@ public void removeTransaction(LocalTransaction ts) { // ---------------------------------------------------------------------------- protected void initRound(LocalTransaction ts) { - TransactionState state = this.txnStates.get(ts); - assert(state != null) : - String.format("Unexpected null %s handle for %s", - TransactionState.class.getSimpleName(), ts); - + final TransactionState state = this.getState(ts); assert(state.queued_results.isEmpty()) : String.format("Trying to initialize ROUND #%d for %s but there are %d queued results", ts.getCurrentRound(ts.getBasePartition()), @@ -209,11 +221,7 @@ protected void initRound(LocalTransaction ts) { } protected void startRound(LocalTransaction ts) { - TransactionState state = this.txnStates.get(ts); - assert(state != null) : - String.format("Unexpected null %s handle for %s", - TransactionState.class.getSimpleName(), ts); - + final TransactionState state = this.getState(ts); final int basePartition = ts.getBasePartition(); final int currentRound = ts.getCurrentRound(basePartition); final int batch_size = ts.getCurrentBatchSize(); @@ -257,11 +265,7 @@ protected void startRound(LocalTransaction ts) { } protected void finishRound(LocalTransaction ts) { - TransactionState state = this.txnStates.get(ts); - assert(state != null) : - String.format("Unexpected null %s handle for %s", - TransactionState.class.getSimpleName(), ts); - + final TransactionState state = this.getState(ts); assert(state.dependency_ctr == state.received_ctr) : String.format("Trying to finish ROUND #%d on partition %d for %s before it was started", ts.getCurrentRound(ts.getBasePartition()), @@ -286,14 +290,6 @@ protected void finishRound(LocalTransaction ts) { // INTERNAL METHODS // ---------------------------------------------------------------------------- - private TransactionState getState(LocalTransaction ts) { - TransactionState state = this.txnStates.get(ts); - assert(state != null) : - String.format("Unexpected null %s handle for %s", - TransactionState.class.getSimpleName(), ts); - return (state); - } - /** * * @param state @@ -803,6 +799,14 @@ public boolean isBlocked(LocalTransaction ts, WorkFragment.Builder ftask) { // DEBUG STUFF // ---------------------------------------------------------------------------- + @Override + public String toString() { + return String.format("%s{Partition=%02d / Hash=%d}", + this.getClass().getSimpleName(), + this.executor.getPartitionId(), + this.hashCode()); + } + public class Debug implements DebugContext { public DependencyInfo getDependencyInfo(LocalTransaction ts, int d_id) { final TransactionState state = getState(ts); diff --git a/src/frontend/edu/brown/hstore/txns/LocalTransaction.java b/src/frontend/edu/brown/hstore/txns/LocalTransaction.java index a017231a8c..9cf98255a7 100644 --- a/src/frontend/edu/brown/hstore/txns/LocalTransaction.java +++ b/src/frontend/edu/brown/hstore/txns/LocalTransaction.java @@ -448,7 +448,7 @@ public void initRound(int partition, long undoToken) { this, this.round_ctr[partition], partition, undoToken)); // SAME SITE, SAME PARTITION - if (this.base_partition == partition) { + if (this.base_partition == partition && this.predict_singlePartition == false) { this.state.depTracker.initRound(this); } @@ -472,7 +472,7 @@ public void startRound(int partition) { if (this.predict_singlePartition == false) this.lock.lock(); try { - this.state.depTracker.startRound(this); + if (this.predict_singlePartition == false) this.state.depTracker.startRound(this); // It's now safe to change our state to STARTED super.startRound(partition); } finally { @@ -497,7 +497,7 @@ public void finishRound(int partition) { // SAME SITE, SAME PARTITION if (this.predict_singlePartition == false) this.lock.lock(); try { - this.state.depTracker.finishRound(this); + if (this.predict_singlePartition == false) this.state.depTracker.finishRound(this); super.finishRound(partition); } finally { if (this.predict_singlePartition == false) this.lock.unlock(); @@ -516,7 +516,7 @@ public void fastFinishRound(int partition) { super.finishRound(partition); if (this.base_partition == partition) { assert(this.state != null) : "Unexpected null ExecutionState for " + this; - this.state.depTracker.finishRound(this); + if (this.predict_singlePartition == false) this.state.depTracker.finishRound(this); } } @@ -535,7 +535,7 @@ public void setPendingError(SerializableException error, boolean interruptThread interruptThread = (this.pending_error == null && interruptThread); super.setPendingError(error); if (interruptThread == false) return; - this.state.depTracker.unblock(this); + if (this.predict_singlePartition == false) this.state.depTracker.unblock(this); } @Override