Skip to content

Commit

Permalink
HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get …
Browse files Browse the repository at this point in the history
…replicated (addressing buklload replication related issue raised in HBASE-22380)

Signed-off-by: Josh Elser <elserj@apache.org>
(cherry picked from commit 4d41402, then resolved conflicts)
  • Loading branch information
wchevreuil committed Oct 29, 2019
1 parent cab9ccc commit d2a027d
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
bulkToken, false, null);
bulkToken, false, null, true);
}

/**
Expand All @@ -138,17 +138,17 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
final Token<?> userToken, final String bulkToken,
boolean copyFiles) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
bulkToken, false, null);
bulkToken, false, null, true);
}

public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken,
boolean copyFiles, List<String> clusterIds) throws IOException {
boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
userToken, bulkToken, copyFiles, clusterIds);
userToken, bulkToken, copyFiles, clusterIds, replicate);

try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2582,16 +2582,19 @@ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableN
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
storeFilesSize, bulkloadSeqId, null);
storeFilesSize, bulkloadSeqId, null, true);
}

public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
Map<String, Long> storeFilesSize, long bulkloadSeqId,
List<String> clusterIds, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName)
.setBulkloadSeqNum(bulkloadSeqId)
.setReplicate(replicate);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
false, null);
false, null, true);
}

/**
Expand All @@ -583,7 +583,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
List<String> clusterIds) {
List<String> clusterIds, boolean replicate) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);

Expand Down Expand Up @@ -624,6 +624,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
request.setReplicate(replicate);
return request.build();
}

Expand Down
1 change: 1 addition & 0 deletions hbase-protocol-shaded/src/main/protobuf/Client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
optional bool replicate = 8 [default = true];

message FamilyPath {
required bytes family = 1;
Expand Down
1 change: 1 addition & 0 deletions hbase-protocol-shaded/src/main/protobuf/WAL.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
optional bool replicate = 6 [default = true];
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDi
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
bulkload.disableReplication();
bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
connection.getRegionLocator(table.getName()));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6099,7 +6099,8 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
null, true);
}

/**
Expand Down Expand Up @@ -6150,7 +6151,7 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
boolean copyFile, List<String> clusterIds) throws IOException {
boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
Expand Down Expand Up @@ -6325,7 +6326,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
storeFiles, storeFilesSizes, seqId, clusterIds);
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2402,7 +2402,7 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
}
try {
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
request.getCopyFile(), clusterIds);
request.getCopyFile(), clusterIds, request.getReplicate());
} finally {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public Map<byte[], List<Path>> run() {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
clusterIds);
clusterIds, request.getReplicate());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,20 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
if(bulkLoadsPerClusters == null) {
bulkLoadsPerClusters = new HashMap<>();
}
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
if(bld.getReplicate()) {
if (bulkLoadsPerClusters == null) {
bulkLoadsPerClusters = new HashMap<>();
}
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,5 @@ Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {

private List<String> clusterIds = new ArrayList<>();

private boolean replicate = true;

/**
* Represents an HFile waiting to be loaded. An queue is used in this class in order to support
* the case where a region has split during the process of the load. When this happens, the HFile
Expand Down Expand Up @@ -547,7 +549,8 @@ protected byte[] rpcCall() throws Exception {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
assignSeqIds, fsDelegationToken.getUserToken(),
bulkToken, copyFile, clusterIds, replicate);
}
return success ? regionName : null;
} finally {
Expand Down Expand Up @@ -1266,6 +1269,12 @@ public void setClusterIds(List<String> clusterIds) {
this.clusterIds = clusterIds;
}

/**
* Disables replication for these bulkloaded files.
*/
public void disableReplication(){
this.replicate = false;
}
/**
* Infers region boundaries for a new table.
* <p>
Expand Down
Loading

0 comments on commit d2a027d

Please sign in to comment.