From a886f676203fe2c6c2e1fe45a0c65e7e3fb54f6b Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 15 Sep 2020 10:38:27 +0100 Subject: [PATCH 1/4] HBASE-24877 addendum: additional checks to avoid one extra possible race control in the initialize loop --- .../regionserver/ReplicationSource.java | 24 ++++++++++--------- .../regionserver/TestReplicationSource.java | 5 ++-- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dc0276dc7075..c5dd2bf5db53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -40,7 +40,6 @@ import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -129,7 +128,9 @@ public class ReplicationSource implements ReplicationSourceInterface { //so that it doesn't try submit another initialize thread. //NOTE: this should only be set to false at the end of initialize method, prior to return. private AtomicBoolean startupOngoing = new AtomicBoolean(false); - + //Flag that signalizes uncaught error happening while starting up the source + // and a retry should be attempted + private AtomicBoolean retryStartup = new AtomicBoolean(false); /** * A filter (or a chain of filters) for WAL entries; filters out edits. @@ -577,6 +578,7 @@ private void initialize() { if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; } else { + retryStartup.set(!this.abortOnError); this.startupOngoing.set(false); throw new RuntimeException("Exhausted retries to start replication endpoint."); } @@ -584,6 +586,7 @@ private void initialize() { } if (!this.isSourceActive()) { + retryStartup.set(!this.abortOnError); this.startupOngoing.set(false); throw new IllegalStateException("Source should be active."); } @@ -607,6 +610,7 @@ private void initialize() { } if(!this.isSourceActive()) { + retryStartup.set(!this.abortOnError); this.startupOngoing.set(false); throw new IllegalStateException("Source should be active."); } @@ -625,25 +629,23 @@ private void initialize() { @Override public void startup() { - //Flag that signalizes uncaught error happening while starting up the source - // and a retry should be attempted - MutableBoolean retryStartup = new MutableBoolean(true); - this.sourceRunning = true; + retryStartup.set(true); do { - if(retryStartup.booleanValue()) { - retryStartup.setValue(false); - startupOngoing.set(true); + if(retryStartup.get()) { // mark we are running now + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); initThread = new Thread(this::initialize); Threads.setDaemonThreadRunning(initThread, Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t,e) -> { sourceRunning = false; uncaughtException(t, e, null, null); - retryStartup.setValue(!this.abortOnError); + retryStartup.set(!this.abortOnError); }); } - } while (this.startupOngoing.get() && !this.abortOnError); + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 3628daf749b9..d790c5f612b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -458,6 +458,7 @@ public void testAbortFalseOnError() throws IOException { Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; Mockito.when(peerConfig.getReplicationEndpointImpl()). thenReturn(FaultyReplicationEndpoint.class.getName()); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); @@ -495,6 +496,7 @@ public void testAbortTrueOnError() throws IOException { Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; Mockito.when(peerConfig.getReplicationEndpointImpl()). thenReturn(FaultyReplicationEndpoint.class.getName()); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); @@ -507,9 +509,8 @@ public void testAbortTrueOnError() throws IOException { p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); - Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0); + Waiter.waitFor(conf, 1000, () -> rss.isAborted()); assertFalse(rs.isSourceActive()); - assertTrue(rss.isAborted()); } finally { rs.terminate("Done"); rss.stop("Done"); From cc94a30b9f3c401da19cfa50a6afa1863753db9e Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 16 Sep 2020 12:05:09 +0100 Subject: [PATCH 2/4] Addressing Duo's comment on the related branch-2 PR --- .../regionserver/ReplicationSource.java | 39 +++++---- .../regionserver/TestReplicationSource.java | 81 ++++++++++++------- 2 files changed, 77 insertions(+), 43 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c5dd2bf5db53..29e94e9537b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -629,23 +629,34 @@ private void initialize() { @Override public void startup() { - retryStartup.set(true); - do { - if(retryStartup.get()) { - // mark we are running now - this.sourceRunning = true; - startupOngoing.set(true); - retryStartup.set(false); - initThread = new Thread(this::initialize); - Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, - (t,e) -> { + // mark we are running now + this.sourceRunning = true; + startupOngoing.set(true); + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + (t,e) -> { + //if first initialization attempt failed, and abortOnError is false, we will + //keep looping in this thread until initialize eventually succeeds, + //while the server main startup one can go on with its work. sourceRunning = false; uncaughtException(t, e, null, null); retryStartup.set(!this.abortOnError); - }); - } - } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + try { + initialize(); + } catch(Throwable error){ + sourceRunning = false; + uncaughtException(t, error, null, null); + retryStartup.set(!this.abortOnError); + } + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + }); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index d790c5f612b4..14558926a464 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -400,7 +400,7 @@ protected void doStop() { /** * Deadend Endpoint. Does nothing. */ - public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { static int count = 0; @@ -416,6 +416,17 @@ public synchronized UUID getPeerUUID() { } + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + throw new RuntimeException(); + } + + } + /** * Test HBASE-20497 * Moved here from TestReplicationSource because doesn't need cluster. @@ -444,23 +455,16 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception assertEquals(1001L, shipper.getStartPosition()); } - /** - * Test ReplicationSource retries startup once an uncaught exception happens - * during initialization and eplication.source.regionserver.abort is set to false. - */ - @Test - public void testAbortFalseOnError() throws IOException { - ReplicationSource rs = new ReplicationSource(); - Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, + String endpointName) throws IOException { conf.setInt("replication.source.maxretriesmultiplier", 1); - conf.setBoolean("replication.source.regionserver.abort", false); ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); FaultyReplicationEndpoint.count = 0; Mockito.when(peerConfig.getReplicationEndpointImpl()). - thenReturn(FaultyReplicationEndpoint.class.getName()); + thenReturn(endpointName); Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); @@ -469,6 +473,20 @@ public void testAbortFalseOnError() throws IOException { TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); + return rss; + } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and eplication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnError() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean("replication.source.regionserver.abort", false); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); try { rs.startup(); assertTrue(rs.isSourceActive()); @@ -483,30 +501,35 @@ public void testAbortFalseOnError() throws IOException { } } + /** + * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, + * when eplication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FaultyReplicationEndpoint.class.getName()); + try { + rs.startup(); + assertTrue(true); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + /** * Test ReplicationSource retries startup once an uncaught exception happens - * during initialization and replication.source.regionserver.abort is set to false. + * during initialization and replication.source.regionserver.abort is set to true. */ @Test public void testAbortTrueOnError() throws IOException { - ReplicationSource rs = new ReplicationSource(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - conf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - FaultyReplicationEndpoint.count = 0; - Mockito.when(peerConfig.getReplicationEndpointImpl()). - thenReturn(FaultyReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - String queueId = "qid"; - RegionServerServices rss = - TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, - p -> OptionalLong.empty(), new MetricsSource(queueId)); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); try { rs.startup(); Waiter.waitFor(conf, 1000, () -> rss.isAborted()); From 980152c5df3acc5dbbb003194155a841ff2c1125 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Thu, 24 Sep 2020 10:41:11 +0100 Subject: [PATCH 3/4] addressing lates checkstyle --- .../regionserver/ReplicationSource.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index dc5ef5812f48..31e3485b799c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -626,30 +626,30 @@ public void startup() { this.sourceRunning = true; startupOngoing.set(true); initThread = new Thread(this::initialize); - Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, - (t,e) -> { - //if first initialization attempt failed, and abortOnError is false, we will - //keep looping in this thread until initialize eventually succeeds, - //while the server main startup one can go on with its work. - sourceRunning = false; - uncaughtException(t, e, null, null); - retryStartup.set(!this.abortOnError); - do { - if(retryStartup.get()) { - this.sourceRunning = true; - startupOngoing.set(true); - retryStartup.set(false); - try { - initialize(); - } catch(Throwable error){ - sourceRunning = false; - uncaughtException(t, error, null, null); - retryStartup.set(!this.abortOnError); - } + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + (t,e) -> { + //if first initialization attempt failed, and abortOnError is false, we will + //keep looping in this thread until initialize eventually succeeds, + //while the server main startup one can go on with its work. + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.set(!this.abortOnError); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + try { + initialize(); + } catch(Throwable error){ + sourceRunning = false; + uncaughtException(t, error, null, null); + retryStartup.set(!this.abortOnError); } - } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); - }); + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + }); } @Override From 5b01752dbf5e19082a355080afbe5adaf47b26a3 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 28 Sep 2020 11:33:48 +0100 Subject: [PATCH 4/4] More checkstyles --- .../regionserver/ReplicationSource.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 31e3485b799c..ad3cf5974b41 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -376,7 +376,7 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue q LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId); return value; } else { - LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId); + LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId); ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, queue, worker.getStartPosition()); @@ -629,27 +629,27 @@ public void startup() { Threads.setDaemonThreadRunning(initThread, Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t,e) -> { - //if first initialization attempt failed, and abortOnError is false, we will - //keep looping in this thread until initialize eventually succeeds, - //while the server main startup one can go on with its work. - sourceRunning = false; - uncaughtException(t, e, null, null); - retryStartup.set(!this.abortOnError); - do { - if(retryStartup.get()) { - this.sourceRunning = true; - startupOngoing.set(true); - retryStartup.set(false); - try { - initialize(); - } catch(Throwable error){ - sourceRunning = false; - uncaughtException(t, error, null, null); - retryStartup.set(!this.abortOnError); - } + //if first initialization attempt failed, and abortOnError is false, we will + //keep looping in this thread until initialize eventually succeeds, + //while the server main startup one can go on with its work. + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.set(!this.abortOnError); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + try { + initialize(); + } catch(Throwable error){ + sourceRunning = false; + uncaughtException(t, error, null, null); + retryStartup.set(!this.abortOnError); } - } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); - }); + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + }); } @Override