Skip to content

Commit

Permalink
HBASE-24694 Support flush a single column family of table (#2179)
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
bsglz authored Aug 7, 2020
1 parent 9b49bd6 commit 485e0d2
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,16 @@ Future<Void> modifyColumnFamilyAsync(TableName tableName, ColumnFamilyDescriptor
*/
void flush(TableName tableName) throws IOException;

/**
* Flush the specified column family stores on all regions of the passed table.
* This runs as a synchronous operation.
*
* @param tableName table to flush
* @param columnFamily column family within a table
* @throws IOException if a remote or network exception occurs
*/
void flush(TableName tableName, byte[] columnFamily) throws IOException;

/**
* Flush an individual region. Synchronous operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public void flush(TableName tableName) throws IOException {
get(admin.flush(tableName));
}

@Override
public void flush(TableName tableName, byte[] columnFamily) throws IOException {
get(admin.flush(tableName, columnFamily));
}

@Override
public void flushRegion(byte[] regionName) throws IOException {
get(admin.flushRegion(regionName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ CompletableFuture<Void> modifyColumnFamily(TableName tableName,
*/
CompletableFuture<Void> flush(TableName tableName);

/**
* Flush the specified column family stores on all regions of the passed table.
* This runs as a synchronous operation.
* @param tableName table to flush
* @param columnFamily column family within a table
*/
CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);

/**
* Flush an individual region.
* @param regionName region to flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ public CompletableFuture<Void> flush(TableName tableName) {
return wrap(rawAdmin.flush(tableName));
}

@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
return wrap(rawAdmin.flush(tableName, columnFamily));
}

@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return wrap(rawAdmin.flushRegion(regionName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,9 +893,13 @@ public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
}
}

@Override
public CompletableFuture<Void> flush(TableName tableName) {
return flush(tableName, null);
}

@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
Expand All @@ -909,8 +913,12 @@ public CompletableFuture<Void> flush(TableName tableName) {
} else if (!tableEnabled) {
future.completeExceptionally(new TableNotEnabledException(tableName));
} else {
Map<String, String> props = new HashMap<>();
if (columnFamily != null) {
props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
}
addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
new HashMap<>()), (ret, err3) -> {
props), (ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,9 @@ public enum OperationStatusCode {
*/
public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v");

/** The family str as a key in map*/
public static final String FAMILY_KEY_STR = "family";

/**
* The current version of the meta table.
* - pre-hbase 0.92. There is no META_VERSION column in the root table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.procedure.flush;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

Expand All @@ -28,7 +29,9 @@
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

/**
* This flush region implementation uses the distributed procedure framework to flush
Expand All @@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure {
private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);

private final String table;
private final String family;
private final List<HRegion> regions;
private final FlushTableSubprocedurePool taskManager;

public FlushTableSubprocedure(ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
List<HRegion> regions, String table,
List<HRegion> regions, String table, String family,
FlushTableSubprocedurePool taskManager) {
super(member, table, errorListener, wakeFrequency, timeout);
this.table = table;
this.family = family;
this.regions = regions;
this.taskManager = taskManager;
}

private static class RegionFlushTask implements Callable<Void> {
HRegion region;
RegionFlushTask(HRegion region) {
List<byte[]> families;
RegionFlushTask(HRegion region, List<byte[]> families) {
this.region = region;
this.families = families;
}

@Override
Expand All @@ -65,7 +72,11 @@ public Void call() throws Exception {
region.startRegionOperation();
try {
LOG.debug("Flush region " + region.toString() + " started...");
region.flush(true);
if (families == null) {
region.flush(true);
} else {
region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
}
// TODO: flush result is not checked?
} finally {
LOG.debug("Closing region operation on " + region);
Expand All @@ -88,11 +99,15 @@ private void flushRegions() throws ForeignException {
throw new IllegalStateException("Attempting to flush "
+ table + " but we currently have outstanding tasks");
}

List<byte[]> families = null;
if (family != null) {
LOG.debug("About to flush family {} on all regions for table {}", family, table);
families = Arrays.asList(Bytes.toBytes(family));
}
// Add all hfiles already existing in region.
for (HRegion region : regions) {
// submit one task per region for parallelize by region.
taskManager.submitTask(new RegionFlushTask(region));
taskManager.submitTask(new RegionFlushTask(region, families));
monitor.rethrowException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -51,6 +52,7 @@

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;

@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
Expand Down Expand Up @@ -149,11 +151,19 @@ public void execProcedure(ProcedureDescription desc) throws IOException {

ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());

HBaseProtos.NameStringPair family = null;
for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
family = nsp;
}
}
byte[] procArgs = family != null ? family.toByteArray() : new byte[0];

// Kick of the global procedure from the master coordinator to the region servers.
// We rely on the existing Distributed Procedure framework to prevent any concurrent
// procedure with the same name.
Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
new byte[0], Lists.newArrayList(regionServers));
procArgs, Lists.newArrayList(regionServers));
monitor.rethrowException();
if (proc == null) {
String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
* This manager class handles flushing of the regions for table on a {@link HRegionServer}.
Expand Down Expand Up @@ -129,9 +130,10 @@ public void stop(boolean force) throws IOException {
* there is a possibility of a race where regions may be missed.
*
* @param table
* @param family
* @return Subprocedure to submit to the ProcedureMemeber.
*/
public Subprocedure buildSubprocedure(String table) {
public Subprocedure buildSubprocedure(String table, String family) {

// don't run the subprocedure if the parent is stop(ping)
if (rss.isStopping() || rss.isStopped()) {
Expand Down Expand Up @@ -162,7 +164,7 @@ public Subprocedure buildSubprocedure(String table) {
FlushTableSubprocedurePool taskManager =
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, table, taskManager);
timeoutMillis, involvedRegions, table, family, taskManager);
}

/**
Expand All @@ -183,8 +185,19 @@ public class FlushTableSubprocedureBuilder implements SubprocedureFactory {

@Override
public Subprocedure buildSubprocedure(String name, byte[] data) {
String family = null;
// Currently we do not put other data except family, so it is ok to
// judge by length that if family was specified
if (data.length > 0) {
try {
HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
family = nsp.getValue();
} catch (Exception e) {
LOG.error("fail to get family by parsing from data", e);
}
}
// The name of the procedure instance from the master is the table name.
return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,32 @@ public void testFlushTable() throws Exception {
}
}

@Test
public void testFlushTableFamily() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
admin.flush(tableName, FAMILY_1);
assertFalse(getRegionInfo().stream().
anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
}
}

@Test
public void testAsyncFlushTable() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
admin.flush(tableName).get();
assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
}

@Test
public void testAsyncFlushTableFamily() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
admin.flush(tableName, FAMILY_1).get();
assertFalse(getRegionInfo().stream().
anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
}

@Test
public void testFlushRegion() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ public void flush(TableName tableName) throws IOException {
admin.flush(tableName);
}

public void flush(TableName tableName, byte[] columnFamily) throws IOException {
admin.flush(tableName, columnFamily);
}

public void flushRegion(byte[] regionName) throws IOException {
admin.flushRegion(regionName);
}
Expand Down
6 changes: 5 additions & 1 deletion hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ def flush(name, family = nil)
rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
# Unknown region. Try table.
begin
@admin.flush(TableName.valueOf(name))
if family_bytes.nil?
@admin.flush(TableName.valueOf(name))
else
@admin.flush(TableName.valueOf(name), family_bytes)
end
rescue java.lang.IllegalArgumentException
# Unknown table. Try region server.
@admin.flushRegionServer(ServerName.valueOf(name))
Expand Down
4 changes: 3 additions & 1 deletion hbase-shell/src/main/ruby/shell/commands/flush.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ def help
Flush all regions in passed table or pass a region row to
flush an individual region or a region server name whose format
is 'host,port,startcode', to flush all its regions.
You can also flush a single column family within a region.
You can also flush a single column family for all regions within a table,
or for an specific region only.
For example:
hbase> flush 'TABLENAME'
hbase> flush 'TABLENAME','FAMILYNAME'
hbase> flush 'REGIONNAME'
hbase> flush 'REGIONNAME','FAMILYNAME'
hbase> flush 'ENCODED_REGIONNAME'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,11 @@ public void flush(TableName tableName) {

}

@Override
public void flush(TableName tableName, byte[] columnFamily) {
throw new NotImplementedException("flush not supported in ThriftAdmin");
}

@Override
public void flushRegion(byte[] regionName) {
throw new NotImplementedException("flushRegion not supported in ThriftAdmin");
Expand Down

0 comments on commit 485e0d2

Please sign in to comment.