Skip to content

Commit

Permalink
Added in the ability to create/remove txns from the DependencyTracker…
Browse files Browse the repository at this point in the history
…s in the proper places. We only need to do it for distributed txns when they actually try to execute work on remote partitions #125
  • Loading branch information
apavlo committed May 12, 2013
1 parent dbf57c0 commit e737233
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 44 deletions.
5 changes: 5 additions & 0 deletions src/frontend/edu/brown/hstore/HStoreCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,11 @@ public void transactionInit(LocalTransaction ts, RpcCallback<TransactionInitResp

// Make sure that we initialize our internal PrefetchState for this txn
ts.initializePrefetch();

// We also need to add our boy to its base partition's DependencyTracker
// This is so that we can store the prefetch results when they come back
hstore_site.getDependencyTracker(ts.getBasePartition()).addTransaction(ts);

TransactionInitRequest[] requests = this.queryPrefetchPlanner.generateWorkFragments(ts);

// If the PrefetchQueryPlanner returns a null array, then there is nothing
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/edu/brown/hstore/HStoreSite.java
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,10 @@ protected void deleteLocalTransaction(LocalTransaction ts, final Status status)
t_estimator = this.executors[base_partition].getTransactionEstimator();
assert(t_estimator != null);
}
if (singlePartitioned == false) {
this.depTrackers[base_partition].removeTransaction(ts);
}

