diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 063b3d458f93..42757b42d106 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -30,12 +30,12 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -66,7 +66,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** @@ -265,6 +264,11 @@ public void enqueueLog(Path wal) { } } + @InterfaceAudience.Private + public Map> getQueues() { + return logQueue.getQueues(); + } + @Override public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index f52a83a6ff75..57c0a1637026 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -41,7 +41,6 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -123,44 +122,64 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, @Override public void run() { int sleepMultiplier = 1; - while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), - source.getSourceMetrics(), walGroupId)) { - while (isReaderRunning()) { // loop here to keep reusing stream while we can - if (!source.isPeerEnabled()) { - Threads.sleep(sleepForRetries); - continue; - } - if (!checkQuota()) { - continue; + WALEntryBatch batch = null; + WALEntryStream entryStream = null; + try { + // we only loop back here if something fatal happened to our stream + while (isReaderRunning()) { + try { + entryStream = + new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(), + source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId); + while (isReaderRunning()) { // loop here to keep reusing stream while we can + if (!source.isPeerEnabled()) { + Threads.sleep(sleepForRetries); + continue; + } + if (!checkQuota()) { + continue; + } + + batch = createBatch(entryStream); + batch = readWALEntries(entryStream, batch); + currentPosition = entryStream.getPosition(); + if (batch == null) { + // either the queue have no WAL to read + // or got no new entries (didn't advance position in WAL) + handleEmptyWALEntryBatch(); + entryStream.reset(); // reuse stream + } else { + addBatchToShippingQueue(batch); + } } - WALEntryBatch batch = readWALEntries(entryStream); - currentPosition = entryStream.getPosition(); - if (batch != null) { - // need to propagate the batch even it has no entries since it may carry the last - // sequence id information for serial replication. - LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); - entryBatchQueue.put(batch); + } catch (IOException e) { // stream related + if (handleEofException(e, batch)) { sleepMultiplier = 1; - } else { // got no entries and didn't advance position in WAL - handleEmptyWALEntryBatch(entryStream.getCurrentPath()); - entryStream.reset(); // reuse stream + } else { + LOG.warn("Failed to read stream of replication entries", e); + if (sleepMultiplier < maxRetriesMultiplier) { + sleepMultiplier++; + } + Threads.sleep(sleepForRetries * sleepMultiplier); } + } catch (InterruptedException e) { + LOG.trace("Interrupted while sleeping between WAL reads"); + Thread.currentThread().interrupt(); + } finally { + entryStream.close(); } - } catch (IOException e) { // stream related - if (!handleEofException(e)) { - LOG.warn("Failed to read stream of replication entries", e); - if (sleepMultiplier < maxRetriesMultiplier) { - sleepMultiplier ++; - } - Threads.sleep(sleepForRetries * sleepMultiplier); - } - } catch (InterruptedException e) { - LOG.trace("Interrupted while sleeping between WAL reads"); - Thread.currentThread().interrupt(); } + } catch (IOException e) { + if (sleepMultiplier < maxRetriesMultiplier) { + LOG.debug("Failed to read stream of replication entries: " + e); + sleepMultiplier++; + } else { + LOG.error("Failed to read stream of replication entries", e); + } + Threads.sleep(sleepForRetries * sleepMultiplier); + } catch (InterruptedException e) { + LOG.trace("Interrupted while sleeping between WAL reads"); + Thread.currentThread().interrupt(); } } @@ -189,14 +208,19 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) { return newPath == null || !path.getName().equals(newPath.getName()); } - protected WALEntryBatch readWALEntries(WALEntryStream entryStream) - throws IOException, InterruptedException { + // We need to get the WALEntryBatch from the caller so we can add entries in there + // This is required in case there is any exception in while reading entries + // we do want to loss the existing entries in the batch + protected WALEntryBatch readWALEntries(WALEntryStream entryStream, + WALEntryBatch batch) throws IOException, InterruptedException { Path currentPath = entryStream.getCurrentPath(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != null && switched(entryStream, currentPath)) { return WALEntryBatch.endOfFile(currentPath); } else { + // This would mean either no more files in the queue + // or there is no new data yet on the current wal return null; } } @@ -208,7 +232,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream) // when reading from the entry stream first time we will enter here currentPath = entryStream.getCurrentPath(); } - WALEntryBatch batch = createBatch(entryStream); + batch.setLastWalPath(currentPath); for (;;) { Entry entry = entryStream.next(); batch.setLastWalPosition(entryStream.getPosition()); @@ -231,10 +255,12 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream) return batch; } - private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + private void handleEmptyWALEntryBatch() throws InterruptedException { LOG.trace("Didn't read any new entries from WAL"); - if (source.isRecovered()) { - // we're done with queue recovery, shut ourself down + if (logQueue.getQueue(walGroupId).isEmpty()) { + // we're done with current queue, either this is a recovered queue, or it is the special group + // for a sync replication peer and the peer has been transited to DA or S state. + LOG.debug("Stopping the replication source wal reader"); setReaderRunning(false); // shuts down shipper thread immediately entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); @@ -244,22 +270,38 @@ private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedExcept } /** - * if we get an EOF due to a zero-length log, and there are other logs in queue - * (highly likely we've closed the current log), and autorecovery is - * enabled, then dump the log + * This is to handle the EOFException from the WAL entry stream. EOFException should + * be handled carefully because there are chances of data loss because of never replicating + * the data. Thus we should always try to ship existing batch of entries here. + * If there was only one log in the queue before EOF, we ship the empty batch here + * and since reader is still active, in the next iteration of reader we will + * stop the reader. + * If there was more than one log in the queue before EOF, we ship the existing batch + * and reset the wal patch and position to the log with EOF, so shipper can remove + * logs from replication queue * @return true only the IOE can be handled */ - private boolean handleEofException(IOException e) { + private boolean handleEofException(IOException e, WALEntryBatch batch) + throws InterruptedException { PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source // since we don't add current log to recovered source queue so it is safe to remove. - if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) + && (source.isRecovered() || queue.size() > 1) + && this.eofAutoRecovery) { + Path head = queue.peek(); try { - if (fs.getFileStatus(queue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek()); + if (fs.getFileStatus(head).getLen() == 0) { + // head of the queue is an empty log file + LOG.warn("Forcing removal of 0 length log in queue: {}", head); logQueue.remove(walGroupId); currentPosition = 0; + // After we removed the WAL from the queue, we should + // try shipping the existing batch of entries and set the wal position + // and path to the wal just dequeued to correctly remove logs from the zk + batch.setLastWalPath(head); + batch.setLastWalPosition(currentPosition); + addBatchToShippingQueue(batch); return true; } } catch (IOException ioe) { @@ -269,6 +311,20 @@ private boolean handleEofException(IOException e) { return false; } + /** + * Update the batch try to ship and return true if shipped + * @param batch Batch of entries to ship + * @throws InterruptedException throws interrupted exception + * @throws IOException throws io exception from stream + */ + private void addBatchToShippingQueue(WALEntryBatch batch) + throws InterruptedException, IOException { + // need to propagate the batch even it has no entries since it may carry the last + // sequence id information for serial replication. + LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); + entryBatchQueue.put(batch); + } + public Path getCurrentPath() { // if we've read some WAL entries, get the Path we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index d0e76fbd77e4..254dc4afe223 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -50,7 +50,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, } @Override - protected WALEntryBatch readWALEntries(WALEntryStream entryStream) + protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) throws IOException, InterruptedException { Path currentPath = entryStream.getCurrentPath(); if (!entryStream.hasNext()) { @@ -70,7 +70,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream) currentPath = entryStream.getCurrentPath(); } long positionBefore = entryStream.getPosition(); - WALEntryBatch batch = createBatch(entryStream); + batch = createBatch(entryStream); for (;;) { Entry entry = entryStream.peek(); boolean doFiltering = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 4f96c96d3c5d..8301dff26d61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -94,6 +94,10 @@ public Path getLastWalPath() { return lastWalPath; } + public void setLastWalPath(Path lastWalPath) { + this.lastWalPath = lastWalPath; + } + /** * @return the position in the last WAL that was read. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 5b8f05721432..721a1226d198 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -80,7 +80,7 @@ class WALEntryStream implements Closeable { * @param walFileLengthProvider provides the length of the WAL file * @param serverName the server name which all WALs belong to * @param metrics the replication metrics - * @throws IOException + * @throws IOException throw IO exception from stream */ public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, @@ -368,7 +368,9 @@ private void openReader(Path path) throws IOException { handleFileNotFound(path, fnfe); } catch (RemoteException re) { IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); - if (!(ioe instanceof FileNotFoundException)) throw ioe; + if (!(ioe instanceof FileNotFoundException)) { + throw ioe; + } handleFileNotFound(path, (FileNotFoundException)ioe); } catch (LeaseNotRecoveredException lnre) { // HBASE-15019 the WAL was not closed due to some hiccup. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 96e86ffaf184..5fb51e525788 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -20,10 +20,10 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -43,8 +43,10 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.junit.After; import org.junit.AfterClass; @@ -52,7 +54,7 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; /** @@ -63,7 +65,8 @@ */ public class TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); - + private static Connection connection1; + private static Connection connection2; protected static Configuration CONF_WITH_LOCALFS; protected static ReplicationAdmin admin; @@ -84,6 +87,8 @@ public class TestReplicationBase { NB_ROWS_IN_BATCH * 10; protected static final long SLEEP_TIME = 500; protected static final int NB_RETRIES = 50; + protected static AtomicInteger replicateCount = new AtomicInteger(); + protected static volatile List replicatedEntries = Lists.newArrayList(); protected static final TableName tableName = TableName.valueOf("test"); protected static final byte[] famName = Bytes.toBytes("f"); @@ -238,26 +243,26 @@ private static void startClusters() throws Exception { // as a component in deciding maximum number of parallel batches to send to the peer cluster. UTIL2.startMiniCluster(NUM_SLAVES2); - admin = new ReplicationAdmin(CONF1); - hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin(); + connection1 = ConnectionFactory.createConnection(CONF1); + connection2 = ConnectionFactory.createConnection(CONF2); + hbaseAdmin = connection1.getAdmin(); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); - Connection connection1 = ConnectionFactory.createConnection(CONF1); - Connection connection2 = ConnectionFactory.createConnection(CONF2); - try (Admin admin1 = connection1.getAdmin()) { + try ( + Admin admin1 = connection1.getAdmin(); + Admin admin2 = connection2.getAdmin()) { admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - } - try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + UTIL1.waitUntilAllRegionsAssigned(tableName); + htable1 = connection1.getTable(tableName); + UTIL2.waitUntilAllRegionsAssigned(tableName); + htable2 = connection2.getTable(tableName); } - UTIL1.waitUntilAllRegionsAssigned(tableName); - UTIL2.waitUntilAllRegionsAssigned(tableName); - htable1 = connection1.getTable(tableName); - htable2 = connection2.getTable(tableName); + } @BeforeClass @@ -273,9 +278,10 @@ private boolean peerExist(String peerId) throws IOException { @Before public void setUpBase() throws Exception { if (!peerExist(PEER_ID2)) { - ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() - .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build(); - hbaseAdmin.addReplicationPeer(PEER_ID2, rpc); + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl( + ReplicationEndpointTest.class.getName()); + hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build()); } } @@ -351,7 +357,33 @@ public static void tearDownAfterClass() throws Exception { if (admin != null) { admin.close(); } + if (hbaseAdmin != null) { + hbaseAdmin.close(); + } + + if (connection2 != null) { + connection2.close(); + } + if (connection1 != null) { + connection1.close(); + } UTIL2.shutdownMiniCluster(); UTIL1.shutdownMiniCluster(); } + + /** + * Custom replication endpoint to keep track of replication status for tests. + */ + public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint { + public ReplicationEndpointTest() { + replicateCount.set(0); + } + + @Override public boolean replicate(ReplicateContext replicateContext) { + replicateCount.incrementAndGet(); + replicatedEntries.addAll(replicateContext.getEntries()); + + return super.replicate(replicateContext); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index c0f22a9ac126..2d72618f091b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -6,9 +6,7 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,56 +18,99 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestReplicationEmptyWALRecovery extends TestReplicationBase { +@Category + ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery + extends TestReplicationBase { + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); + @ClassRule public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); @Before public void setUp() throws IOException, InterruptedException { cleanUp(); + scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL); + replicateCount.set(0); + replicatedEntries.clear(); } /** * Waits until there is only one log(the current writing one) in the replication queue - * @param numRs number of regionservers + * + * @param numRs number of region servers */ - private void waitForLogAdvance(int numRs) throws Exception { - Waiter.waitFor(CONF1, 10000, new Waiter.Predicate() { + private void waitForLogAdvance(int numRs) { + Waiter.waitFor(CONF1, 100000, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { for (int i = 0; i < numRs; i++) { HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); RegionInfo regionInfo = - UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); Path currentFile = ((AbstractFSWAL) wal).getCurrentFileName(); - Replication replicationService = (Replication) UTIL1.getHBaseCluster() - .getRegionServer(i).getReplicationSourceService(); + Replication replicationService = + (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); + for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() + .getSources()) { + ReplicationSource source = (ReplicationSource) rsi; + // We are making sure that there is only one log queue and that is for the + // current WAL of region server + String logPrefix = source.getQueues().keySet().stream().findFirst().get(); + if (!currentFile.equals(source.getCurrentPath()) + || source.getQueues().keySet().size() != 1 + || source.getQueues().get(logPrefix).size() != 1) { + return false; + } + } + } + return true; + } + }); + } + + private void verifyNumberOfLogsInQueue(int numQueues, int numRs) { + Waiter.waitFor(CONF1, 10000, new Waiter.Predicate() { + @Override + public boolean evaluate() { + for (int i = 0; i < numRs; i++) { + Replication replicationService = + (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() - .getSources()) { + .getSources()) { ReplicationSource source = (ReplicationSource) rsi; - if (!currentFile.equals(source.getCurrentPath())) { + String logPrefix = source.getQueues().keySet().stream().findFirst().get(); + if (source.getQueues().get(logPrefix).size() != numQueues) { return false; } } @@ -82,25 +123,211 @@ public boolean evaluate() throws Exception { @Test public void testEmptyWALRecovery() throws Exception { final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); + // for each RS, create an empty wal with same walGroupId + final List emptyWalPaths = new ArrayList<>(); + long ts = System.currentTimeMillis(); + for (int i = 0; i < numRs; i++) { + RegionInfo regionInfo = + UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); + UTIL1.getTestFileSystem().create(emptyWalPath).close(); + emptyWalPaths.add(emptyWalPath); + } + injectEmptyWAL(numRs, emptyWalPaths); + + // ReplicationSource should advance past the empty wal, or else the test will fail + waitForLogAdvance(numRs); + verifyNumberOfLogsInQueue(1, numRs); + // we're now writing to the new wal + // if everything works, the source should've stopped reading from the empty wal, and start + // replicating from the new wal + runSimplePutDeleteTest(); + rollWalsAndWaitForDeque(numRs); + } + + /** + * Test empty WAL along with non empty WALs in the same batch. This test is to make sure + * when we see the empty and handle the EOF exception, we are able to existing the previous + * batch of entries without loosing it. This test also tests the number of batches shipped + * + * @throws Exception throws any exception + */ + @Test + public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception { + // Disable the replication peer to accumulate the non empty WAL followed by empty WAL + hbaseAdmin.disableReplicationPeer(PEER_ID2); + int numOfEntriesToReplicate = 20; + + final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); // for each RS, create an empty wal with same walGroupId final List emptyWalPaths = new ArrayList<>(); long ts = System.currentTimeMillis(); for (int i = 0; i < numRs; i++) { RegionInfo regionInfo = - UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); + + appendEntriesToWal(numOfEntriesToReplicate, wal); + wal.rollWriter(); + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); + UTIL1.getTestFileSystem().create(emptyWalPath).close(); + emptyWalPaths.add(emptyWalPath); + } + + injectEmptyWAL(numRs, emptyWalPaths); + // There should be three WALs in queue + // 1. empty WAL + // 2. non empty WAL + // 3. live WAL + //verifyNumberOfLogsInQueue(3, numRs); + hbaseAdmin.enableReplicationPeer(PEER_ID2); + // ReplicationSource should advance past the empty wal, or else the test will fail + waitForLogAdvance(numRs); + + // Now we should expect numOfEntriesToReplicate entries + // replicated from each region server. This makes sure we didn't loose data + // from any previous batch when we encounter EOF exception for empty file. + Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, + replicatedEntries.size()); + + // We expect just one batch of replication which will + // be from when we handle the EOF exception. + Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue()); + verifyNumberOfLogsInQueue(1, numRs); + // we're now writing to the new wal + // if everything works, the source should've stopped reading from the empty wal, and start + // replicating from the new wal + runSimplePutDeleteTest(); + rollWalsAndWaitForDeque(numRs); + } + + /** + * Test empty WAL along with non empty WALs in the same batch. This test is to make sure + * when we see the empty WAL and handle the EOF exception, we are able to proceed + * with next batch and replicate it properly without missing data. + * + * @throws Exception throws any exception + */ + @Test + public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception { + // Disable the replication peer to accumulate the non empty WAL followed by empty WAL + hbaseAdmin.disableReplicationPeer(PEER_ID2); + int numOfEntriesToReplicate = 20; + + final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); + // for each RS, create an empty wal with same walGroupId + final List emptyWalPaths = new ArrayList<>(); + + long ts = System.currentTimeMillis(); + WAL wal = null; + for (int i = 0; i < numRs; i++) { + RegionInfo regionInfo = + UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); + wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); + appendEntriesToWal(numOfEntriesToReplicate, wal); String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); UTIL1.getTestFileSystem().create(emptyWalPath).close(); emptyWalPaths.add(emptyWalPath); + } + injectEmptyWAL(numRs, emptyWalPaths); + // roll the WAL now + for (int i = 0; i < numRs; i++) { + wal.rollWriter(); + } + hbaseAdmin.enableReplicationPeer(PEER_ID2); + // ReplicationSource should advance past the empty wal, or else the test will fail + waitForLogAdvance(numRs); + + // Now we should expect numOfEntriesToReplicate entries + // replicated from each region server. This makes sure we didn't loose data + // from any previous batch when we encounter EOF exception for empty file. + Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, + replicatedEntries.size()); + + // We expect just one batch of replication to be shipped which will + // for non empty WAL + Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get()); + verifyNumberOfLogsInQueue(1, numRs); + // we're now writing to the new wal + // if everything works, the source should've stopped reading from the empty wal, and start + // replicating from the new wal + runSimplePutDeleteTest(); + rollWalsAndWaitForDeque(numRs); + } + + /** + * This test make sure we replicate all the enties from the non empty WALs which + * are surrounding the empty WALs + * + * @throws Exception throws exception + */ + @Test + public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception { + // Disable the replication peer to accumulate the non empty WAL followed by empty WAL + hbaseAdmin.disableReplicationPeer(PEER_ID2); + int numOfEntriesToReplicate = 20; + + final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); + // for each RS, create an empty wal with same walGroupId + final List emptyWalPaths = new ArrayList<>(); - // inject our empty wal into the replication queue, and then roll the original wal, which - // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to - // determine if the file being replicated currently is still opened for write, so just inject a - // new wal to the replication queue does not mean the previous file is closed. + long ts = System.currentTimeMillis(); + WAL wal = null; + for (int i = 0; i < numRs; i++) { + RegionInfo regionInfo = + UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); + wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); + appendEntriesToWal(numOfEntriesToReplicate, wal); + wal.rollWriter(); + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); + UTIL1.getTestFileSystem().create(emptyWalPath).close(); + emptyWalPaths.add(emptyWalPath); + } + injectEmptyWAL(numRs, emptyWalPaths); + + // roll the WAL again with some entries + for (int i = 0; i < numRs; i++) { + appendEntriesToWal(numOfEntriesToReplicate, wal); + wal.rollWriter(); + } + + hbaseAdmin.enableReplicationPeer(PEER_ID2); + // ReplicationSource should advance past the empty wal, or else the test will fail + waitForLogAdvance(numRs); + + // Now we should expect numOfEntriesToReplicate entries + // replicated from each region server. This makes sure we didn't loose data + // from any previous batch when we encounter EOF exception for empty file. + Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2, + replicatedEntries.size()); + + // We expect two batch of replication to be shipped which will + // for non empty WAL + Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get()); + verifyNumberOfLogsInQueue(1, numRs); + // we're now writing to the new wal + // if everything works, the source should've stopped reading from the empty wal, and start + // replicating from the new wal + runSimplePutDeleteTest(); + rollWalsAndWaitForDeque(numRs); + } + + // inject our empty wal into the replication queue, and then roll the original wal, which + // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to + // determine if the file being replicated currently is still opened for write, so just inject a + // new wal to the replication queue does not mean the previous file is closed. + private void injectEmptyWAL(int numRs, List emptyWalPaths) throws IOException { for (int i = 0; i < numRs; i++) { HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); Replication replicationService = (Replication) hrs.getReplicationSourceService(); @@ -111,13 +338,32 @@ public void testEmptyWALRecovery() throws Exception { WAL wal = hrs.getWAL(regionInfo); wal.rollWriter(true); } + } - // ReplicationSource should advance past the empty wal, or else the test will fail + protected WALKeyImpl getWalKeyImpl() { + return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes); + } + + // Roll the WAL and wait for it to get deque from the log queue + private void rollWalsAndWaitForDeque(int numRs) throws IOException { + RegionInfo regionInfo = + UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); + for (int i = 0; i < numRs; i++) { + WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + wal.rollWriter(); + } waitForLogAdvance(numRs); + } - // we're now writing to the new wal - // if everything works, the source should've stopped reading from the empty wal, and start - // replicating from the new wal - runSimplePutDeleteTest(); + private void appendEntriesToWal(int numEntries, WAL wal) throws IOException { + long txId = -1; + for (int i = 0; i < numEntries; i++) { + byte[] b = Bytes.toBytes(Integer.toString(i)); + KeyValue kv = new KeyValue(b, famName, b); + WALEdit edit = new WALEdit(); + edit.add(kv); + txId = wal.appendData(info, getWalKeyImpl(), edit); + } + wal.sync(txId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 9c6fafcf803a..d31b8645cef5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -27,9 +27,9 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; @@ -83,7 +84,6 @@ import org.mockito.Mockito; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; - @Category({ ReplicationTests.class, LargeTests.class }) public class TestWALEntryStream { @@ -687,6 +687,7 @@ public void testEOFExceptionForRecoveredQueue() throws Exception { // Override the max retries multiplier to fail fast. conf.setInt("replication.source.maxretriesmultiplier", 1); conf.setBoolean("replication.source.eof.autorecovery", true); + conf.setInt("replication.source.nb.batches", 10); // Create a reader thread with source as recovered source. ReplicationSource source = mockReplicationSource(true, conf); when(source.isPeerEnabled()).thenReturn(true); @@ -705,7 +706,64 @@ public void testEOFExceptionForRecoveredQueue() throws Exception { assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); } + @Test + public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception { + Configuration conf = new Configuration(CONF); + MetricsSource metrics = mock(MetricsSource.class); + ReplicationSource source = mockReplicationSource(true, conf); + ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); + // Create a 0 length log. + Path emptyLog = new Path(fs.getHomeDirectory(),"log.2"); + FSDataOutputStream fsdos = fs.create(emptyLog); + fsdos.close(); + assertEquals(0, fs.getFileStatus(emptyLog).getLen()); + localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); + + final Path log1 = new Path(fs.getHomeDirectory(), "log.1"); + WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration()); + appendEntries(writer1, 3); + localLogQueue.enqueueLog(log1, fakeWalGroupId); + + ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); + // Make it look like the source is from recovered source. + when(mockSourceManager.getOldSources()) + .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source))); + when(source.isPeerEnabled()).thenReturn(true); + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + // Override the max retries multiplier to fail fast. + conf.setInt("replication.source.maxretriesmultiplier", 1); + conf.setBoolean("replication.source.eof.autorecovery", true); + conf.setInt("replication.source.nb.batches", 10); + // Create a reader thread. + ReplicationSourceWALReader reader = + new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, + getDummyFilter(), source, fakeWalGroupId); + assertEquals("Initial log queue size is not correct", + 2, localLogQueue.getQueueSize(fakeWalGroupId)); + reader.run(); + + // remove empty log from logQueue. + assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); + assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId)); + } + private PriorityBlockingQueue getQueue() { return logQueue.getQueue(fakeWalGroupId); } + + private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { + for (int i = 0; i < numEntries; i++) { + byte[] b = Bytes.toBytes(Integer.toString(i)); + KeyValue kv = new KeyValue(b,b,b); + WALEdit edit = new WALEdit(); + edit.add(kv); + WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, + HConstants.DEFAULT_CLUSTER_ID); + NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); + writer.append(new WAL.Entry(key, edit)); + writer.sync(false); + } + writer.close(); + } }