From b02e805574ebc62b355c3ff1aa91d8a59e084158 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Wed, 23 Feb 2022 22:14:35 +0800 Subject: [PATCH 01/13] HBASE-26768 Avoid unnecessary replication suspending in RegionReplicationSink --- .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../RegionReplicationFlushRequester.java | 15 +- .../RegionReplicationSink.java | 59 ++- .../TestRegionReplicationSinkSuspend.java | 422 ++++++++++++++++++ 4 files changed, 488 insertions(+), 12 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1acbf162b098..9fb20776eaa5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -582,7 +582,7 @@ public Result getResult() { } /** A result object from prepare flush cache stage */ - static class PrepareFlushResult { + protected static class PrepareFlushResult { final FlushResultImpl result; // indicating a failure result from prepare final TreeMap storeFlushCtxs; final TreeMap> committedFiles; @@ -724,7 +724,7 @@ void sawNoSuchFamily() { private final StoreHotnessProtector storeHotnessProtector; - private Optional regionReplicationSink = Optional.empty(); + protected Optional regionReplicationSink = Optional.empty(); /** * HRegion constructor. This constructor should only be used for testing and diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java index 34313241d1f6..c1fad5a9deb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.regionreplication; +import com.google.errorprone.annotations.RestrictedApi; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Threads; @@ -56,7 +57,7 @@ class RegionReplicationFlushRequester { private final long minIntervalSecs; - private long lastRequestNanos; + private long lastRequestNanos = 0; private long pendingFlushRequestSequenceId; @@ -140,4 +141,16 @@ synchronized void recordFlush(long sequenceId) { pendingFlushRequest = null; } } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + synchronized Timeout getPendingFlushRequest() { + return this.pendingFlushRequest; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + synchronized long getLastRequestNanos() { + return this.lastRequestNanos; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index 265d768e211e..22e685afc65c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.regionreplication; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -98,7 +99,7 @@ public class RegionReplicationSink { public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100; - private static final class SinkEntry { + static final class SinkEntry { final WALKeyImpl key; @@ -170,7 +171,7 @@ void replicated() { private volatile long pendingSize; - private long lastFlushedSequenceId; + private volatile long lastFlushedSequenceId; private boolean sending; @@ -215,31 +216,40 @@ private void onComplete(List sent, } manager.decrease(toReleaseSize); Set failed = new HashSet<>(); + long lastFlushedSequenceIdToUse = this.lastFlushedSequenceId; for (Map.Entry> entry : replica2Error.entrySet()) { Integer replicaId = entry.getKey(); Throwable error = entry.getValue().getValue(); if (error != null) { - if (maxSequenceId > lastFlushedSequenceId) { + if (maxSequenceId > lastFlushedSequenceIdToUse) { LOG.warn( "Failed to replicate to secondary replica {} for {}, since the max sequence" + " id of sunk entris is {}, which is greater than the last flush SN {}," + " we will stop replicating for a while and trigger a flush", - replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); + replicaId, primary, maxSequenceId, lastFlushedSequenceIdToUse, error); failed.add(replicaId); } else { LOG.warn( "Failed to replicate to secondary replica {} for {}, since the max sequence" + " id of sunk entris is {}, which is less than or equal to the last flush SN {}," + " we will not stop replicating", - replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); + replicaId, primary, maxSequenceId, lastFlushedSequenceIdToUse, error); } } } + checkFailedReplicaAndSend(failed, toReleaseSize, maxSequenceId); + } + + protected void checkFailedReplicaAndSend(Set failed, long toReleaseSize, + long maxSequenceId) { synchronized (entries) { pendingSize -= toReleaseSize; - if (!failed.isEmpty()) { - failedReplicas.addAll(failed); - flushRequester.requestFlush(maxSequenceId); + // double check + if (maxSequenceId > lastFlushedSequenceId) { + if (!failed.isEmpty()) { + failedReplicas.addAll(failed); + flushRequester.requestFlush(maxSequenceId); + } } sending = false; if (stopping) { @@ -253,7 +263,7 @@ private void onComplete(List sent, } } - private void send() { + void send() { List toSend = new ArrayList<>(); long totalSize = 0L; boolean hasMetaEdit = false; @@ -448,4 +458,35 @@ public void waitUntilStopped() throws InterruptedException { } } } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + RegionReplicationFlushRequester getFlushRequester() { + return this.flushRequester; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + IntHashSet getFailedReplicas() { + synchronized (entries) { + return this.failedReplicas; + } + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + boolean isSending() { + synchronized (entries) { + return this.sending; + } + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + Queue getEntries() { + synchronized (entries) { + return this.entries; + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java new file mode 100644 index 000000000000..4684d8b55404 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.regionreplication; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.StartTestingClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink.SinkEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRegionReplicationSinkSuspend { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionReplicationSinkSuspend.class); + + private static final byte[] FAMILY = Bytes.toBytes("family_test"); + + private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); + + private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); + private static final int NB_SERVERS = 2; + + private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend"); + private static volatile boolean startTest = false; + private static volatile boolean initialFlush = false; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); + conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 15); + conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); + conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); + conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); + conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); + conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); + HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class) + .numRegionServers(NB_SERVERS).build()); + + } + + @AfterClass + public static void tearDown() throws Exception { + HTU.shutdownMiniCluster(); + } + + /** + * This test is for HBASE-26768 + */ + @Test + public void test() throws Exception { + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setRegionMemStoreReplication(true).build(); + HTU.getAdmin().createTable(tableDescriptor); + final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; + for (int i = 0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getRegions(tableName); + for (HRegion region : onlineRegions) { + int replicaId = region.getRegionInfo().getReplicaId(); + assertTrue(regions[replicaId] == null); + regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; + } + } + for (Region region : regions) { + assertNotNull(region); + } + + final AtomicInteger sinkCheckFailedReplicaAndSendCounter = new AtomicInteger(0); + RegionReplicationSink regionReplicationSink = + regions[0].getRegionReplicationSink().get(); + assertTrue(regionReplicationSink != null); + RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); + AtomicReference catchedThrowableRef = new AtomicReference(null); + Mockito.doAnswer((invocationOnMock) -> { + try { + if (!startTest) { + invocationOnMock.callRealMethod(); + return null; + } + int count = sinkCheckFailedReplicaAndSendCounter.incrementAndGet(); + if (count == 1) { + RegionReplicationSink currentSink = (RegionReplicationSink) invocationOnMock.getMock(); + assertTrue(currentSink.getFailedReplicas().size() == 0); + @SuppressWarnings("unchecked") + Set inputFailedReplicas = (Set) invocationOnMock.getArgument(0); + assertTrue(inputFailedReplicas.size() == 1); + /** + * Wait {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} starting, + * Only before entering {@link RegionReplicationSink#checkFailedReplicaAndSend}, + * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} could starting. + */ + regions[0].cyclicBarrier.await(); + /** + * Wait {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. + * After {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending, + * {@link RegionReplicationSink#checkFailedReplicaAndSend} could enter its method to + * execute. + */ + regions[0].cyclicBarrier.await(); + assertTrue(currentSink.getFailedReplicas().isEmpty()); + + invocationOnMock.callRealMethod(); + assertTrue(currentSink.getFailedReplicas().isEmpty()); + assertTrue(currentSink.getFlushRequester().getPendingFlushRequest() == null); + /** + * After {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing + * could commit flush + */ + regions[0].cyclicBarrier.await(); + return null; + } + invocationOnMock.callRealMethod(); + return null; + } catch (Throwable throwable) { + catchedThrowableRef.set(throwable); + } + return null; + }).when(spiedRegionReplicationSink).checkFailedReplicaAndSend( + Mockito.anySet(), + Mockito.anyLong(), + Mockito.anyLong()); + + Mockito.doAnswer((invocationOnMock) -> { + if (!startTest) { + invocationOnMock.callRealMethod(); + return null; + } + if (regions[0].prepareFlush + && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)) { + /** + * Only before entering {@link RegionReplicationSink#checkFailedReplicaAndSend}, + * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} could starting. + */ + regions[0].cyclicBarrier.await(); + invocationOnMock.callRealMethod(); + /** + * After {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending, + * {@link RegionReplicationSink#checkFailedReplicaAndSend} could enter its method to + * execute. + */ + regions[0].cyclicBarrier.await(); + /** + * Wait {@link RegionReplicationSink#checkFailedReplicaAndSend} ending. After + * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could + * commit flush. + */ + regions[0].cyclicBarrier.await(); + return null; + } + invocationOnMock.callRealMethod(); + return null; + }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any()); + + final AtomicInteger sinkSendCounter = new AtomicInteger(0); + Mockito.doAnswer((invocationOnMock) -> { + if (startTest) { + sinkSendCounter.incrementAndGet(); + } + invocationOnMock.callRealMethod(); + return null; + }).when(spiedRegionReplicationSink).send(); + + regions[0].setRegionReplicationSink(spiedRegionReplicationSink); + + String oldThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); + try { + initialFlush = true; + spiedRegionReplicationSink.getFlushRequester().requestFlush(-1); + regions[0].flushMemStoreCyclicBarrierBeforeStartTest.await(); + HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); + assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); + long initialRequestNanos = + spiedRegionReplicationSink.getFlushRequester().getLastRequestNanos(); + assertTrue(initialRequestNanos > 0); + initialFlush = false; + startTest = true; + /** + * Write First cell,replicating to secondary replica is error. + * {@link RegionReplicationSink#checkFailedReplicaAndSend} for this cell would wait for + * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. + */ + regions[0].put(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + + /** + * For {@link FlushAction#START_FLUSH}, after {@link RegionReplicationSink#add} is + * ending,{@link RegionReplicationSink#checkFailedReplicaAndSend} could execute, + * {@link FlushAction#START_FLUSH} is in {@link RegionReplicationSink#entries},and is sent by + * {@link RegionReplicationSink#checkFailedReplicaAndSend},After + * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could + * commit flush.
+ * For {@link FlushAction#COMMIT_FLUSH},it is directly sent. + */ + regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY); + + HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); + /** + * No failed replicas. + */ + assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty()); + /** + * No flush requester. + */ + assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); + assertTrue(spiedRegionReplicationSink.getFlushRequester() + .getLastRequestNanos() == initialRequestNanos); + Queue sinkEntrys = spiedRegionReplicationSink.getEntries(); + assertTrue(sinkEntrys.size() == 0); + assertTrue(!spiedRegionReplicationSink.isSending()); + /** + * send key1,{@link FlushAction#START_FLUSH} and {@link FlushAction#COMMIT_FLUSH} + */ + assertTrue(sinkSendCounter.get() == 3); + + regions[0].put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); + + HTU.waitFor(1200000, () -> !spiedRegionReplicationSink.isSending()); + assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); + assertTrue(spiedRegionReplicationSink.getFlushRequester() + .getLastRequestNanos() == initialRequestNanos); + assertTrue(sinkSendCounter.get() == 4); + assertTrue(sinkEntrys.size() == 0); + assertTrue(catchedThrowableRef.get() == null); + } finally { + startTest = false; + Thread.currentThread().setName(oldThreadName); + } + } + + + public static final class HRegionForTest extends HRegion { + static final String USER_THREAD_NAME = "TestReplicationHang"; + final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + volatile boolean prepareFlush = false; + final CyclicBarrier flushMemStoreCyclicBarrierBeforeStartTest = new CyclicBarrier(2); + + public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, + TableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + @SuppressWarnings("deprecation") + public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { + this.regionReplicationSink = Optional.of(regionReplicationSink); + } + + @Override + protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, + Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, + FlushLifeCycleTracker tracker) throws IOException { + + FlushResultImpl result = super.internalFlushcache(wal, myseqid, storesToFlush, status, + writeFlushWalMarker, tracker); + if (!startTest && this.getRegionInfo().getReplicaId() == 0 + && this.getRegionInfo().getTable().equals(tableName) && initialFlush) { + try { + this.flushMemStoreCyclicBarrierBeforeStartTest.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + return result; + } + + @Override + protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, + Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, + FlushLifeCycleTracker tracker) throws IOException { + if (!startTest) { + return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, + writeFlushWalMarker, tracker); + } + + if (this.getRegionInfo().getReplicaId() == 0 + && Thread.currentThread().getName().equals(USER_THREAD_NAME)) { + this.prepareFlush = true; + } + try { + return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, + writeFlushWalMarker, tracker); + } + finally { + if (this.getRegionInfo().getReplicaId() == 0 + && Thread.currentThread().getName().equals(USER_THREAD_NAME)) { + this.prepareFlush = false; + } + } + + } + } + + public static final class ErrorReplayRSRpcServices extends RSRpcServices { + private static final AtomicInteger callCounter = new AtomicInteger(0); + + public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, + ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException { + + if (!startTest) { + return super.replicateToReplica(rpcController, replicateWALEntryRequest); + } + + List entries = replicateWALEntryRequest.getEntryList(); + if (CollectionUtils.isEmpty(entries)) { + return ReplicateWALEntryResponse.getDefaultInstance(); + } + ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); + + HRegion region; + try { + region = server.getRegionByEncodedName(regionName.toStringUtf8()); + } catch (NotServingRegionException e) { + throw new ServiceException(e); + } + + if (!region.getRegionInfo().getTable().equals(tableName) + || region.getRegionInfo().getReplicaId() != 1) { + return super.replicateToReplica(rpcController, replicateWALEntryRequest); + } + + int count = callCounter.incrementAndGet(); + if (count > 1) { + return super.replicateToReplica(rpcController, replicateWALEntryRequest); + } + throw new ServiceException(new DoNotRetryIOException("Inject error!")); + } + } + + public static final class RSForTest + extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { + + public RSForTest(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new ErrorReplayRSRpcServices(this); + } + } +} From 10aa84f87c071458594038b698937445175030b8 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Thu, 24 Feb 2022 12:31:47 +0800 Subject: [PATCH 02/13] fix checkstyle --- .../TestRegionReplicationSinkSuspend.java | 218 ++++++++++-------- 1 file changed, 121 insertions(+), 97 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java index 4684d8b55404..0007ce160819 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.regionreplication; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.StartTestingClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -60,7 +62,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; @@ -69,6 +71,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -76,7 +79,7 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; -@Category({ RegionServerTests.class, MediumTests.class }) +@Category({ RegionServerTests.class, LargeTests.class }) public class TestRegionReplicationSinkSuspend { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -119,30 +122,97 @@ public static void tearDown() throws Exception { */ @Test public void test() throws Exception { - TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) - .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) - .setRegionMemStoreReplication(true).build(); - HTU.getAdmin().createTable(tableDescriptor); - final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; - for (int i = 0; i < NB_SERVERS; i++) { - HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); - List onlineRegions = rs.getRegions(tableName); - for (HRegion region : onlineRegions) { - int replicaId = region.getRegionInfo().getReplicaId(); - assertTrue(regions[replicaId] == null); - regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; - } - } - for (Region region : regions) { - assertNotNull(region); - } - + final HRegionForTest[] regions = this.createTable(); final AtomicInteger sinkCheckFailedReplicaAndSendCounter = new AtomicInteger(0); + final AtomicInteger sinkSendCounter = new AtomicInteger(0); + AtomicReference catchedThrowableRef = new AtomicReference(null); + RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); assertTrue(regionReplicationSink != null); + RegionReplicationSink spiedRegionReplicationSink = + this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], + sinkCheckFailedReplicaAndSendCounter, sinkSendCounter, catchedThrowableRef); + + String oldThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); + try { + initialFlush = true; + spiedRegionReplicationSink.getFlushRequester().requestFlush(-1); + regions[0].flushMemStoreCyclicBarrierBeforeStartTest.await(); + HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); + assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); + long initialRequestNanos = + spiedRegionReplicationSink.getFlushRequester().getLastRequestNanos(); + assertTrue(initialRequestNanos > 0); + initialFlush = false; + startTest = true; + /** + * Write First cell,replicating to secondary replica is error. + * {@link RegionReplicationSink#checkFailedReplicaAndSend} for this cell would wait for + * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. + */ + byte[] rowKey1 = Bytes.toBytes(1); + regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + /** + * For {@link FlushAction#START_FLUSH}, after {@link RegionReplicationSink#add} is + * ending,{@link RegionReplicationSink#checkFailedReplicaAndSend} could execute, + * {@link FlushAction#START_FLUSH} is in {@link RegionReplicationSink#entries},and is sent by + * {@link RegionReplicationSink#checkFailedReplicaAndSend},After + * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could + * commit flush.
+ * For {@link FlushAction#COMMIT_FLUSH},it is directly sent. + */ + regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY); + + HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); + /** + * No failed replicas. Before HBASE-26768, here failed replicas is not empty and replication + * suspend. + */ + assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty()); + /** + * No flush requester. + */ + assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); + assertTrue(spiedRegionReplicationSink.getFlushRequester() + .getLastRequestNanos() == initialRequestNanos); + Queue sinkEntrys = spiedRegionReplicationSink.getEntries(); + assertTrue(sinkEntrys.size() == 0); + assertTrue(!spiedRegionReplicationSink.isSending()); + /** + * send key1,{@link FlushAction#START_FLUSH} and {@link FlushAction#COMMIT_FLUSH} + */ + assertTrue(sinkSendCounter.get() == 3); + + byte[] rowKey2 = Bytes.toBytes(2); + regions[0].put(new Put(rowKey2).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); + + HTU.waitFor(1200000, () -> !spiedRegionReplicationSink.isSending()); + assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); + assertTrue(spiedRegionReplicationSink.getFlushRequester() + .getLastRequestNanos() == initialRequestNanos); + /** + * send key1,{@link FlushAction#START_FLUSH} , {@link FlushAction#COMMIT_FLUSH},and key2 + */ + assertTrue(sinkSendCounter.get() == 4); + assertTrue(sinkEntrys.size() == 0); + assertTrue(catchedThrowableRef.get() == null); + + assertEquals(1, Bytes.toInt(regions[1].get(new Get(rowKey1)).getValue(FAMILY, QUAL))); + assertEquals(2, Bytes.toInt(regions[1].get(new Get(rowKey2)).getValue(FAMILY, QUAL))); + } finally { + startTest = false; + Thread.currentThread().setName(oldThreadName); + } + } + + private RegionReplicationSink setUpSpiedRegionReplicationSink( + final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, + final AtomicInteger sinkCheckFailedReplicaAndSendCounter, + final AtomicInteger sinkSendCounter, final AtomicReference catchedThrowableRef) { RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); - AtomicReference catchedThrowableRef = new AtomicReference(null); + Mockito.doAnswer((invocationOnMock) -> { try { if (!startTest) { @@ -161,14 +231,14 @@ public void test() throws Exception { * Only before entering {@link RegionReplicationSink#checkFailedReplicaAndSend}, * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} could starting. */ - regions[0].cyclicBarrier.await(); + primaryRegion.cyclicBarrier.await(); /** * Wait {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. * After {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending, * {@link RegionReplicationSink#checkFailedReplicaAndSend} could enter its method to * execute. */ - regions[0].cyclicBarrier.await(); + primaryRegion.cyclicBarrier.await(); assertTrue(currentSink.getFailedReplicas().isEmpty()); invocationOnMock.callRealMethod(); @@ -178,7 +248,7 @@ public void test() throws Exception { * After {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing * could commit flush */ - regions[0].cyclicBarrier.await(); + primaryRegion.cyclicBarrier.await(); return null; } invocationOnMock.callRealMethod(); @@ -187,43 +257,40 @@ public void test() throws Exception { catchedThrowableRef.set(throwable); } return null; - }).when(spiedRegionReplicationSink).checkFailedReplicaAndSend( - Mockito.anySet(), - Mockito.anyLong(), - Mockito.anyLong()); + }).when(spiedRegionReplicationSink).checkFailedReplicaAndSend(Mockito.anySet(), + Mockito.anyLong(), Mockito.anyLong()); Mockito.doAnswer((invocationOnMock) -> { if (!startTest) { invocationOnMock.callRealMethod(); return null; } - if (regions[0].prepareFlush + if (primaryRegion.prepareFlush && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)) { /** * Only before entering {@link RegionReplicationSink#checkFailedReplicaAndSend}, * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} could starting. */ - regions[0].cyclicBarrier.await(); + primaryRegion.cyclicBarrier.await(); invocationOnMock.callRealMethod(); /** * After {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending, * {@link RegionReplicationSink#checkFailedReplicaAndSend} could enter its method to * execute. */ - regions[0].cyclicBarrier.await(); + primaryRegion.cyclicBarrier.await(); /** * Wait {@link RegionReplicationSink#checkFailedReplicaAndSend} ending. After * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could * commit flush. */ - regions[0].cyclicBarrier.await(); + primaryRegion.cyclicBarrier.await(); return null; } invocationOnMock.callRealMethod(); return null; }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any()); - final AtomicInteger sinkSendCounter = new AtomicInteger(0); Mockito.doAnswer((invocationOnMock) -> { if (startTest) { sinkSendCounter.incrementAndGet(); @@ -232,74 +299,31 @@ public void test() throws Exception { return null; }).when(spiedRegionReplicationSink).send(); - regions[0].setRegionReplicationSink(spiedRegionReplicationSink); - - String oldThreadName = Thread.currentThread().getName(); - Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); - try { - initialFlush = true; - spiedRegionReplicationSink.getFlushRequester().requestFlush(-1); - regions[0].flushMemStoreCyclicBarrierBeforeStartTest.await(); - HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); - assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); - long initialRequestNanos = - spiedRegionReplicationSink.getFlushRequester().getLastRequestNanos(); - assertTrue(initialRequestNanos > 0); - initialFlush = false; - startTest = true; - /** - * Write First cell,replicating to secondary replica is error. - * {@link RegionReplicationSink#checkFailedReplicaAndSend} for this cell would wait for - * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. - */ - regions[0].put(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); - - /** - * For {@link FlushAction#START_FLUSH}, after {@link RegionReplicationSink#add} is - * ending,{@link RegionReplicationSink#checkFailedReplicaAndSend} could execute, - * {@link FlushAction#START_FLUSH} is in {@link RegionReplicationSink#entries},and is sent by - * {@link RegionReplicationSink#checkFailedReplicaAndSend},After - * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could - * commit flush.
- * For {@link FlushAction#COMMIT_FLUSH},it is directly sent. - */ - regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY); - - HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); - /** - * No failed replicas. - */ - assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty()); - /** - * No flush requester. - */ - assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); - assertTrue(spiedRegionReplicationSink.getFlushRequester() - .getLastRequestNanos() == initialRequestNanos); - Queue sinkEntrys = spiedRegionReplicationSink.getEntries(); - assertTrue(sinkEntrys.size() == 0); - assertTrue(!spiedRegionReplicationSink.isSending()); - /** - * send key1,{@link FlushAction#START_FLUSH} and {@link FlushAction#COMMIT_FLUSH} - */ - assertTrue(sinkSendCounter.get() == 3); - - regions[0].put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); + primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); + return spiedRegionReplicationSink; + } - HTU.waitFor(1200000, () -> !spiedRegionReplicationSink.isSending()); - assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); - assertTrue(spiedRegionReplicationSink.getFlushRequester() - .getLastRequestNanos() == initialRequestNanos); - assertTrue(sinkSendCounter.get() == 4); - assertTrue(sinkEntrys.size() == 0); - assertTrue(catchedThrowableRef.get() == null); - } finally { - startTest = false; - Thread.currentThread().setName(oldThreadName); + private HRegionForTest[] createTable() throws Exception { + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .build(); + HTU.getAdmin().createTable(tableDescriptor); + final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; + for (int i = 0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getRegions(tableName); + for (HRegion region : onlineRegions) { + int replicaId = region.getRegionInfo().getReplicaId(); + assertTrue(regions[replicaId] == null); + regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; + } } + for (Region region : regions) { + assertNotNull(region); + } + return regions; } - public static final class HRegionForTest extends HRegion { static final String USER_THREAD_NAME = "TestReplicationHang"; final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); From 703116accfeaac4685c6ffa1de13b99ec383b269 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Fri, 25 Feb 2022 16:48:08 +0800 Subject: [PATCH 03/13] add more comments --- .../TestRegionReplicationSinkSuspend.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java index 0007ce160819..47cb1c3f3ae7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -67,6 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; @@ -118,7 +120,39 @@ public static void tearDown() throws Exception { } /** - * This test is for HBASE-26768 + *
+   * This test is for HBASE-26768. Assuming we have only one secondary replica for region.
+   * The threads sequence before HBASE-26768 is:
+   * 1.We write first cell,replicating to secondary replica, but the regionServer which
+   *   serving secondary replica responses error.
+   * 2.The regionServer which serving primary replica receives error response and Netty
+   *   nioEventLoop thread invokes {@link RegionReplicationSink#onComplete},and adds the
+   *   error secondary replica to the failed local variable,but stops before entering the
+   *   synchronized block.
+   * 3.Flusher thread invokes {@link RegionReplicationSink#add} to add
+   *   {@link FlushAction#START_FLUSH} flush marker, in {@link RegionReplicationSink#add}
+   *   clears the {@link RegionReplicationSink#failedReplicas}, set
+   *   {@link RegionReplicationSink#lastFlushedSequenceId} and
+   *   {@link RegionReplicationFlushRequester#lastFlushedSequenceId} to the flushing hfile
+   *   logSequenceId.
+   * 4.After flusher thread completes {@link RegionReplicationSink#add}, Netty nioEventLoop
+   *   thread continues to enter the synchronized block in
+   *   {@link RegionReplicationSink#onComplete},and still adds the error replica to the
+   *   {@link RegionReplicationSink#failedReplicas} even though the maxSequenceId of the failed
+   *   replicating {@link WALEdit}s is less than
+   *   {@link RegionReplicationSink#lastFlushedSequenceId}.
+   * 5.In the synchronized block, {@link RegionReplicationFlushRequester#requestFlush} is also
+   *   invoked but the flushing would be skipped because in
+   *   {@link RegionReplicationFlushRequester#flush},
+   *   {@link RegionReplicationFlushRequester#pendingFlushRequestSequenceId} is less than
+   *   {@link RegionReplicationFlushRequester#lastFlushedSequenceId}, so this only secondary
+   *   replica is marked failed and requested flushing is skipped, the replication may suspend
+   *   until next memstore flush.
+   *   After HBASE-26768, for above step 4, the error replica is not added to the
+   *   {@link RegionReplicationSink#failedReplicas} if the maxSequenceId of the failed replicating
+   *   {@link WALEdit}s is less than {@link RegionReplicationSink#lastFlushedSequenceId}, and
+   *   {@link RegionReplicationFlushRequester#requestFlush} is not invoked.
+   * 
*/ @Test public void test() throws Exception { @@ -423,6 +457,9 @@ public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, return super.replicateToReplica(rpcController, replicateWALEntryRequest); } + /** + * Simulate the first cell replicating error. + */ int count = callCounter.incrementAndGet(); if (count > 1) { return super.replicateToReplica(rpcController, replicateWALEntryRequest); From 7a5eaba88912c4c15bbb922d32930b2b233b2600 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Fri, 25 Feb 2022 18:52:12 +0800 Subject: [PATCH 04/13] add comments --- .../regionreplication/RegionReplicationFlushRequester.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java index c1fad5a9deb3..57b1b994669c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java @@ -57,7 +57,7 @@ class RegionReplicationFlushRequester { private final long minIntervalSecs; - private long lastRequestNanos = 0; + private long lastRequestNanos; private long pendingFlushRequestSequenceId; From 59c7f3e95c3a7e8690b8459407695427cda61cbc Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 28 Feb 2022 12:06:10 +0800 Subject: [PATCH 05/13] simplify the code --- .../RegionReplicationFlushRequester.java | 13 -- .../RegionReplicationSink.java | 86 +++---- .../TestRegionReplicationSinkSuspend.java | 221 +++--------------- 3 files changed, 57 insertions(+), 263 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java index 57b1b994669c..34313241d1f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.regionreplication; -import com.google.errorprone.annotations.RestrictedApi; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Threads; @@ -141,16 +140,4 @@ synchronized void recordFlush(long sequenceId) { pendingFlushRequest = null; } } - - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - synchronized Timeout getPendingFlushRequest() { - return this.pendingFlushRequest; - } - - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - synchronized long getLastRequestNanos() { - return this.lastRequestNanos; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index 22e685afc65c..ce06adf3b80d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -22,7 +22,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -171,7 +170,7 @@ void replicated() { private volatile long pendingSize; - private volatile long lastFlushedSequenceId; + private long lastFlushedSequenceId; private boolean sending; @@ -205,8 +204,8 @@ public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescri this.failedReplicas = new IntHashSet(regionReplication - 1); } - private void onComplete(List sent, - Map> replica2Error) { + void onComplete(List sent, + Map> replica2Error) { long maxSequenceId = Long.MIN_VALUE; long toReleaseSize = 0; for (SinkEntry entry : sent) { @@ -215,42 +214,34 @@ private void onComplete(List sent, toReleaseSize += entry.size; } manager.decrease(toReleaseSize); - Set failed = new HashSet<>(); - long lastFlushedSequenceIdToUse = this.lastFlushedSequenceId; - for (Map.Entry> entry : replica2Error.entrySet()) { - Integer replicaId = entry.getKey(); - Throwable error = entry.getValue().getValue(); - if (error != null) { - if (maxSequenceId > lastFlushedSequenceIdToUse) { - LOG.warn( - "Failed to replicate to secondary replica {} for {}, since the max sequence" + - " id of sunk entris is {}, which is greater than the last flush SN {}," + - " we will stop replicating for a while and trigger a flush", - replicaId, primary, maxSequenceId, lastFlushedSequenceIdToUse, error); - failed.add(replicaId); - } else { - LOG.warn( - "Failed to replicate to secondary replica {} for {}, since the max sequence" + - " id of sunk entris is {}, which is less than or equal to the last flush SN {}," + - " we will not stop replicating", - replicaId, primary, maxSequenceId, lastFlushedSequenceIdToUse, error); - } - } - } - checkFailedReplicaAndSend(failed, toReleaseSize, maxSequenceId); - } - - protected void checkFailedReplicaAndSend(Set failed, long toReleaseSize, - long maxSequenceId) { synchronized (entries) { pendingSize -= toReleaseSize; - // double check - if (maxSequenceId > lastFlushedSequenceId) { - if (!failed.isEmpty()) { - failedReplicas.addAll(failed); - flushRequester.requestFlush(maxSequenceId); + boolean updateFailedReplicas = false; + for (Map.Entry> entry : replica2Error.entrySet()) { + Integer replicaId = entry.getKey(); + Throwable error = entry.getValue().getValue(); + if (error != null) { + if (maxSequenceId > lastFlushedSequenceId) { + LOG.warn( + "Failed to replicate to secondary replica {} for {}, since the max sequence" + + " id of sunk entris is {}, which is greater than the last flush SN {}," + + " we will stop replicating for a while and trigger a flush", + replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); + failedReplicas.add(replicaId); + updateFailedReplicas = true; + } else { + LOG.warn( + "Failed to replicate to secondary replica {} for {}, since the max sequence" + + " id of sunk entris is {}, which is less than or equal to the last flush SN {}," + + " we will not stop replicating", + replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); + } } } + + if (updateFailedReplicas) { + flushRequester.requestFlush(maxSequenceId); + } sending = false; if (stopping) { stopped = true; @@ -459,12 +450,6 @@ public void waitUntilStopped() throws InterruptedException { } } - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - RegionReplicationFlushRequester getFlushRequester() { - return this.flushRequester; - } - @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*/src/test/.*") IntHashSet getFailedReplicas() { @@ -472,21 +457,4 @@ IntHashSet getFailedReplicas() { return this.failedReplicas; } } - - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - boolean isSending() { - synchronized (entries) { - return this.sending; - } - } - - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - Queue getEntries() { - synchronized (entries) { - return this.entries; - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java index 47cb1c3f3ae7..dd2404ae5c0b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.regionreplication; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -25,11 +24,9 @@ import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.Queue; -import java.util.Set; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -43,13 +40,11 @@ import org.apache.hadoop.hbase.StartTestingClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -59,7 +54,6 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink.SinkEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; @@ -68,7 +62,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; @@ -96,7 +89,6 @@ public class TestRegionReplicationSinkSuspend { private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend"); private static volatile boolean startTest = false; - private static volatile boolean initialFlush = false; @BeforeClass public static void setUp() throws Exception { @@ -120,121 +112,38 @@ public static void tearDown() throws Exception { } /** - *
-   * This test is for HBASE-26768. Assuming we have only one secondary replica for region.
-   * The threads sequence before HBASE-26768 is:
-   * 1.We write first cell,replicating to secondary replica, but the regionServer which
-   *   serving secondary replica responses error.
-   * 2.The regionServer which serving primary replica receives error response and Netty
-   *   nioEventLoop thread invokes {@link RegionReplicationSink#onComplete},and adds the
-   *   error secondary replica to the failed local variable,but stops before entering the
-   *   synchronized block.
-   * 3.Flusher thread invokes {@link RegionReplicationSink#add} to add
-   *   {@link FlushAction#START_FLUSH} flush marker, in {@link RegionReplicationSink#add}
-   *   clears the {@link RegionReplicationSink#failedReplicas}, set
-   *   {@link RegionReplicationSink#lastFlushedSequenceId} and
-   *   {@link RegionReplicationFlushRequester#lastFlushedSequenceId} to the flushing hfile
-   *   logSequenceId.
-   * 4.After flusher thread completes {@link RegionReplicationSink#add}, Netty nioEventLoop
-   *   thread continues to enter the synchronized block in
-   *   {@link RegionReplicationSink#onComplete},and still adds the error replica to the
-   *   {@link RegionReplicationSink#failedReplicas} even though the maxSequenceId of the failed
-   *   replicating {@link WALEdit}s is less than
-   *   {@link RegionReplicationSink#lastFlushedSequenceId}.
-   * 5.In the synchronized block, {@link RegionReplicationFlushRequester#requestFlush} is also
-   *   invoked but the flushing would be skipped because in
-   *   {@link RegionReplicationFlushRequester#flush},
-   *   {@link RegionReplicationFlushRequester#pendingFlushRequestSequenceId} is less than
-   *   {@link RegionReplicationFlushRequester#lastFlushedSequenceId}, so this only secondary
-   *   replica is marked failed and requested flushing is skipped, the replication may suspend
-   *   until next memstore flush.
-   *   After HBASE-26768, for above step 4, the error replica is not added to the
-   *   {@link RegionReplicationSink#failedReplicas} if the maxSequenceId of the failed replicating
-   *   {@link WALEdit}s is less than {@link RegionReplicationSink#lastFlushedSequenceId}, and
-   *   {@link RegionReplicationFlushRequester#requestFlush} is not invoked.
-   * 
+ * This test is for HBASE-26768,test the case that we have already clear the + * {@link RegionReplicationSink#failedReplicas} due to a flush all edit,which may in flusher + * thread,and then in the callback of replay, which may in Netty nioEventLoop,we add a replica to + * the {@link RegionReplicationSink#failedReplicas} because of a failure of replicating. */ @Test public void test() throws Exception { final HRegionForTest[] regions = this.createTable(); - final AtomicInteger sinkCheckFailedReplicaAndSendCounter = new AtomicInteger(0); - final AtomicInteger sinkSendCounter = new AtomicInteger(0); - AtomicReference catchedThrowableRef = new AtomicReference(null); - + final AtomicInteger onCompleteCounter = new AtomicInteger(0); + final AtomicBoolean completedRef = new AtomicBoolean(false); RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); assertTrue(regionReplicationSink != null); - RegionReplicationSink spiedRegionReplicationSink = - this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], - sinkCheckFailedReplicaAndSendCounter, sinkSendCounter, catchedThrowableRef); + + RegionReplicationSink spiedRegionReplicationSink = this.setUpSpiedRegionReplicationSink( + regionReplicationSink, regions[0], onCompleteCounter, + completedRef); String oldThreadName = Thread.currentThread().getName(); Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); try { - initialFlush = true; - spiedRegionReplicationSink.getFlushRequester().requestFlush(-1); - regions[0].flushMemStoreCyclicBarrierBeforeStartTest.await(); - HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); - assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); - long initialRequestNanos = - spiedRegionReplicationSink.getFlushRequester().getLastRequestNanos(); - assertTrue(initialRequestNanos > 0); - initialFlush = false; startTest = true; /** * Write First cell,replicating to secondary replica is error. - * {@link RegionReplicationSink#checkFailedReplicaAndSend} for this cell would wait for - * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. */ byte[] rowKey1 = Bytes.toBytes(1); + regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); - /** - * For {@link FlushAction#START_FLUSH}, after {@link RegionReplicationSink#add} is - * ending,{@link RegionReplicationSink#checkFailedReplicaAndSend} could execute, - * {@link FlushAction#START_FLUSH} is in {@link RegionReplicationSink#entries},and is sent by - * {@link RegionReplicationSink#checkFailedReplicaAndSend},After - * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could - * commit flush.
- * For {@link FlushAction#COMMIT_FLUSH},it is directly sent. - */ regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY); - HTU.waitFor(120000, () -> !spiedRegionReplicationSink.isSending()); - /** - * No failed replicas. Before HBASE-26768, here failed replicas is not empty and replication - * suspend. - */ + HTU.waitFor(120000, () -> completedRef.get()); assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty()); - /** - * No flush requester. - */ - assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); - assertTrue(spiedRegionReplicationSink.getFlushRequester() - .getLastRequestNanos() == initialRequestNanos); - Queue sinkEntrys = spiedRegionReplicationSink.getEntries(); - assertTrue(sinkEntrys.size() == 0); - assertTrue(!spiedRegionReplicationSink.isSending()); - /** - * send key1,{@link FlushAction#START_FLUSH} and {@link FlushAction#COMMIT_FLUSH} - */ - assertTrue(sinkSendCounter.get() == 3); - - byte[] rowKey2 = Bytes.toBytes(2); - regions[0].put(new Put(rowKey2).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); - - HTU.waitFor(1200000, () -> !spiedRegionReplicationSink.isSending()); - assertTrue(spiedRegionReplicationSink.getFlushRequester().getPendingFlushRequest() == null); - assertTrue(spiedRegionReplicationSink.getFlushRequester() - .getLastRequestNanos() == initialRequestNanos); - /** - * send key1,{@link FlushAction#START_FLUSH} , {@link FlushAction#COMMIT_FLUSH},and key2 - */ - assertTrue(sinkSendCounter.get() == 4); - assertTrue(sinkEntrys.size() == 0); - assertTrue(catchedThrowableRef.get() == null); - - assertEquals(1, Bytes.toInt(regions[1].get(new Get(rowKey1)).getValue(FAMILY, QUAL))); - assertEquals(2, Bytes.toInt(regions[1].get(new Get(rowKey2)).getValue(FAMILY, QUAL))); } finally { startTest = false; Thread.currentThread().setName(oldThreadName); @@ -243,56 +152,26 @@ public void test() throws Exception { private RegionReplicationSink setUpSpiedRegionReplicationSink( final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, - final AtomicInteger sinkCheckFailedReplicaAndSendCounter, - final AtomicInteger sinkSendCounter, final AtomicReference catchedThrowableRef) { + final AtomicInteger onCompleteCounter, + final AtomicBoolean completedRef) { + RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); Mockito.doAnswer((invocationOnMock) -> { - try { - if (!startTest) { - invocationOnMock.callRealMethod(); - return null; - } - int count = sinkCheckFailedReplicaAndSendCounter.incrementAndGet(); - if (count == 1) { - RegionReplicationSink currentSink = (RegionReplicationSink) invocationOnMock.getMock(); - assertTrue(currentSink.getFailedReplicas().size() == 0); - @SuppressWarnings("unchecked") - Set inputFailedReplicas = (Set) invocationOnMock.getArgument(0); - assertTrue(inputFailedReplicas.size() == 1); - /** - * Wait {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} starting, - * Only before entering {@link RegionReplicationSink#checkFailedReplicaAndSend}, - * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} could starting. - */ - primaryRegion.cyclicBarrier.await(); - /** - * Wait {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending. - * After {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending, - * {@link RegionReplicationSink#checkFailedReplicaAndSend} could enter its method to - * execute. - */ - primaryRegion.cyclicBarrier.await(); - assertTrue(currentSink.getFailedReplicas().isEmpty()); - - invocationOnMock.callRealMethod(); - assertTrue(currentSink.getFailedReplicas().isEmpty()); - assertTrue(currentSink.getFlushRequester().getPendingFlushRequest() == null); - /** - * After {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing - * could commit flush - */ - primaryRegion.cyclicBarrier.await(); - return null; - } + if (!startTest) { invocationOnMock.callRealMethod(); return null; - } catch (Throwable throwable) { - catchedThrowableRef.set(throwable); } + int count = onCompleteCounter.incrementAndGet(); + if (count == 1) { + primaryRegion.cyclicBarrier.await(); + invocationOnMock.callRealMethod(); + completedRef.set(true); + return null; + } + invocationOnMock.callRealMethod(); return null; - }).when(spiedRegionReplicationSink).checkFailedReplicaAndSend(Mockito.anySet(), - Mockito.anyLong(), Mockito.anyLong()); + }).when(spiedRegionReplicationSink).onComplete(Mockito.anyList(), Mockito.anyMap()); Mockito.doAnswer((invocationOnMock) -> { if (!startTest) { @@ -301,23 +180,7 @@ private RegionReplicationSink setUpSpiedRegionReplicationSink( } if (primaryRegion.prepareFlush && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)) { - /** - * Only before entering {@link RegionReplicationSink#checkFailedReplicaAndSend}, - * {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} could starting. - */ - primaryRegion.cyclicBarrier.await(); invocationOnMock.callRealMethod(); - /** - * After {@link RegionReplicationSink#add} for {@link FlushAction#START_FLUSH} ending, - * {@link RegionReplicationSink#checkFailedReplicaAndSend} could enter its method to - * execute. - */ - primaryRegion.cyclicBarrier.await(); - /** - * Wait {@link RegionReplicationSink#checkFailedReplicaAndSend} ending. After - * {@link RegionReplicationSink#checkFailedReplicaAndSend} ending, MemStore Flushing could - * commit flush. - */ primaryRegion.cyclicBarrier.await(); return null; } @@ -325,14 +188,6 @@ private RegionReplicationSink setUpSpiedRegionReplicationSink( return null; }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any()); - Mockito.doAnswer((invocationOnMock) -> { - if (startTest) { - sinkSendCounter.incrementAndGet(); - } - invocationOnMock.callRealMethod(); - return null; - }).when(spiedRegionReplicationSink).send(); - primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); return spiedRegionReplicationSink; } @@ -362,7 +217,6 @@ public static final class HRegionForTest extends HRegion { static final String USER_THREAD_NAME = "TestReplicationHang"; final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); volatile boolean prepareFlush = false; - final CyclicBarrier flushMemStoreCyclicBarrierBeforeStartTest = new CyclicBarrier(2); public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, TableDescriptor htd, RegionServerServices rsServices) { @@ -379,24 +233,6 @@ public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink this.regionReplicationSink = Optional.of(regionReplicationSink); } - @Override - protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, - Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, - FlushLifeCycleTracker tracker) throws IOException { - - FlushResultImpl result = super.internalFlushcache(wal, myseqid, storesToFlush, status, - writeFlushWalMarker, tracker); - if (!startTest && this.getRegionInfo().getReplicaId() == 0 - && this.getRegionInfo().getTable().equals(tableName) && initialFlush) { - try { - this.flushMemStoreCyclicBarrierBeforeStartTest.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - return result; - } - @Override protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, @@ -411,8 +247,11 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, this.prepareFlush = true; } try { - return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, + PrepareFlushResult result = + super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker, tracker); + + return result; } finally { if (this.getRegionInfo().getReplicaId() == 0 From 9c4dfdc3c9e04398271f4fc4afb061b3604b7226 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 28 Feb 2022 12:09:07 +0800 Subject: [PATCH 06/13] simplify the code --- .../regionserver/regionreplication/RegionReplicationSink.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index ce06adf3b80d..56d7b925fbda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -98,7 +98,7 @@ public class RegionReplicationSink { public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100; - static final class SinkEntry { + private static final class SinkEntry { final WALKeyImpl key; @@ -205,7 +205,7 @@ public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescri } void onComplete(List sent, - Map> replica2Error) { + Map> replica2Error) { long maxSequenceId = Long.MIN_VALUE; long toReleaseSize = 0; for (SinkEntry entry : sent) { From 141fdcf22b9c81847545ee1e826f00e38ce9765a Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 28 Feb 2022 12:44:10 +0800 Subject: [PATCH 07/13] simplify the code --- ...TestRegionReplicationSinkCallbackAndFluhConcurrently.java} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/{TestRegionReplicationSinkSuspend.java => TestRegionReplicationSinkCallbackAndFluhConcurrently.java} (98%) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java similarity index 98% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java index dd2404ae5c0b..3b8ff639a407 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkSuspend.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java @@ -75,10 +75,10 @@ import org.mockito.Mockito; @Category({ RegionServerTests.class, LargeTests.class }) -public class TestRegionReplicationSinkSuspend { +public class TestRegionReplicationSinkCallbackAndFluhConcurrently { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRegionReplicationSinkSuspend.class); + HBaseClassTestRule.forClass(TestRegionReplicationSinkCallbackAndFluhConcurrently.class); private static final byte[] FAMILY = Bytes.toBytes("family_test"); From 653cfca26b2b2f26e1783c98ae8220e9fa9002f1 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 28 Feb 2022 12:46:24 +0800 Subject: [PATCH 08/13] simplify the code --- .../regionreplication/RegionReplicationSink.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index 56d7b925fbda..17f7be7dcb10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -216,7 +216,7 @@ void onComplete(List sent, manager.decrease(toReleaseSize); synchronized (entries) { pendingSize -= toReleaseSize; - boolean updateFailedReplicas = false; + boolean addFailedReplicas = false; for (Map.Entry> entry : replica2Error.entrySet()) { Integer replicaId = entry.getKey(); Throwable error = entry.getValue().getValue(); @@ -228,7 +228,7 @@ void onComplete(List sent, + " we will stop replicating for a while and trigger a flush", replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); failedReplicas.add(replicaId); - updateFailedReplicas = true; + addFailedReplicas = true; } else { LOG.warn( "Failed to replicate to secondary replica {} for {}, since the max sequence" @@ -239,7 +239,7 @@ void onComplete(List sent, } } - if (updateFailedReplicas) { + if (addFailedReplicas) { flushRequester.requestFlush(maxSequenceId); } sending = false; From c5869380a5b49610d580ba312fd1f041388eb7ad Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 28 Feb 2022 14:31:12 +0800 Subject: [PATCH 09/13] simplify the code --- .../TestRegionReplicationSinkCallbackAndFluhConcurrently.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java index 3b8ff639a407..328486190564 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java @@ -164,6 +164,7 @@ private RegionReplicationSink setUpSpiedRegionReplicationSink( } int count = onCompleteCounter.incrementAndGet(); if (count == 1) { + //wait for adding for flush all edit completed. primaryRegion.cyclicBarrier.await(); invocationOnMock.callRealMethod(); completedRef.set(true); @@ -181,6 +182,7 @@ private RegionReplicationSink setUpSpiedRegionReplicationSink( if (primaryRegion.prepareFlush && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)) { invocationOnMock.callRealMethod(); + //onComplete could execute primaryRegion.cyclicBarrier.await(); return null; } From 53d52c35a2e48ea9d1c5e88dc5deb46df69765ef Mon Sep 17 00:00:00 2001 From: comnetwork Date: Mon, 28 Feb 2022 15:15:21 +0800 Subject: [PATCH 10/13] simplify the code --- ...estRegionReplicationSinkCallbackAndFlushConcurrently.java} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/{TestRegionReplicationSinkCallbackAndFluhConcurrently.java => TestRegionReplicationSinkCallbackAndFlushConcurrently.java} (99%) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java similarity index 99% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java index 328486190564..52c568c0c632 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFluhConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java @@ -75,10 +75,10 @@ import org.mockito.Mockito; @Category({ RegionServerTests.class, LargeTests.class }) -public class TestRegionReplicationSinkCallbackAndFluhConcurrently { +public class TestRegionReplicationSinkCallbackAndFlushConcurrently { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRegionReplicationSinkCallbackAndFluhConcurrently.class); + HBaseClassTestRule.forClass(TestRegionReplicationSinkCallbackAndFlushConcurrently.class); private static final byte[] FAMILY = Bytes.toBytes("family_test"); From 88f6b7d30d830d224d5d69d1bf024bbf43cbee95 Mon Sep 17 00:00:00 2001 From: comnetwork Date: Tue, 1 Mar 2022 17:26:10 +0800 Subject: [PATCH 11/13] import issue --- .../TestRegionReplicationSinkCallbackAndFlushConcurrently.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java index 52c568c0c632..630eb68d1dab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; + import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -66,7 +67,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; From de953a14eae8ed026ec7ff107a0a23c6043f08de Mon Sep 17 00:00:00 2001 From: comnetwork Date: Tue, 1 Mar 2022 19:07:02 +0800 Subject: [PATCH 12/13] import issue --- ...cationSinkCallbackAndFlushConcurrently.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java index 630eb68d1dab..2c67fe6c9b15 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java @@ -27,7 +27,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,19 +53,11 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; - import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -74,6 +65,15 @@ import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; + @Category({ RegionServerTests.class, LargeTests.class }) public class TestRegionReplicationSinkCallbackAndFlushConcurrently { @ClassRule From 6e15af1d2587f613d7fb2043310af411dcba8edb Mon Sep 17 00:00:00 2001 From: comnetwork Date: Wed, 2 Mar 2022 10:57:08 +0800 Subject: [PATCH 13/13] modify test --- .../RegionReplicationSink.java | 4 +-- ...ationSinkCallbackAndFlushConcurrently.java | 26 +++++++++---------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index 17f7be7dcb10..cd5d30707d9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -254,7 +254,7 @@ void onComplete(List sent, } } - void send() { + private void send() { List toSend = new ArrayList<>(); long totalSize = 0L; boolean hasMetaEdit = false; @@ -324,7 +324,7 @@ private boolean isStartFlushAllStores(FlushDescriptor flushDesc) { return storesFlushed.containsAll(tableDesc.getColumnFamilyNames()); } - private Optional getStartFlushAllDescriptor(Cell metaCell) { + Optional getStartFlushAllDescriptor(Cell metaCell) { if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) { return Optional.empty(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java index 2c67fe6c9b15..d6432a696e52 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSinkCallbackAndFlushConcurrently.java @@ -120,14 +120,13 @@ public static void tearDown() throws Exception { @Test public void test() throws Exception { final HRegionForTest[] regions = this.createTable(); - final AtomicInteger onCompleteCounter = new AtomicInteger(0); final AtomicBoolean completedRef = new AtomicBoolean(false); RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); assertTrue(regionReplicationSink != null); RegionReplicationSink spiedRegionReplicationSink = this.setUpSpiedRegionReplicationSink( - regionReplicationSink, regions[0], onCompleteCounter, + regionReplicationSink, regions[0], completedRef); String oldThreadName = Thread.currentThread().getName(); @@ -152,9 +151,9 @@ public void test() throws Exception { private RegionReplicationSink setUpSpiedRegionReplicationSink( final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, - final AtomicInteger onCompleteCounter, final AtomicBoolean completedRef) { - + final AtomicInteger onCompleteCounter = new AtomicInteger(0); + final AtomicInteger getStartFlushAllDescriptorCounter = new AtomicInteger(0); RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); Mockito.doAnswer((invocationOnMock) -> { @@ -164,7 +163,6 @@ private RegionReplicationSink setUpSpiedRegionReplicationSink( } int count = onCompleteCounter.incrementAndGet(); if (count == 1) { - //wait for adding for flush all edit completed. primaryRegion.cyclicBarrier.await(); invocationOnMock.callRealMethod(); completedRef.set(true); @@ -176,19 +174,19 @@ private RegionReplicationSink setUpSpiedRegionReplicationSink( Mockito.doAnswer((invocationOnMock) -> { if (!startTest) { - invocationOnMock.callRealMethod(); - return null; + return invocationOnMock.callRealMethod(); } if (primaryRegion.prepareFlush && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME)) { - invocationOnMock.callRealMethod(); - //onComplete could execute - primaryRegion.cyclicBarrier.await(); - return null; + int count = getStartFlushAllDescriptorCounter.incrementAndGet(); + if(count == 1) { + //onComplete could execute + primaryRegion.cyclicBarrier.await(); + return invocationOnMock.callRealMethod(); + } } - invocationOnMock.callRealMethod(); - return null; - }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any()); + return invocationOnMock.callRealMethod(); + }).when(spiedRegionReplicationSink).getStartFlushAllDescriptor(Mockito.any()); primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); return spiedRegionReplicationSink;