Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27028 Add a shell command for flushing master local region #4457

Merged
merged 4 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2539,4 +2539,9 @@ default BalanceResponse balanceRSGroup(String groupName) throws IOException {
*/
List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, ServerType serverType,
int limit, Map<String, Object> filterParams) throws IOException;

/**
* Flush the master local region.
* @throws IOException if the operation fails
*/
void flushMasterStore() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1078,4 +1078,9 @@ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException {
return get(admin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
}

@Override
public void flushMasterStore() throws IOException {
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
get(admin.flushMasterStore());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1785,4 +1785,9 @@ default CompletableFuture<BalanceResponse> balanceRSGroup(String groupName) {
*/
CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams);

/**
* Flush the master local region.
* @return a {@link CompletableFuture} that carries no result value
*/
CompletableFuture<Void> flushMasterStore();
}
Original file line number Diff line number Diff line change
Expand Up @@ -949,4 +949,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
}

/**
 * Flush master local region: delegates to {@code rawAdmin} and passes the resulting future
 * through {@code wrap} before handing it back to the caller.
 */
@Override
public CompletableFuture<Void> flushMasterStore() {
  CompletableFuture<Void> rawFuture = rawAdmin.flushMasterStore();
  return wrap(rawFuture);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
Expand Down Expand Up @@ -4280,4 +4282,14 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
return CompletableFuture.completedFuture(Collections.emptyList());
}
}

/**
 * Flush master local region by sending an empty {@code FlushMasterStoreRequest} through the
 * master caller. The response carries no data, so the future completes with {@code null}.
 */
@Override
public CompletableFuture<Void> flushMasterStore() {
  // The request message has no fields, so a single instance can be built up front.
  final FlushMasterStoreRequest flushRequest = FlushMasterStoreRequest.newBuilder().build();
  return this.<Void> newMasterCaller()
    .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
      Void> call(controller, stub, flushRequest,
        (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
    .call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,9 @@ message ModifyColumnStoreFileTrackerResponse {
optional uint64 proc_id = 1;
}

// Request message of the FlushMasterStore rpc; it declares no fields.
message FlushMasterStoreRequest {}
// Response message of the FlushMasterStore rpc; it declares no fields.
message FlushMasterStoreResponse {}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1197,6 +1200,9 @@ service MasterService {

rpc ModifyColumnStoreFileTracker(ModifyColumnStoreFileTrackerRequest)
returns(ModifyColumnStoreFileTrackerResponse);

/** Flush the master local region */
rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,22 @@ default void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment>
final TableName tableName) throws IOException {
}

/**
* Called before the master local region memstore is flushed to disk.
* The default implementation does nothing.
* @param ctx the environment to interact with the framework and master
* @throws IOException declared so that implementations can report a failure
*/
default void preMasterStoreFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after the master local region memstore is flushed to disk.
* The default implementation does nothing.
* @param ctx the environment to interact with the framework and master
* @throws IOException declared so that implementations can report a failure
*/
default void postMasterStoreFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called before the quota for the user is stored.
* @param ctx the environment to interact with the framework and master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4207,6 +4207,26 @@ public List<HRegionLocation> getMetaLocations() {
return metaRegionLocationCache.getMetaRegionLocations();
}

/**
 * Force a flush of the master local region, invoking the coprocessor pre/post hooks around it
 * when a coprocessor host is present.
 * <p>
 * An {@link IOException} raised by either hook is logged and otherwise ignored, so a failing
 * pre-hook does not prevent the flush.
 * @throws IOException if flushing the master local region itself fails
 */
@Override
public void flushMasterStore() throws IOException {
  LOG.info("Force flush master local region.");

  // Pre-hook: failures are only logged, the flush below still runs.
  if (null != this.cpHost) {
    try {
      cpHost.preMasterStoreFlush();
    } catch (IOException preHookFailure) {
      LOG.error("Error invoking master coprocessor preMasterStoreFlush()", preHookFailure);
    }
  }

  masterRegion.flush(true);

  // Post-hook: failures are only logged as well.
  if (null != this.cpHost) {
    try {
      cpHost.postMasterStoreFlush();
    } catch (IOException postHookFailure) {
      LOG.error("Error invoking master coprocessor postMasterStoreFlush()", postHookFailure);
    }
  }
}

public Collection<ServerName> getLiveRegionServers() {
return regionServerTracker.getRegionServers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,24 @@ public void call(MasterObserver observer) throws IOException {
});
}

/**
 * Runs {@code MasterObserver#preMasterStoreFlush} through {@code execOperation}. When
 * {@code coprocEnvironments} is empty, {@code null} is passed instead of an operation.
 */
public void preMasterStoreFlush() throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        // "this" is the operation itself, which is handed to the observer as its context.
        observer.preMasterStoreFlush(this);
      }
    };
  }
  execOperation(operation);
}

