Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
apurtell committed Jun 16, 2021
1 parent 28bd66d commit 741c5f4
Showing 1 changed file with 60 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -64,7 +65,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -6811,49 +6814,41 @@ public String toString() {
public static class RowCommitSequencer {

final AtomicLong sequence = new AtomicLong(EnvironmentEdgeManager.currentTime());
final ConcurrentSkipListSet<HashedBytes> sequenceRowSet = new ConcurrentSkipListSet<>();
final ReentrantLock sequenceRowSetLock = new ReentrantLock();
final AtomicReference<HashSet<HashedBytes>> sequenceRowSet =
new AtomicReference<>(new LinkedHashSet<>());
final ReentrantLock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
final LongAdder yieldCount = new LongAdder();

long updateCurrentTime(final long now) {
return sequence.updateAndGet(x -> {
// Clock might go backwards, deliberately so in some unit tests.
// We only care that there is a unique CSLS for each discrete tick.
boolean updateCurrentTime(final long now) {
final boolean changed[] = { false };
sequence.updateAndGet(x -> {
if (now != x) {
// When the clock ticks, we begin a new commit sequence.
sequenceRowSet.clear();
changed[0] = true;
}
return now;
});
return changed[0];
}

boolean checkForOverlap(Collection<RowLock> rowLocks) throws IOException {
boolean checkAndAddRows(Collection<RowLock> rowLocks) throws IOException {
boolean overlap = false;
// For each row, test if the set already contains the row. If there is no mutation
// and the current operation will be allowed to go forward, then add all of its rows
// to the set.
// Unfortunately we need the set key tests and additions to be atomic in the aggregate.
try {
sequenceRowSetLock.lockInterruptibly();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
Set<HashedBytes> rowSet = sequenceRowSet.get();
for (RowLock l: rowLocks) {
HashedBytes row = ((RowLockImpl)l).context.row;
if (rowSet.contains(row)) {
overlap = true;
break;
}
}
try {
if (!overlap) {
for (RowLock l: rowLocks) {
HashedBytes row = ((RowLockImpl)l).context.row;
if (sequenceRowSet.contains(row)) {
overlap = true;
break;
}
rowSet.add(row);
}
if (!overlap) {
for (RowLock l: rowLocks) {
HashedBytes row = ((RowLockImpl)l).context.row;
sequenceRowSet.add(row);
}
}
} finally {
sequenceRowSetLock.unlock();
}
return overlap;
}
Expand All @@ -6863,34 +6858,46 @@ boolean checkForOverlap(Collection<RowLock> rowLocks) throws IOException {
* @return the time to use for the pending batch mutation
*/
public long getRowSequence(List<RowLock> rowLocks) throws IOException {
// When the clock ticks, we begin a new commit sequence. Do this housekeeping first.
long last = updateCurrentTime(EnvironmentEdgeManager.currentTime());
// Now we can check for collisions.
if (!checkForOverlap(rowLocks)) {
// No collision detected, proceed.
return sequence.get();
}
// Collision detected, now we wait.
boolean overlap = true;
do {
yieldCount.increment();
try {
// Yield the thread with a small sleep.
// Actual time suspended will depend on platform. Not less than the resolution of the
// system clock, which is what we want.
Thread.sleep(1,0);
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
while (true) {
boolean locked = false;
// When the clock ticks, we begin a new commit sequence. Do this housekeeping first.
long now = EnvironmentEdgeManager.currentTime();
if (updateCurrentTime(now)) {
sequenceRowSet.set(new LinkedHashSet<>());
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
}
condition.signalAll();
locked = true;
}
// Don't check again until the time changes, it would be useless overhead.
long now = updateCurrentTime(EnvironmentEdgeManager.currentTime());
if (last != now) {
overlap = checkForOverlap(rowLocks);
last = now;
if (!locked) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
}
}
} while (overlap);
// No collision detected, proceed
return sequence.get();
try {
// Now we can check for collisions.
if (!checkAndAddRows(rowLocks)) {
// No collision detected, proceed.
return sequence.get();
}
// Collision, get in line.
yieldCount.increment();
try {
if (!condition.await(100, TimeUnit.MICROSECONDS)) {
continue;
}
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException().initCause(e);
}
} finally {
lock.unlock();
}
}
}

/**
Expand Down

0 comments on commit 741c5f4

Please sign in to comment.