From 38c8bd37319325f97b1a6fe8a64c0c71683782b9 Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Sat, 21 Sep 2019 14:39:06 +0100
Subject: [PATCH] HBASE-22380 break circle replication when doing bulkload (#494)

Signed-off-by: stack
Signed-off-by: Andrew Purtell
Signed-off-by: Norbert Kalmar
---
 .../hbase/client/SecureBulkLoadClient.java    |  18 +-
 .../hbase/shaded/protobuf/ProtobufUtil.java   |  12 +-
 .../shaded/protobuf/RequestConverter.java     |  11 +-
 .../src/main/protobuf/Client.proto            |   1 +
 .../src/main/protobuf/WAL.proto               |   1 +
 .../hadoop/hbase/regionserver/HRegion.java    |   9 +-
 .../hbase/regionserver/RSRpcServices.java     |  10 +-
 .../regionserver/SecureBulkLoadManager.java   |  10 +-
 .../regionserver/HFileReplicator.java         |   5 +-
 .../regionserver/ReplicationSink.java         |  44 ++-
 .../hbase/tool/LoadIncrementalHFiles.java     |   8 +-
 .../regionserver/TestBulkLoadReplication.java | 304 ++++++++++++++++++
 .../replication/TestReplicationBase.java      |   2 +-
 13 files changed, 402 insertions(+), 33 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 218627175713..7e3166c3f4b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -115,8 +115,8 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
-    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false);
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
+      bulkToken, false, null);
   }
 
   /**
@@ -132,13 +132,23 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
    * @return true if all are loaded
    * @throws IOException
    */
+  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken,
+      boolean copyFiles) throws IOException {
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
+      bulkToken, copyFiles, null);
+  }
+
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
+      final Token<?> userToken, final String bulkToken,
+      boolean copyFiles, List<String> clusterIds) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles);
+          userToken, bulkToken, copyFiles, clusterIds);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index b3e6e204cdf8..36870b636b51 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2577,13 +2577,23 @@ public static QuotaProtos.SpaceQuota toProtoSpaceQuota(
    *          name
    * @return The WAL log marker for bulk loads.
    */
+  public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+      ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+      storeFilesSize, bulkloadSeqId, null);
+  }
+
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
         .setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+    if(clusterIds != null) {
+      desc.addAllClusterIds(clusterIds);
+    }
 
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 2a3024973c5c..7dc8645d2ac5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -566,7 +566,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false);
+      false, null);
   }
 
   /**
@@ -581,9 +581,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+      final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken, boolean copyFiles,
+      List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -621,6 +621,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       request.setBulkToken(bulkToken);
     }
     request.setCopyFile(copyFiles);
+    if (clusterIds != null) {
+      request.addAllClusterIds(clusterIds);
+    }
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 14abb085d6e5..07d8d711a0a2 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated string cluster_ids = 7;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 35a179ce9024..c103075c4473 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -150,6 +150,7 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
+  repeated string cluster_ids = 5;
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ec05075a7853..f60141895be4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6090,7 +6090,7 @@ private static boolean hasMultipleColumnFamilies(Collection
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6135,11 +6135,13 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    *        file about to be bulk loaded
    * @param copyFile always copy hfiles if true
+   * @param clusterIds ids from clusters that had already handled the given bulkload event.
    * @return Map from family to List of store file paths if successful, null if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+      boolean assignSeqId, BulkLoadListener bulkLoadListener,
+      boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6314,8 +6316,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
       WALProtos.BulkLoadDescriptor loadDescriptor =
           ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
               UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-              storeFiles,
-              storeFilesSizes, seqId);
+              storeFiles, storeFilesSizes, seqId, clusterIds);
       WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
           loadDescriptor, mvcc);
     } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9b38ea27a005..00e616915be1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2364,6 +2364,12 @@ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+    if(clusterIds.contains(this.regionServer.clusterId)){
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2396,7 +2402,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
           }
           try {
             map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile());
+              request.getCopyFile(), clusterIds);
           } finally {
             if (region.getCoprocessorHost() != null) {
               region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
@@ -2404,7 +2410,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
           }
         } else {
           // secure bulk load
-          map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+          map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
         }
         BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
         builder.setLoaded(map != null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index e0d8a5ecdfd2..0badf2a9c821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -213,7 +213,12 @@ private boolean isUserReferenced(UserGroupInformation ugi) {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-      final BulkLoadHFileRequest request) throws IOException {
+    final BulkLoadHFileRequest request) throws IOException {
+    return secureBulkLoadHFiles(region, request, null);
+  }
+
+  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+    final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -289,7 +294,8 @@ public Map<byte[], List<Path>> run() {
           //We call bulkLoadHFiles as requesting user
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
-              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+              clusterIds);
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ff54f41826d5..9bbc16d8597e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -87,17 +87,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      Connection connection) throws IOException {
+      Connection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public Void replicate() throws IOException {
     LoadIncrementalHFiles loadHFiles = null;
     try {
       loadHFiles = new LoadIncrementalHFiles(conf);
+      loadHFiles.setClusterIds(sourceClusterIds);
     } catch (Exception e) {
       LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
          + " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 19f0707289fa..6e6876039afc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -174,9 +174,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -204,10 +202,19 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if(bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
               bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -243,14 +250,26 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if(bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry :
+            bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
+                entry.getKey().toString());
+            }
+          }
+        }
       }
 
       int size = entries.size();
@@ -265,8 +284,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 69a3074b1c84..c5ba5a7f2879 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -155,6 +155,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private String bulkToken;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -545,7 +547,7 @@ protected byte[] rpcCall() throws Exception {
         try (Table table = conn.getTable(getTableName())) {
           secureClient = new SecureBulkLoadClient(getConf(), table);
           success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-            assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+            assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
         }
         return success ? regionName : null;
       } finally {
@@ -1260,6 +1262,10 @@ public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   /**
    * Infers region boundaries for a new table.
    *
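The changes above thread a list of cluster ids through the whole bulk load path: RSRpcServices stamps the local cluster id onto each request, HRegion writes the accumulated ids into the WAL bulk load descriptor, ReplicationSink and HFileReplicator hand them to LoadIncrementalHFiles, and SecureBulkLoadClient/RequestConverter put them back onto the next BulkLoadHFileRequest. The rule that actually breaks the circle is the guard added in RSRpcServices.bulkLoadHFile(). Below is a minimal, self-contained sketch of that rule; it uses a simplified model (plain strings and lists, with an assumed helper shouldApply), not the HBase request/response types from the patch.

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of the cycle-breaking rule: every bulk load carries the ids of clusters
 * that already applied it, and a cluster that finds its own id in that list drops the event
 * instead of re-applying and re-replicating it.
 */
public class BulkLoadCycleBreakerSketch {

  /** Returns true if the event should be applied (and forwarded), false if it must be dropped. */
  static boolean shouldApply(String localClusterId, List<String> clusterIdsOnRequest) {
    if (clusterIdsOnRequest.contains(localClusterId)) {
      return false; // the event has come full circle; stop here
    }
    clusterIdsOnRequest.add(localClusterId); // stamp this cluster before forwarding
    return true;
  }

  public static void main(String[] args) {
    List<String> ids = new ArrayList<>();
    System.out.println(shouldApply("peer1", ids)); // true  - originating cluster applies it
    System.out.println(shouldApply("peer2", ids)); // true  - first replica applies it
    System.out.println(shouldApply("peer1", ids)); // false - event came back, loop broken
  }
}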

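ReplicationSink.replicateEntries() above also stops pooling every bulk load event of a shipped batch into one map; it keys them by the exact cluster-id list carried in each BulkLoadDescriptor, so events with different provenance (for example, loads reaching UTIL2 from UTIL1 and from UTIL3 in the test below) are forwarded through separate HFileReplicator runs and keep their own id lists. The following is a self-contained sketch of that regrouping with simplified types (hfile path strings instead of the nested family/path maps) and assumed names; it is not the HBase API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative regrouping of bulk load events by the cluster-id list they already carry. */
public class BulkLoadGroupingSketch {

  /** Input: hfile path -> cluster ids that already handled it. Output: id list -> hfile paths. */
  static Map<List<String>, List<String>> groupByClusterIds(Map<String, List<String>> batch) {
    Map<List<String>, List<String>> perOrigin = new HashMap<>();
    for (Map.Entry<String, List<String>> e : batch.entrySet()) {
      // lazy creation of the per-origin bucket mirrors the lazy map creation in the patch
      perOrigin.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
    }
    return perOrigin;
  }

  public static void main(String[] args) {
    Map<String, List<String>> batch = new LinkedHashMap<>();
    batch.put("/bulk/f/hfile-1", Arrays.asList("peer1"));
    batch.put("/bulk/f/hfile-2", Arrays.asList("peer1", "peer2"));
    batch.put("/bulk/f/hfile-3", Arrays.asList("peer1"));
    // Two groups: events seen only by peer1, and events seen by peer1 and peer2.
    groupByClusterIds(batch).forEach((ids, files) -> System.out.println(ids + " -> " + files));
  }
}

Keying on the id list works because java.util.List defines equals() and hashCode() by content, which is also what the patch relies on when it looks up bulkLoadsPerClusters.get(bld.getClusterIdsList()).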
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java new file mode 100644 index 000000000000..028277f56a40 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import 
org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for bulk load replication. Defines two clusters, with two way replication. + * Performs a bulk load on cluster defined by UTIL1 first, asserts the Cell on the bulk loaded file + * gets into the related table in UTIL1, then also validates the same got replicated to cluster + * UTIL2. Then, bulk loads another file into UTIL2, and checks if related values are present on + * UTIL2, and also gets replicated to UTIL1. + * It also defines a preBulkLoad coprocessor that is added to all test table regions on each of the + * clusters, in order to count amount of times bulk load actually gets invoked. This is to certify + * we are not entered in the infinite loop condition addressed by HBASE-22380. + */ +@Category({ ReplicationTests.class, MediumTests.class}) +public class TestBulkLoadReplication extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadReplication.class); + + protected static final Logger LOG = + LoggerFactory.getLogger(TestBulkLoadReplication.class); + + private static final String PEER1_CLUSTER_ID = "peer1"; + private static final String PEER2_CLUSTER_ID = "peer2"; + private static final String PEER3_CLUSTER_ID = "peer3"; + + private static final String PEER_ID1 = "1"; + private static final String PEER_ID3 = "3"; + + private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0); + private static CountDownLatch BULK_LOAD_LATCH; + + private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility(); + private static final Configuration CONF3 = UTIL3.getConfiguration(); + private static Table htable3; + + @Rule + public TestName name = new TestName(); + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID); + setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID); + setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID); + setupConfig(UTIL3, "/3"); + TestReplicationBase.setUpBeforeClass(); + startThirdCluster(); + } + + private static void startThirdCluster() throws Exception { + LOG.info("Setup Zk to same one from UTIL1 and UTIL2"); + UTIL3.setZkCluster(UTIL1.getZkCluster()); + UTIL3.startMiniCluster(NUM_SLAVES1); + + TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); + + Connection connection3 = ConnectionFactory.createConnection(CONF3); + try (Admin admin3 = connection3.getAdmin()) { + admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + UTIL3.waitUntilAllRegionsAssigned(tableName); + htable3 = connection3.getTable(tableName); + } + + @Before + @Override + public void setUpBase() throws Exception { + super.setUpBase(); + ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1); + ReplicationPeerConfig peer2Config = 
getPeerConfigForCluster(UTIL2); + ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3); + //adds cluster1 as a remote peer on cluster2 + UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config); + //adds cluster3 as a remote peer on cluster2 + UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config); + //adds cluster2 as a remote peer on cluster3 + UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config); + setupCoprocessor(UTIL1); + setupCoprocessor(UTIL2); + setupCoprocessor(UTIL3); + } + + private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) { + return ReplicationPeerConfig.newBuilder() + .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build(); + } + + private void setupCoprocessor(HBaseTestingUtility cluster){ + cluster.getHBaseCluster().getRegions(tableName).forEach(r -> { + try { + r.getCoprocessorHost() + .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0, + cluster.getConfiguration()); + } catch (Exception e){ + LOG.error(e.getMessage(), e); + } + }); + } + + @After + @Override + public void tearDownBase() throws Exception { + super.tearDownBase(); + UTIL2.getAdmin().removeReplicationPeer(PEER_ID1); + UTIL2.getAdmin().removeReplicationPeer(PEER_ID3); + UTIL3.getAdmin().removeReplicationPeer(PEER_ID2); + } + + private static void setupBulkLoadConfigsForCluster(Configuration config, + String clusterReplicationId) throws Exception { + config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + config.set(REPLICATION_CLUSTER_ID, clusterReplicationId); + File sourceConfigFolder = testFolder.newFolder(clusterReplicationId); + File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + + "/hbase-site.xml"); + config.writeXml(new FileOutputStream(sourceConfigFile)); + config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath()); + } + + @Test + public void testBulkLoadReplicationActiveActive() throws Exception { + Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); + Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); + Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); + byte[] row = Bytes.toBytes("001"); + byte[] value = Bytes.toBytes("v1"); + assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable); + row = Bytes.toBytes("002"); + value = Bytes.toBytes("v2"); + assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable); + row = Bytes.toBytes("003"); + value = Bytes.toBytes("v3"); + assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable); + //Additional wait to make sure no extra bulk load happens + Thread.sleep(400); + //We have 3 bulk load events (1 initiated on each cluster). + //Each event gets 3 counts (the originator cluster, plus the two peers), + //so BULK_LOADS_COUNT expected value is 3 * 3 = 9. 
+ assertEquals(9, BULK_LOADS_COUNT.get()); + } + + private void assertBulkLoadConditions(byte[] row, byte[] value, + HBaseTestingUtility utility, Table...tables) throws Exception { + BULK_LOAD_LATCH = new CountDownLatch(3); + bulkLoadOnCluster(row, value, utility); + assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES)); + assertTableHasValue(tables[0], row, value); + assertTableHasValue(tables[1], row, value); + assertTableHasValue(tables[2], row, value); + } + + private void bulkLoadOnCluster(byte[] row, byte[] value, + HBaseTestingUtility cluster) throws Exception { + String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration()); + copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster()); + BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration()); + bulkLoadHFilesTool.bulkLoad(tableName, new Path("/bulk_dir")); + } + + private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception { + Path bulkLoadDir = new Path("/bulk_dir/f"); + cluster.getFileSystem().mkdirs(bulkLoadDir); + cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir); + } + + private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception { + Get get = new Get(row); + Result result = table.get(get); + assertTrue(result.advance()); + assertEquals(Bytes.toString(value), Bytes.toString(result.value())); + } + + private String createHFileForFamilies(byte[] row, byte[] value, + Configuration clusterConfig) throws IOException { + CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); + cellBuilder.setRow(row) + .setFamily(TestReplicationBase.famName) + .setQualifier(Bytes.toBytes("1")) + .setValue(value) + .setType(Cell.Type.Put); + + HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig); + // TODO We need a way to do this without creating files + File hFileLocation = testFolder.newFile(); + FSDataOutputStream out = + new FSDataOutputStream(new FileOutputStream(hFileLocation), null); + try { + hFileFactory.withOutputStream(out); + hFileFactory.withFileContext(new HFileContext()); + HFile.Writer writer = hFileFactory.create(); + try { + writer.append(new KeyValue(cellBuilder.build())); + } finally { + writer.close(); + } + } finally { + out.close(); + } + return hFileLocation.getAbsoluteFile().getAbsolutePath(); + } + + public static class BulkReplicationTestObserver implements RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(new RegionObserver() { + @Override + public void preBulkLoadHFile(ObserverContext ctx, + List> familyPaths) throws IOException { + BULK_LOADS_COUNT.incrementAndGet(); + } + + @Override + public void postBulkLoadHFile(ObserverContext ctx, + List> stagingFamilyPaths, Map> finalPaths) + throws IOException { + BULK_LOAD_LATCH.countDown(); + } + }); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index e87b076d7e7a..df4448673562 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -164,7 +164,7 @@ protected static void loadData(String prefix, byte[] row, byte[] familyName) thr htable1.put(puts); } - private static void setupConfig(HBaseTestingUtility util, String znodeParent) { + protected 
static void setupConfig(HBaseTestingUtility util, String znodeParent) { Configuration conf = util.getConfiguration(); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
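For reference, the arithmetic behind assertEquals(9, BULK_LOADS_COUNT.get()) in the test above: with UTIL1 and UTIL2, and UTIL2 and UTIL3, replicating to each other, every bulk load is applied once on each cluster and then dropped when its cluster-id list comes back around, so three loads on three clusters give 3 x 3 = 9 coprocessor invocations. Below is a self-contained simulation of that propagation under the same simplified model as the earlier sketches; peer names and structure are illustrative, not the HBase API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Replays the three-cluster, two-way-replication scenario asserted by the test. */
public class BulkLoadReplicationSimulation {

  // Same wiring as setUpBase(): UTIL1 <-> UTIL2 and UTIL2 <-> UTIL3.
  private static final Map<String, List<String>> PEERS = new HashMap<>();
  static {
    PEERS.put("peer1", Arrays.asList("peer2"));
    PEERS.put("peer2", Arrays.asList("peer1", "peer3"));
    PEERS.put("peer3", Arrays.asList("peer2"));
  }

  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger();

  /** Mirrors the guard in RSRpcServices.bulkLoadHFile(): drop if this cluster already applied it. */
  private static void deliver(List<String> clusterIds, String cluster) {
    if (clusterIds.contains(cluster)) {
      return; // circle broken: this cluster has already handled the load
    }
    List<String> seen = new ArrayList<>(clusterIds);
    seen.add(cluster);
    BULK_LOADS_COUNT.incrementAndGet(); // the load is applied here exactly once
    for (String peer : PEERS.get(cluster)) {
      deliver(seen, peer); // bulk load replication forwards the accumulated cluster ids
    }
  }

  public static void main(String[] args) {
    // One bulk load initiated on each of the three clusters, as in the test.
    for (String origin : PEERS.keySet()) {
      deliver(new ArrayList<>(), origin);
    }
    // Prints 9, matching assertEquals(9, BULK_LOADS_COUNT.get()) in the test.
    System.out.println(BULK_LOADS_COUNT.get());
  }
}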