diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 4542451dbb45..b5c92fd910f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -34,8 +34,10 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -121,6 +123,16 @@ public class ReplicationSource implements ReplicationSourceInterface { // ReplicationEndpoint which will handle the actual replication private volatile ReplicationEndpoint replicationEndpoint; + private boolean abortOnError; + //This is needed for the startup loop to identify when there's already + //an initialization happening (but not finished yet), + //so that it doesn't try submit another initialize thread. + //NOTE: this should only be set to false at the end of initialize method, prior to return. + private AtomicBoolean startupOngoing = new AtomicBoolean(false); + //Flag that signalizes uncaught error happening while starting up the source + //and a retry should be attempted + private AtomicBoolean retryStartup = new AtomicBoolean(false); + /** * A filter (or a chain of filters) for WAL entries; filters out edits. */ @@ -131,6 +143,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private long defaultBandwidth; private long currentBandwidth; private WALFileLengthProvider walFileLengthProvider; + @VisibleForTesting protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); @@ -219,6 +232,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; + + this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", + true); + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -244,6 +261,7 @@ public void enqueueLog(Path wal) { new AbstractFSWALProvider.WALStartTimeComparator()); // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise // the shipper may quit immediately + queue.put(wal); queues.put(walPrefix, queue); if (this.isSourceActive() && this.walEntryFilter != null) { // new wal group observed after source startup, start a new worker thread to track it @@ -251,8 +269,9 @@ public void enqueueLog(Path wal) { // still not launched, so it's necessary to check workerThreads before start the worker tryStartNewShipper(walPrefix, queue); } + } else { + queue.put(wal); } - queue.put(wal); if (LOG.isTraceEnabled()) { LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix, this.replicationQueueInfo.getQueueId()); @@ -357,19 +376,30 @@ private void initializeWALEntryFilter(UUID peerClusterId) { } private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { - ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); - ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); - if (extant != null) { - LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId); - } else { - LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId); - ReplicationSourceWALReader walReader = - createNewWALReader(walGroupId, queue, worker.getStartPosition()); - Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() + - ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException); - worker.setWALReader(walReader); - worker.startup(this::uncaughtException); - } + workerThreads.compute(walGroupId, (key, value) -> { + if (value != null) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "{} Someone has beat us to start a worker thread for wal group {}", + logPeerId(), key); + } + return value; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key); + } + ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); + ReplicationSourceWALReader walReader = + createNewWALReader(walGroupId, queue, worker.getStartPosition()); + Threads.setDaemonThreadRunning( + walReader, Thread.currentThread().getName() + + ".replicationSource.wal-reader." + walGroupId + "," + queueId, + (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + worker.setWALReader(walReader); + worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + return worker; + } + }); } @Override @@ -440,11 +470,28 @@ WALEntryFilter getWalEntryFilter() { return walEntryFilter; } - protected final void uncaughtException(Thread t, Throwable e) { + protected final void uncaughtException(Thread t, Throwable e, + ReplicationSourceManager manager, String peerId) { RSRpcServices.exitIfOOME(e); LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e); - server.abort("Unexpected exception in " + t.getName(), e); + if(abortOnError){ + server.abort("Unexpected exception in " + t.getName(), e); + } + if(manager != null){ + while (true) { + try { + LOG.info("Refreshing replication sources now due to previous error on thread: {}", + t.getName()); + manager.refreshSources(peerId); + break; + } catch (IOException e1) { + LOG.error("Replication sources refresh failed.", e1); + sleepForRetries("Sleeping before try refreshing sources again", + maxRetriesMultiplier); + } + } + } } @Override @@ -543,12 +590,18 @@ private void initialize() { replicationEndpoint.stop(); if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { sleepMultiplier++; + } else { + retryStartup.set(!this.abortOnError); + this.startupOngoing.set(false); + throw new RuntimeException("Exhausted retries to start replication endpoint."); } } } if (!this.isSourceActive()) { - return; + retryStartup.set(!this.abortOnError); + this.startupOngoing.set(false); + throw new IllegalStateException("Source should be active."); } sleepMultiplier = 1; @@ -569,8 +622,10 @@ private void initialize() { } } - if (!this.isSourceActive()) { - return; + if(!this.isSourceActive()) { + retryStartup.set(!this.abortOnError); + this.startupOngoing.set(false); + throw new IllegalStateException("Source should be active."); } LOG.info("{} queueId={} is replicating from cluster={} to cluster={}", logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); @@ -582,6 +637,7 @@ private void initialize() { PriorityBlockingQueue queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } + this.startupOngoing.set(false); } @Override @@ -591,10 +647,32 @@ public void startup() { } // Mark we are running now this.sourceRunning = true; + startupOngoing.set(true); initThread = new Thread(this::initialize); Threads.setDaemonThreadRunning(initThread, Thread.currentThread().getName() + ".replicationSource," + this.queueId, - this::uncaughtException); + (t,e) -> { + //if first initialization attempt failed, and abortOnError is false, we will + //keep looping in this thread until initialize eventually succeeds, + //while the server main startup one can go on with its work. + sourceRunning = false; + uncaughtException(t, e, null, null); + retryStartup.set(!this.abortOnError); + do { + if(retryStartup.get()) { + this.sourceRunning = true; + startupOngoing.set(true); + retryStartup.set(false); + try { + initialize(); + } catch(Throwable error){ + sourceRunning = false; + uncaughtException(t, error, null, null); + retryStartup.set(!this.abortOnError); + } + } + } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); + }); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 92646d2cac45..b171eb4d78df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -280,7 +280,8 @@ private boolean updateLogPosition(WALEntryBatch batch) { public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, - name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); + name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), + handler::uncaughtException); } Path getCurrentPath() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 15f202f06467..275ade22a718 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.OptionalLong; import java.util.UUID; @@ -118,15 +122,15 @@ public void testDefaultSkipsMetaWAL() throws IOException { ReplicationSource rs = new ReplicationSource(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - Mockito.when(peerConfig.getReplicationEndpointImpl()). + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getConfiguration()).thenReturn(conf); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + when(peerConfig.getReplicationEndpointImpl()). thenReturn(DoNothingReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -156,15 +160,15 @@ public void testWALEntryFilter() throws IOException { ReplicationSource rs = new ReplicationSource(); UUID uuid = UUID.randomUUID(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); - ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); - Mockito.when(peerConfig.getReplicationEndpointImpl()). + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getConfiguration()).thenReturn(conf); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + when(peerConfig.getReplicationEndpointImpl()). thenReturn(DoNothingReplicationEndpoint.class.getName()); - Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -250,12 +254,12 @@ public void testTerminateTimeout() throws Exception { replicationEndpoint = new DoNothingReplicationEndpoint(); try { replicationEndpoint.start(); - ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); - Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); - ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -396,6 +400,36 @@ protected void doStop() { } } + /** + * Deadend Endpoint. Does nothing. + */ + public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + if(count==0) { + count++; + throw new RuntimeException(); + } else { + return super.getPeerUUID(); + } + } + + } + + public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { + + static int count = 0; + + @Override + public synchronized UUID getPeerUUID() { + throw new RuntimeException(); + } + + } + /** * Test HBASE-20497 * Moved here from TestReplicationSource because doesn't need cluster. @@ -407,15 +441,15 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); queue.put(new Path("/www/html/test")); - RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); - Server server = Mockito.mock(Server.class); - Mockito.when(server.getServerName()).thenReturn(serverName); - Mockito.when(source.getServer()).thenReturn(server); - Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); - ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); - Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) + RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); + Server server = mock(Server.class); + when(server.getServerName()).thenReturn(serverName); + when(source.getServer()).thenReturn(server); + when(source.getServerWALsBelongTo()).thenReturn(deadServer); + ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); + when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) .thenReturn(1001L); - Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) + when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) .thenReturn(-1L); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt("replication.source.maxretriesmultiplier", -1); @@ -423,5 +457,90 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage); assertEquals(1001L, shipper.getStartPosition()); } + + private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, + String endpointName) throws IOException { + conf.setInt("replication.source.maxretriesmultiplier", 1); + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + when(mockPeer.getConfiguration()).thenReturn(conf); + when(mockPeer.getPeerBandwidth()).thenReturn(0L); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + FaultyReplicationEndpoint.count = 0; + when(peerConfig.getReplicationEndpointImpl()). + thenReturn(endpointName); + when(mockPeer.getPeerConfig()).thenReturn(peerConfig); + ReplicationSourceManager manager = mock(ReplicationSourceManager.class); + when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + String queueId = "qid"; + RegionServerServices rss = + TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); + rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, + p -> OptionalLong.empty(), new MetricsSource(queueId)); + return rss; + } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and eplication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnError() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setBoolean("replication.source.regionserver.abort", false); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); + try { + rs.startup(); + assertTrue(rs.isSourceActive()); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); + assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); + rs.enqueueLog(new Path("a.1")); + assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + + /** + * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, + * when eplication.source.regionserver.abort is set to false. + */ + @Test + public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FaultyReplicationEndpoint.class.getName()); + try { + rs.startup(); + assertTrue(true); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } + + /** + * Test ReplicationSource retries startup once an uncaught exception happens + * during initialization and replication.source.regionserver.abort is set to true. + */ + @Test + public void testAbortTrueOnError() throws IOException { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource rs = new ReplicationSource(); + RegionServerServices rss = setupForAbortTests(rs, conf, + FlakyReplicationEndpoint.class.getName()); + try { + rs.startup(); + Waiter.waitFor(conf, 1000, () -> rss.isAborted()); + assertFalse(rs.isSourceActive()); + } finally { + rs.terminate("Done"); + rss.stop("Done"); + } + } }