diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java index b3a556e3d9f2..86690588b81a 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java @@ -38,6 +38,7 @@ public interface MetricsRegionSource extends Comparable { String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount"; String FLUSHES_QUEUED_COUNT = "flushesQueuedCount"; String MAX_FLUSH_QUEUE_SIZE = "maxFlushQueueSize"; + String ROW_SEQUENCING_YIELDS = "rowSequencingYields"; String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed."; String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed."; String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds."; @@ -57,6 +58,7 @@ public interface MetricsRegionSource extends Comparable { String ROW_READS_ONLY_ON_MEMSTORE_DESC = "Row reads happening completely out of memstore"; String MIXED_ROW_READS = "mixedRowReadsCount"; String MIXED_ROW_READS_ON_STORE_DESC = "Row reads happening out of files and memstore on store"; + String ROW_SEQUENCING_YIELDS_DESC = "Number of yields taken to sequence row commits"; /** * Close the region's metrics as this region is closing. @@ -99,5 +101,9 @@ public interface MetricsRegionSource extends Comparable { */ MetricsRegionAggregateSource getAggregateSource(); + /** + * Update count of row sequencing yields. + */ + void updateRowSequencingYields(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index 2f7f8074c9df..b3793d8dcd6b 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -57,7 +57,7 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private final String regionIncrementKey; private final String regionAppendKey; private final String regionScanKey; - + private final String regionRowSequencerYieldsKey; /* * Implementation note: Do not put histograms per region. With hundreds of regions in a server * histograms allocate too many counters. See HBASE-17016. @@ -69,6 +69,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private final MutableFastCounter regionGet; private final MutableFastCounter regionScan; + private final MutableFastCounter regionRowSequencerYields; + private final int hashCode; public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper, @@ -107,6 +109,10 @@ public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper, regionScanKey = regionNamePrefix + MetricsRegionServerSource.SCAN_KEY + suffix; regionScan = registry.getCounter(regionScanKey, 0L); + + regionRowSequencerYieldsKey = regionNamePrefix + MetricsRegionSource.ROW_SEQUENCING_YIELDS + + suffix; + regionRowSequencerYields = registry.getCounter(regionRowSequencerYieldsKey, 0L); } @Override @@ -135,6 +141,7 @@ public void close() { registry.removeMetric(regionAppendKey); registry.removeMetric(regionGetKey); registry.removeMetric(regionScanKey); + registry.removeMetric(regionRowSequencerYieldsKey); regionWrapper = null; } @@ -170,6 +177,11 @@ public void updateAppend() { regionAppend.incr(); } + @Override + public void updateRowSequencingYields() { + regionRowSequencerYields.incr(); + } + @Override public MetricsRegionAggregateSource getAggregateSource() { return agg; @@ -302,6 +314,10 @@ void snapshot(MetricsRecordBuilder mrb, boolean ignored) { regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE, MetricsRegionSource.MAX_FLUSH_QUEUE_DESC), this.regionWrapper.getMaxFlushQueueSize()); + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.ROW_SEQUENCING_YIELDS, + MetricsRegionSource.ROW_SEQUENCING_YIELDS_DESC), + this.regionWrapper.getRowSequencingYields()); addCounter(mrb, this.regionWrapper.getMemstoreOnlyRowReadsCount(), MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE, MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java index 6bf010ce91b4..7de88f7e50b3 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java @@ -183,4 +183,8 @@ public interface MetricsRegionWrapper { */ Map getMixedRowReadsCount(); + /** + * @return the number of yields made for row sequencing + */ + long getRowSequencingYields(); } diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java index 598658a56ccc..0af7225ce01b 100644 --- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java +++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java @@ -233,5 +233,10 @@ public Map getMixedRowReadsCount() { map.put("info", 0L); return map; } + + @Override + public long getRowSequencingYields() { + return 0; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8afadc797dcc..ee1d1619ed22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -61,9 +62,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -223,6 +227,8 @@ */ @SuppressWarnings("deprecation") @InterfaceAudience.Private +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="JLM_JSR166_UTILCONCURRENT_MONITORENTER", + justification="Intentional") public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { private static final Logger LOG = LoggerFactory.getLogger(HRegion.class); @@ -689,6 +695,8 @@ void sawNoSuchFamily() { private final MultiVersionConcurrencyControl mvcc; + private final RowCommitSequencer rowCommitSequencer; + // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -770,6 +778,10 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor this.baseConf = confParam; this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues()); + + // rowCommitSequencer depends on this.conf + this.rowCommitSequencer = new RowCommitSequencer(); + this.cellComparator = htd.isMetaTable() || conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR) ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR; @@ -1559,6 +1571,9 @@ public Map> close() throws IOException { public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms"; public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds + public static final String COMMIT_SEQUENCER_ENABLED_KEY = "hbase.hregion.commit.sequencer.enabled"; + public static final boolean COMMIT_SEQUENCER_ENABLED_DEFAULT = true; + public Map> close(boolean abort) throws IOException { return close(abort, false); } @@ -4455,6 +4470,9 @@ protected void checkAndPreparePut(Put p) throws IOException { @Override public void checkAndPrepare() throws IOException { + // TODO: Currently validation is done with current time before acquiring locks and + // updates are done with different timestamps after acquiring locks. This behavior is + // inherited from the code prior to this change. Can this be changed? long now = EnvironmentEdgeManager.currentTime(); visitBatchOperations(true, this.size(), (int index) -> { checkAndPrepareMutation(index, now); @@ -4619,6 +4637,12 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { return; } + // Use the row commit sequencer to ensure that only operations that mutate disjoint + // sets of rows are committed within the same clock tick. + // Do this before we take the updatesLock because the sequencer may decide the operation + // will yield. + long now = rowCommitSequencer.getRowSequence(acquiredRowLocks); + // Check for thread interrupt status in case we have been signaled from // #interruptRegionOperation. Do it before we take the lock and disable interrupts for // the WAL append. @@ -4627,15 +4651,15 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount()); locked = true; - // From this point until memstore update this operation should not be interrupted. - disableInterrupts(); - // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer // timestamp - long now = EnvironmentEdgeManager.currentTime(); + // From this point until memstore update this operation should not be interrupted. + disableInterrupts(); + + // Prepare the batch, making any timestamp substitutions as needed batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); // STEP 3. Build WAL edit @@ -4892,11 +4916,14 @@ private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutat // If matches, perform the mutation or the rowMutations if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. - long now = EnvironmentEdgeManager.currentTime(); + + // Use the row commit sequencer to ensure that only operations that mutate disjoint + // sets of rows are committed within the same clock tick. + // Even if we yield it is safe to do this conditionally. The thread will yield but + // the row will remain locked. It will not be possible for any other thread to update + // the value. We don't need to re-read. + long now = rowCommitSequencer.getRowSequence(rowLock); + long ts = Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); if (mutation != null) { @@ -6792,6 +6819,210 @@ public String toString() { } } + /** + * Sequences the commit of rows such that more than one mutation to a given row will never be + * committed in the same clock tick. + *

+ * Callers will first acquire row locks for the row(s) the pending mutation will mutate. + * Then they will use RowCommitSequencer.getRowSequence to ensure that the set of rows about + * to be mutated are disjoint with respect to all other pending mutations in the current clock + * tick. If an overlap is found, getRowSequence will yield and loop until there is no longer + * an overlap and the caller's pending mutation can proceed. + *

+ * Note: This should all be REMOVED once we use a hybrid logical clock for timekeeping. + */ + public class RowCommitSequencer { + + public static final int ROW_SEQUENCER_SLEEP_TIME = 1; + + private class RowSet { + ReentrantLock lock; + // LinkedHashSet is O(1) insert and O(1) contains, this is important + LinkedHashSet set; + public RowSet() { + lock = new ReentrantLock(); + set = new LinkedHashSet<>(); + } + } + + private AtomicReference rowSet; + private AtomicLong sequence; + private LongAdder yieldCount; + private final boolean enabled; + + public RowCommitSequencer() { + this.enabled = conf.getBoolean(COMMIT_SEQUENCER_ENABLED_KEY, true); + if (this.enabled) { + this.sequence = new AtomicLong(EnvironmentEdgeManager.currentTime()); + this.rowSet = new AtomicReference<>(new RowSet()); + this.yieldCount = new LongAdder(); + } + } + + /** + * Update the current time and take the sequencer lock to prepare for row set updates. + * @param now the current time + */ + // Visible for testing + void updateTime(final long now) throws IOException { + sequence.updateAndGet(x -> { + if (x != now) { + // Time changed, reset the row set. + rowSet.set(new RowSet()); + } + return now; + }); + } + + /** + * Check if one or more of the rows we have acquired locks for would overlap with a commit + * made to a row in the same clock tick. + * @param rowLocks the list of rows locked for the current operation + * @return false if one or more rows overlap with an operation in progress, true otherwise + */ + // Visible for testing + boolean checkAndAddRows(Collection rowLocks) throws IOException { + // For each row, test if the set already contains the row. If there is no mutation + // and the current operation will be allowed to go forward, then add all of its rows + // to the set. + // This operation is going to be O(N*2) number of row locks, so the underlying set + // should have O(1) add and O(1) contains, like LinkedHashSet. + RowSet thisSet = rowSet.get(); + try { + thisSet.lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + try { + for (RowLock l: rowLocks) { + HashedBytes row = ((RowLockImpl)l).context.row; + if (thisSet.set.contains(row)) { + return false; + } + } + for (RowLock l: rowLocks) { + HashedBytes row = ((RowLockImpl)l).context.row; + thisSet.set.add(row); + } + return true; + } finally { + thisSet.lock.unlock(); + } + } + + /** + * Check if the row we have acquired a lock for would overlap with a commit made in the same + * clock tick. + * @param lock the row locked for the current operation + * @return false if the row overlaps with an operation in progress, true otherwise + */ + // Visible for testing + boolean checkAndAddRow(RowLock lock) throws IOException { + RowSet thisSet = rowSet.get(); + try { + thisSet.lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + try { + HashedBytes row = ((RowLockImpl)lock).context.row; + if (thisSet.set.contains(row)) { + return false; + } + thisSet.set.add(row); + return true; + } finally { + thisSet.lock.unlock(); + } + } + + /** + * Get the timestamp to use for substitution as cell timestamps for the current operation + * in progress. + *

+ * This method may yield the thread if one or more of the rows we have acquired locks for + * would overlap with a commit made to a row in the same clock tick, until the system time + * advances. + * @param rowLocks list of row locks accumulated for a batch mutation + * @return the timestamp to use for the current operation + */ + public long getRowSequence(List rowLocks) throws IOException { + while (true) { + long now = EnvironmentEdgeManager.currentTime(); + if (!enabled) { + return now; + } + updateTime(now); + // Now we can check for collisions. + if (checkAndAddRows(rowLocks)) { + // No collision detected, proceed. + return now; + } + try { + // The typical clock resolution on a modern system is ~1ms. Wait times less than + // this may be rounded up to at least the time for one clock tick on some platforms. + yieldCount.increment(); + Thread.sleep(ROW_SEQUENCER_SLEEP_TIME, 0); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + } + + /** + * Get the timestamp to use for substitution as cell timestamps for the current operation + * in progress. + *

+ * This method may block if one or more of the rows we have acquired locks for would + * overlap with a commit made to a row in the same clock tick, until the system time + * advances. + * @param rowLock row lock + * @return the timestamp to use for the current operation + */ + public long getRowSequence(RowLock rowLock) throws IOException { + while (true) { + long now = EnvironmentEdgeManager.currentTime(); + if (!enabled) { + return now; + } + updateTime(now); + // Now we can check for collisions. + if (checkAndAddRow(rowLock)) { + // No collision detected, proceed. + return now; + } + try { + // The typical clock resolution on a modern system is ~1ms. Wait times less than + // this may be rounded up to at least the time for one clock tick on some platforms. + yieldCount.increment(); + Thread.sleep(1,0); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + } + + /** + * @return the number of times the row sequencer yielded the current threads + */ + public long getYieldCount() { + return yieldCount.sum(); + } + + } + + // Visible for testing + RowCommitSequencer getRowCommitSequencer() { + return this.rowCommitSequencer; + } + + /** + * @return the number of times the row sequencer yielded the current threads + */ + public long getRowSequencingYields() { + return rowCommitSequencer.getYieldCount(); + } + /** * Determines whether multiple column families are present * Precondition: familyPaths is not null @@ -7733,6 +7964,8 @@ public Result increment(Increment increment, long nonceGroup, long nonce) throws /** * @return writeEntry associated with this append */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", + justification="Findbugs doesn't know about Preconditions") private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException { Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), @@ -8433,4 +8666,5 @@ public void addReadRequestsCount(long readRequestsCount) { public void addWriteRequestsCount(long writeRequestsCount) { this.writeRequestsCount.add(writeRequestsCount); } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 1a266f76abb6..f9e8b92abf88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -246,6 +246,11 @@ public Map getMixedRowReadsCount() { return mixedReadsOnStore; } + @Override + public long getRowSequencingYields() { + return region.getRowSequencingYields(); + } + public class HRegionMetricsWrapperRunnable implements Runnable { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java index 774871b38263..02db8f3687dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java @@ -29,7 +29,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Stable -public class HashedBytes { +public class HashedBytes implements Comparable { private final byte[] bytes; private final int hashCode; @@ -58,8 +58,14 @@ public boolean equals(Object obj) { return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes); } + @Override + public int compareTo(HashedBytes that) { + return Bytes.compareTo(this.bytes, that.bytes); + } + @Override public String toString() { return Bytes.toStringBinary(bytes); } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java index 2e40de3112c2..0e7752ba3140 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -80,6 +81,8 @@ public static void setupBeforeClass() throws Exception { // Make general delay zero rather than default. Timing is off in this // test that depends on an evironment edge that is manually moved forward. util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); + // Row commit sequencer won't work because this test uses ManualEnvironmentEdge + util.getConfiguration().setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); util.startMiniCluster(); EnvironmentEdgeManager.injectEdge(mee); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java index 99bf3fa05489..33511db27cc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -90,6 +91,8 @@ public static void setupBeforeClass() throws Exception { // Make general delay zero rather than default. Timing is off in this // test that depends on an evironment edge that is manually moved forward. util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); + // Row commit sequencer won't work because this test uses ManualEnvironmentEdge + util.getConfiguration().setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); util.startMiniCluster(); EnvironmentEdgeManager.injectEdge(mee); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java index 4f40f6289cb3..52e37932dcd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java @@ -199,4 +199,9 @@ public Map getMixedRowReadsCount() { map.put("info", 0L); return map; } + + @Override + public long getRowSequencingYields() { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a5584ff32599..df083a871754 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -7032,9 +7032,9 @@ public void testCellTTLs() throws IOException { @Test public void testIncrementTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Increment inc = new Increment(row); @@ -7057,9 +7057,9 @@ public void testIncrementTimestampsAreMonotonic() throws IOException { @Test public void testAppendTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Append a = new Append(row); @@ -7088,29 +7088,29 @@ public void testAppendTimestampsAreMonotonic() throws IOException { @Test public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); - ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); - edge.setValue(10); + final long t0 = edge.currentTime(); Put p = new Put(row); p.setDurability(Durability.SKIP_WAL); - p.addColumn(fam1, qual1, qual1); + p.addColumn(fam1, qual1, t0, qual1); region.put(p); Result result = region.get(new Get(row)); Cell c = result.getColumnLatestCell(fam1, qual1); assertNotNull(c); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); - edge.setValue(1); // clock goes back + edge.incrementTime(-10000); // clock goes back p = new Put(row); p.setDurability(Durability.SKIP_WAL); p.addColumn(fam1, qual1, qual2); region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p); result = region.get(new Get(row)); c = result.getColumnLatestCell(fam1, qual1); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), qual2, 0, qual2.length)); @@ -7292,22 +7292,22 @@ public Void call() throws Exception { @Test public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); - ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); - edge.setValue(10); + long t0 = edge.currentTime(); Put p = new Put(row); p.setDurability(Durability.SKIP_WAL); - p.addColumn(fam1, qual1, qual1); + p.addColumn(fam1, qual1, t0, qual1); region.put(p); Result result = region.get(new Get(row)); Cell c = result.getColumnLatestCell(fam1, qual1); assertNotNull(c); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); - edge.setValue(1); // clock goes back + edge.incrementTime(-10000); // clock goes back p = new Put(row); p.setDurability(Durability.SKIP_WAL); p.addColumn(fam1, qual1, qual2); @@ -7317,7 +7317,7 @@ public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { new BinaryComparator(qual1), rm)); result = region.get(new Get(row)); c = result.getColumnLatestCell(fam1, qual1); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); LOG.info("c value " + Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength())); @@ -7879,4 +7879,185 @@ public void run() { assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get()); } + private boolean checkForOverlap(final byte[]... rows) throws IOException { + List list = new ArrayList<>(rows.length); + for (byte[] row: rows) { + list.add(region.getRowLock(row)); + } + try { + return region.getRowCommitSequencer().checkAndAddRows(list); + } finally { + for (RowLock l: list) { + l.release(); + } + } + } + + @Test + public void testRowCommitSequencer() throws Exception { + final byte[] cf1 = Bytes.toBytes("CF1"); + final byte[][] families = { cf1 }; + final byte[] AAA = Bytes.toBytes("AAA"); + final byte[] AAB = Bytes.toBytes("AAB"); + final byte[] AAC = Bytes.toBytes("AAC"); + final byte[] AAD = Bytes.toBytes("AAD"); + final byte[] AAE = Bytes.toBytes("AAE"); + final byte[] AAF = Bytes.toBytes("AAF"); + final byte[] AAG = Bytes.toBytes("AAG"); + + region = initHRegion(tableName, method, CONF, families); + HRegion.RowCommitSequencer sequencer = region.getRowCommitSequencer(); + + // Tick 1 + + sequencer.updateTime(1); + assertTrue (checkForOverlap(AAA, AAB, AAC)); + assertFalse(checkForOverlap(AAA)); + + // Tick 2 + + sequencer.updateTime(2); + assertTrue (checkForOverlap(AAA)); + + // Tick 3 + + sequencer.updateTime(3); + // Two disjoint batches allowed + assertTrue (checkForOverlap(AAA, AAB, AAC)); + assertTrue (checkForOverlap(AAD, AAE, AAF)); + // Single row mutations that conflict with the batches are disallowed + assertFalse(checkForOverlap(AAA)); + assertFalse(checkForOverlap(AAB)); + assertFalse(checkForOverlap(AAC)); + assertFalse(checkForOverlap(AAD)); + assertFalse(checkForOverlap(AAE)); + assertFalse(checkForOverlap(AAF)); + // Single row mutation that does not conflict is fine + assertTrue (checkForOverlap(AAG)); + + // Tick 4 + + sequencer.updateTime(4); + // Two single row mutations allowed + assertTrue (checkForOverlap(AAA)); + assertTrue (checkForOverlap(AAD)); + // A batch with a conflicting row disallowed + assertFalse(checkForOverlap(AAA, AAB, AAC)); + // Single row mutations in the batch that do not conflict are allowed on their own + // This tests that the check does not add rows which are not going to pass the filter. + assertTrue (checkForOverlap(AAB)); + assertTrue (checkForOverlap(AAC)); + // Another batch with a conflicting row disallowed + assertFalse(checkForOverlap(AAD, AAE, AAF)); + // More row mutations in the batch that do not conflict are allowed on their own + // This tests that the check does not add rows which are not going to pass the filter. + assertTrue (checkForOverlap(AAE)); + assertTrue (checkForOverlap(AAF)); + + // Time jumps forward + // Tick 10 + + sequencer.updateTime(10); + // A batch with a previosuly conflicting row now allowed + assertTrue (checkForOverlap(AAA, AAB, AAC)); + + // Time jumps backwards + // Tick 7 + + sequencer.updateTime(7); + // Same check + assertTrue (checkForOverlap(AAA, AAB, AAC)); + + // Time jumps backwards again + // Tick 6 + + sequencer.updateTime(6); + // Same check + assertTrue (checkForOverlap(AAA, AAB, AAC)); + + // Tick 11 + + // Some interleaving + sequencer.updateTime(11); + assertTrue (checkForOverlap(AAA, AAB)); + assertFalse(checkForOverlap(AAB, AAC)); + assertTrue (checkForOverlap(AAC, AAD)); + assertFalse(checkForOverlap(AAD, AAE)); + assertTrue (checkForOverlap(AAE, AAF)); + assertFalse(checkForOverlap(AAF, AAG)); + assertTrue (checkForOverlap(AAG)); + + // Tick 12 + + sequencer.updateTime(12); + assertTrue (checkForOverlap(AAA, AAB, AAC, AAD, AAE, AAF, AAG)); + assertFalse(checkForOverlap(AAA)); + assertFalse(checkForOverlap(AAB)); + assertFalse(checkForOverlap(AAC)); + assertFalse(checkForOverlap(AAD)); + assertFalse(checkForOverlap(AAE)); + assertFalse(checkForOverlap(AAF)); + assertFalse(checkForOverlap(AAG)); + + // Tick 13 + + sequencer.updateTime(13); + assertTrue (checkForOverlap(AAA)); + assertTrue (checkForOverlap(AAB)); + assertTrue (checkForOverlap(AAC)); + assertTrue (checkForOverlap(AAD)); + assertTrue (checkForOverlap(AAE)); + assertTrue (checkForOverlap(AAF)); + assertTrue (checkForOverlap(AAG)); + } + + @Test + public void testRowCommitSequencerYields() throws Exception { + final byte[] cf1 = Bytes.toBytes("CF1"); + final byte[][] families = { cf1 }; + final byte[] empty = HConstants.EMPTY_BYTE_ARRAY; + final Put putAAA = new Put(Bytes.toBytes("AAA")).addColumn(cf1, empty, Bytes.toBytes(0)); + ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); + mee.setValue(1); + EnvironmentEdgeManager.injectEdge(mee); + try { + region = initHRegion(tableName, method, CONF, families); + Thread ticker = new Thread(() -> { + int i = 2; + while (true) { + i += 1; + LOG.info("Set time to {}", i); + mee.setValue(i); + try { + Thread.sleep(HRegion.RowCommitSequencer.ROW_SEQUENCER_SLEEP_TIME * 10); + } catch (InterruptedException e) { + LOG.info("Interrupted while ticking"); + return; + } + } + }); + Thread committer = new Thread(() -> { + for (int i = 0; i < 100; i++) { + try { + region.put(putAAA); + LOG.info("Put {}", putAAA); + } catch (IOException e) { + LOG.error("Error during put", e); + } + } + }); + ticker.start(); + committer.start(); + committer.join(); + ticker.interrupt(); + ticker.join(); + } finally { + EnvironmentEdgeManager.reset(); + } + + LOG.info("rowSequencingYields: {}", region.getRowSequencingYields()); + // There should be at least 99 yields (there will probably be many more...) + assertTrue(region.getRowSequencingYields() >= 99); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java index 089397313fec..390171721a2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -61,7 +62,8 @@ public class TestMinVersions { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMinVersions.class); - HBaseTestingUtil hbu = new HBaseTestingUtil(); + private static final HBaseTestingUtil hbu = new HBaseTestingUtil(); + private final byte[] T0 = Bytes.toBytes("0"); private final byte[] T1 = Bytes.toBytes("1"); private final byte[] T2 = Bytes.toBytes("2"); @@ -73,6 +75,12 @@ public class TestMinVersions { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setUpBeforeClass() { + // Row commit sequencer won't work because this test uses ManualEnvironmentEdge + hbu.getConfiguration().setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); + } + /** * Verify behavior of getClosestBefore(...) */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index d5c31236d117..c58c6810058b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.ScanType; @@ -89,6 +90,8 @@ public class TestCoprocessorScanPolicy { public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); + // Row commit sequencer won't work because the test uses the ManualEnvironmentEdge + conf.setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); TEST_UTIL.startMiniCluster(); }