/**
 * Runs {@code MasterObserver#postMasterStoreFlush} through {@code execOperation}. When
 * {@code coprocEnvironments} is empty, {@code null} is passed instead of an operation.
 */
public void postMasterStoreFlush() throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        // "this" is the operation itself, which is handed to the observer as its context.
        observer.postMasterStoreFlush(this);
      }
    };
  }
  execOperation(operation);
}

public void preSetUserQuota(final String user, final GlobalQuotaSettings quotas)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
Expand Down Expand Up @@ -3468,4 +3470,16 @@ public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}

/**
 * RPC entry point for flushing the master local region. Runs {@code rpcPreCheck}, asks the
 * server to flush, and answers with an empty response.
 * @throws ServiceException wrapping any {@link IOException} thrown by the flush
 */
@Override
public FlushMasterStoreResponse flushMasterStore(RpcController controller,
  FlushMasterStoreRequest request) throws ServiceException {
  rpcPreCheck("flushMasterStore");
  try {
    server.flushMasterStore();
    return FlushMasterStoreResponse.newBuilder().build();
  } catch (IOException flushFailure) {
    // Surface the I/O failure to the RPC layer with the cause preserved.
    throw new ServiceException(flushFailure);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,9 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is
* We need to get this in MTP to tell the syncer the new meta replica count.
*/
MetaLocationSyncer getMetaLocationSyncer();

/**
* Flush the master local region.
* @throws IOException if the operation fails
*/
void flushMasterStore() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ public RegionScanner getRegionScanner(Scan scan) throws IOException {
return region.getScanner(scan);
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public FlushResult flush(boolean force) throws IOException {
return region.flush(force);
flusherAndCompactor.resetChangesAfterLastFlush();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add a line here, but where did we reset this in the past? Don't we need to remove the original one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At present, the flush method is only called by test cases; reset is not called in this method, and there is no reset logic in the relevant test cases (TestMasterRegionWALCleaner, TestMasterRegionOnTwoFileSystems, and so on).

Can you help me make sure we don't need reset logic either?

Copy link
Contributor Author

@2005hithlj 2005hithlj Jun 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add a line here, but where did we reset this in the past? Don't we need to remove the original one?

Do you mean the following changes are OK?

public FlushResult flush(boolean force) throws IOException {
flusherAndCompactor.resetChangesAfterLastFlush();
return region.flush(force);
}

@Apache9 sir.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we have reset logic in MasterRegionFlusherAndCompactor, before calling this flush method. Now we move the reset logic into this method, then we should remove the reset logic in MasterRegionFlusherAndCompactor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha! I have understood what you mean.
I have a new commit again, could you take a look when you have time?
@Apache9 sir.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to record the lastFlushTime here?

FlushResult flushResult = region.flush(force);
flusherAndCompactor.resetLastFlushTime();
return flushResult;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private boolean needCompaction() {
}

private void flushLoop() {
lastFlushTime = EnvironmentEdgeManager.currentTime();
resetLastFlushTime();
while (!closed) {
flushLock.lock();
try {
Expand All @@ -202,10 +202,10 @@ private void flushLoop() {
flushLock.unlock();
}
assert flushRequest;
changesAfterLastFlush.set(0);
resetChangesAfterLastFlush();
try {
region.flush(true);
lastFlushTime = EnvironmentEdgeManager.currentTime();
resetLastFlushTime();
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
abortable.abort("Failed to flush master local region", e);
Expand Down Expand Up @@ -263,6 +263,14 @@ void requestFlush() {
}
}

/**
* Sets the {@code changesAfterLastFlush} counter back to zero.
*/
void resetChangesAfterLastFlush() {
changesAfterLastFlush.set(0);
}

void resetLastFlushTime() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should name it recordLastFlushTime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Apache9 for the review.
I have modified it in the latest commit.

lastFlushTime = EnvironmentEdgeManager.currentTime();
}

@Override
public void close() {
closed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public static class CPMasterObserver implements MasterCoprocessor, MasterObserve
private boolean postRequestLockCalled;
private boolean preLockHeartbeatCalled;
private boolean postLockHeartbeatCalled;
private boolean preMasterStoreFlushCalled;
private boolean postMasterStoreFlushCalled;

public void resetStates() {
preCreateTableRegionInfosCalled = false;
Expand Down Expand Up @@ -280,6 +282,8 @@ public void resetStates() {
postRequestLockCalled = false;
preLockHeartbeatCalled = false;
postLockHeartbeatCalled = false;
preMasterStoreFlushCalled = false;
postMasterStoreFlushCalled = false;
}

@Override
Expand Down Expand Up @@ -1045,6 +1049,18 @@ public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}

/**
* Records that the pre-flush hook was invoked by setting {@code preMasterStoreFlushCalled}.
*/
@Override
public void preMasterStoreFlush(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
preMasterStoreFlushCalled = true;
}

/**
* Records that the post-flush hook was invoked by setting {@code postMasterStoreFlushCalled}.
*/
@Override
public void postMasterStoreFlush(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
postMasterStoreFlushCalled = true;
}

@Override
public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final String userName, final GlobalQuotaSettings quotas) throws IOException {
Expand Down Expand Up @@ -1680,4 +1696,23 @@ public void testQueueLockAndLockHeartbeatOperations() throws Exception {
ProcedureTestingUtility.waitNoProcedureRunning(master.getMasterProcedureExecutor());
ProcedureTestingUtility.assertProcNotFailed(master.getMasterProcedureExecutor(), procId);
}

/**
 * Verifies that flushing the master store through {@code Admin} triggers both the pre and the
 * post master-store-flush hooks on the registered {@code CPMasterObserver}.
 */
@Test
public void testMasterStoreOperations() throws Exception {
  HMaster master = UTIL.getHBaseCluster().getMaster();
  CPMasterObserver observer =
    master.getMasterCoprocessorHost().findCoprocessor(CPMasterObserver.class);

  // Start from a clean slate so earlier tests cannot satisfy the assertions below.
  observer.resetStates();
  assertFalse("No master store flush call", observer.preMasterStoreFlushCalled);
  assertFalse("No master store flush call", observer.postMasterStoreFlushCalled);

  try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
    Admin admin = conn.getAdmin()) {
    admin.flushMasterStore();

    assertTrue("Master store flush called", observer.preMasterStoreFlushCalled);
    assertTrue("Master store flush called", observer.postMasterStoreFlushCalled);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ public MetaLocationSyncer getMetaLocationSyncer() {
return null;
}

/** No-op: this implementation does not flush anything. */
@Override
public void flushMasterStore() {
}

@Override
public long modifyTableStoreFileTracker(TableName tableName, String dstSFT, long nonceGroup,
long nonce) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,5 @@ public void testPeriodicalFlush() throws InterruptedException {
assertEquals(1, flushCalled.get());
Thread.sleep(1000);
assertEquals(2, flushCalled.get());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -934,4 +934,9 @@ public Future<Void> modifyTableStoreFileTrackerAsync(TableName tableName, String
throws IOException {
return admin.modifyTableStoreFileTrackerAsync(tableName, dstSFT);
}

/** Delegates the master store flush to {@code admin}. */
@Override
public void flushMasterStore() throws IOException {
admin.flushMasterStore();
}
}
8 changes: 7 additions & 1 deletion hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1868,11 +1868,17 @@ def modify_table_sft(tableName, sft)
@admin.modifyTableStoreFileTracker(tableName, sft)
end

#----------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------
# Change the store file tracker (sft) of a table's column family
def modify_table_family_sft(tableName, family_bytes, sft)
@admin.modifyColumnFamilyStoreFileTracker(tableName, family_bytes, sft)
end

#----------------------------------------------------------------------------------------------
# Flush the master local region via the Java admin
def flush_master_store
  @admin.flushMasterStore
end
end
# rubocop:enable Metrics/ClassLength
end
1 change: 1 addition & 0 deletions hbase-shell/src/main/ruby/shell.rb
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ def self.exception_handler(hide_traceback)
compact
compaction_switch
flush
flush_master_store
get_balancer_decisions
get_balancer_rejections
get_slowlog_responses
Expand Down
Loading