try {
switch (status) {
case OK:
Expand Down
32 changes: 20 additions & 12 deletions src/frontend/edu/brown/hstore/PartitionExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,6 @@ public void initHStoreSite(HStoreSite hstore_site) {
LOG.trace(String.format("Initializing HStoreSite components at partition %d", this.partitionId));
assert(this.hstore_site == null);
this.hstore_site = hstore_site;
this.hstore_coordinator = hstore_site.getCoordinator();
this.depTracker = hstore_site.getDependencyTracker(this.partitionId);
this.thresholds = hstore_site.getThresholds();
this.txnInitializer = hstore_site.getTransactionInitializer();
this.queueManager = hstore_site.getTransactionQueueManager();
Expand Down Expand Up @@ -838,7 +836,7 @@ public void updateConf(HStoreConf hstore_conf) {
* Has the opportunity to do startup config.
*/
@Override
public void run() {
public final void run() {
if (this.hstore_site == null) {
String msg = String.format("Trying to start %s for partition %d before its HStoreSite was initialized",
this.getClass().getSimpleName(), this.partitionId);
Expand All @@ -849,18 +847,21 @@ else if (this.self != null) {
this.getClass().getSimpleName(), this.partitionId);
throw new RuntimeException(msg);
}

assert(this.hstore_site != null);
assert(this.hstore_coordinator != null);
assert(this.specExecScheduler != null);
assert(this.queueManager != null);


this.self = Thread.currentThread();
this.self.setName(HStoreThreadManager.getThreadName(this.hstore_site, this.partitionId));

this.hstore_coordinator = hstore_site.getCoordinator();
this.depTracker = hstore_site.getDependencyTracker(this.partitionId);
this.hstore_site.getThreadManager().registerEEThread(partition);
this.shutdown_latch = new Semaphore(0);
this.shutdown_state = ShutdownState.STARTED;
if (hstore_conf.site.exec_profiling) profiler.start_time = System.currentTimeMillis();

assert(this.hstore_site != null);
assert(this.hstore_coordinator != null);
assert(this.specExecScheduler != null);
assert(this.queueManager != null);

// *********************************** DEBUG ***********************************
if (hstore_conf.site.exec_validate_work) {
Expand Down Expand Up @@ -2623,14 +2624,15 @@ else if (is_remote == false) {
LOG.trace(String.format("%s - Storing %d dependency results locally for successful work fragment",
ts, result.size()));
assert(result.size() == fragment.getOutputDepIdCount());
DependencyTracker otherTracker = this.hstore_site.getDependencyTracker(ts.getBasePartition());
for (int i = 0, cnt = result.size(); i < cnt; i++) {
int dep_id = fragment.getOutputDepId(i);
if (trace.val)
LOG.trace(String.format("%s - Storing DependencyId #%d [numRows=%d]\n%s",
ts, dep_id, result.dependencies[i].getRowCount(),
result.dependencies[i]));
try {
this.depTracker.addResult(local_ts, this.partitionId, dep_id, result.dependencies[i]);
otherTracker.addResult(local_ts, this.partitionId, dep_id, result.dependencies[i]);
} catch (Throwable ex) {
ex.printStackTrace();
String msg = String.format("Failed to stored Dependency #%d for %s [idx=%d, fragmentId=%d]",
Expand Down Expand Up @@ -3212,7 +3214,8 @@ else if (samePartitions == false) {
ts, execState.tmp_partitionFragments.size()));

// Block until we get all of our responses.
results = this.dispatchWorkFragments(ts, batchParams, batchSize, execState.tmp_partitionFragments);
results = this.dispatchWorkFragments(ts, batchParams, batchSize,
execState.tmp_partitionFragments);
}
if (debug.val && results == null)
LOG.warn("Got back a null results array for " + ts + "\n" + plan.toString());
Expand Down Expand Up @@ -3521,6 +3524,12 @@ public VoltTable[] dispatchWorkFragments(final LocalTransaction ts,
int num_skipped = 0;
int total = 0;
Collection<WorkFragment.Builder> fragmentBuilders = allFragmentBuilders;

// Make sure our txn is in our DependencyTracker
if (debug.val)
LOG.debug(String.format("%s - Added transaction to %s",
ts, this.depTracker.getClass().getSimpleName()));
this.depTracker.addTransaction(ts);

// Figure out whether the txn will always be read-only at this partition
for (WorkFragment.Builder fragmentBuilder : allFragmentBuilders) {
Expand All @@ -3531,7 +3540,6 @@ public VoltTable[] dispatchWorkFragments(final LocalTransaction ts,
} // FOR
long undoToken = this.calculateNextUndoToken(ts, is_localReadOnly);
ts.initFirstRound(undoToken, batchSize);
final ExecutionState execState = ts.getExecutionState();
final boolean prefetch = ts.hasPrefetchQueries();
final boolean predict_singlePartition = ts.isPredictSinglePartition();

Expand Down
58 changes: 31 additions & 27 deletions src/frontend/edu/brown/hstore/txns/DependencyTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -168,7 +167,7 @@ public void clear() {
}

private final PartitionExecutor executor;
private final Map<LocalTransaction, TransactionState> txnStates = new IdentityHashMap<>();
private final Map<Long, TransactionState> txnStates = new HashMap<Long, TransactionState>();

// ----------------------------------------------------------------------------
// INITIALIZATION
Expand All @@ -179,26 +178,39 @@ public DependencyTracker(PartitionExecutor executor) {
}

public void addTransaction(LocalTransaction ts) {
if (this.txnStates.containsKey(ts.getTransactionId())) {
return;
}

// FIXME
TransactionState state = new TransactionState(ts);
this.txnStates.put(ts, state);
this.txnStates.put(ts.getTransactionId(), state);
if (debug.val)
LOG.debug(String.format("Added %s to %s", ts, this));
}

private TransactionState getState(LocalTransaction ts) {
TransactionState state = this.txnStates.get(ts.getTransactionId());
assert(state != null) :
String.format("Unexpected null %s handle for %s at %s",
TransactionState.class.getSimpleName(), ts, this);
return (state);
}

public void removeTransaction(LocalTransaction ts) {
// FIXME
this.txnStates.remove(ts);
TransactionState state = this.txnStates.remove(ts.getTransactionId());
if (debug.val && state != null) {
LOG.debug(String.format("Removed %s from %s", ts, this));
}
}

// ----------------------------------------------------------------------------
// EXECUTION ROUNDS
// ----------------------------------------------------------------------------

protected void initRound(LocalTransaction ts) {
TransactionState state = this.txnStates.get(ts);
assert(state != null) :
String.format("Unexpected null %s handle for %s",
TransactionState.class.getSimpleName(), ts);

final TransactionState state = this.getState(ts);
assert(state.queued_results.isEmpty()) :
String.format("Trying to initialize ROUND #%d for %s but there are %d queued results",
ts.getCurrentRound(ts.getBasePartition()),
Expand All @@ -209,11 +221,7 @@ protected void initRound(LocalTransaction ts) {
}

protected void startRound(LocalTransaction ts) {
TransactionState state = this.txnStates.get(ts);
assert(state != null) :
String.format("Unexpected null %s handle for %s",
TransactionState.class.getSimpleName(), ts);

final TransactionState state = this.getState(ts);
final int basePartition = ts.getBasePartition();
final int currentRound = ts.getCurrentRound(basePartition);
final int batch_size = ts.getCurrentBatchSize();
Expand Down Expand Up @@ -257,11 +265,7 @@ protected void startRound(LocalTransaction ts) {
}

protected void finishRound(LocalTransaction ts) {
TransactionState state = this.txnStates.get(ts);
assert(state != null) :
String.format("Unexpected null %s handle for %s",
TransactionState.class.getSimpleName(), ts);

final TransactionState state = this.getState(ts);
assert(state.dependency_ctr == state.received_ctr) :
String.format("Trying to finish ROUND #%d on partition %d for %s before it was started",
ts.getCurrentRound(ts.getBasePartition()),
Expand All @@ -286,14 +290,6 @@ protected void finishRound(LocalTransaction ts) {
// INTERNAL METHODS
// ----------------------------------------------------------------------------

private TransactionState getState(LocalTransaction ts) {
TransactionState state = this.txnStates.get(ts);
assert(state != null) :
String.format("Unexpected null %s handle for %s",
TransactionState.class.getSimpleName(), ts);
return (state);
}

/**
*
* @param state
Expand Down Expand Up @@ -803,6 +799,14 @@ public boolean isBlocked(LocalTransaction ts, WorkFragment.Builder ftask) {
// DEBUG STUFF
// ----------------------------------------------------------------------------

@Override
public String toString() {
return String.format("%s{Partition=%02d / Hash=%d}",
this.getClass().getSimpleName(),
this.executor.getPartitionId(),
this.hashCode());
}

public class Debug implements DebugContext {
public DependencyInfo getDependencyInfo(LocalTransaction ts, int d_id) {
final TransactionState state = getState(ts);
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/edu/brown/hstore/txns/LocalTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ public void initRound(int partition, long undoToken) {
this, this.round_ctr[partition], partition, undoToken));

// SAME SITE, SAME PARTITION
if (this.base_partition == partition) {
if (this.base_partition == partition && this.predict_singlePartition == false) {
this.state.depTracker.initRound(this);
}

Expand All @@ -472,7 +472,7 @@ public void startRound(int partition) {

if (this.predict_singlePartition == false) this.lock.lock();
try {
this.state.depTracker.startRound(this);
if (this.predict_singlePartition == false) this.state.depTracker.startRound(this);
// It's now safe to change our state to STARTED
super.startRound(partition);
} finally {
Expand All @@ -497,7 +497,7 @@ public void finishRound(int partition) {
// SAME SITE, SAME PARTITION
if (this.predict_singlePartition == false) this.lock.lock();
try {
this.state.depTracker.finishRound(this);
if (this.predict_singlePartition == false) this.state.depTracker.finishRound(this);
super.finishRound(partition);
} finally {
if (this.predict_singlePartition == false) this.lock.unlock();
Expand All @@ -516,7 +516,7 @@ public void fastFinishRound(int partition) {
super.finishRound(partition);
if (this.base_partition == partition) {
assert(this.state != null) : "Unexpected null ExecutionState for " + this;
this.state.depTracker.finishRound(this);
if (this.predict_singlePartition == false) this.state.depTracker.finishRound(this);
}
}

Expand All @@ -535,7 +535,7 @@ public void setPendingError(SerializableException error, boolean interruptThread
interruptThread = (this.pending_error == null && interruptThread);
super.setPendingError(error);
if (interruptThread == false) return;
this.state.depTracker.unblock(this);
if (this.predict_singlePartition == false) this.state.depTracker.unblock(this);
}

@Override
Expand Down

0 comments on commit e737233

Please sign in to comment.