Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24822 Add a command to support archive the earliest log file ma… #2202

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,14 @@ Future<Boolean> abortProcedureAsync(long procId, boolean mayInterruptIfRunning)
*/
void rollWALWriter(ServerName serverName) throws IOException, FailedLogCloseException;

/**
 * Archive the earliest log file of a region server.
 * <p>
 * This is the blocking form: the call declares {@link IOException} and returns nothing.
 *
 * @param serverName The servername of the regionserver.
 * @throws IOException if a remote or network exception occurs
 */
void archiveWAL(ServerName serverName) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the method name reflect that the earliest log is archived?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will fix later.
Thanks.


/**
* Helper that delegates to getClusterMetrics().getMasterCoprocessorNames().
* @return an array of master coprocessors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ public void rollWALWriter(ServerName serverName) throws IOException, FailedLogCl
get(admin.rollWALWriter(serverName));
}

@Override
public void archiveWAL(ServerName serverName) throws IOException {
// Delegate to the wrapped async admin and hand the returned future to get(...).
get(admin.archiveWAL(serverName));
}

@Override
public CompactionState getCompactionState(TableName tableName) throws IOException {
return get(admin.getCompactionState(tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,13 @@ default CompletableFuture<Integer> getMasterInfoPort() {
*/
CompletableFuture<Void> rollWALWriter(ServerName serverName);

/**
 * Archive the earliest log file of a region server.
 *
 * @param serverName The servername of the region server.
 * @return a future for the request; it carries no value ({@code Void})
 */
CompletableFuture<Void> archiveWAL(ServerName serverName);

/**
* Clear compacting queues on a region server.
* @param serverName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,11 @@ public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
return wrap(rawAdmin.rollWALWriter(serverName));
}

@Override
public CompletableFuture<Void> archiveWAL(ServerName serverName) {
// Forward to the raw admin and pass the resulting future through wrap(...).
return wrap(rawAdmin.archiveWAL(serverName));
}

@Override
public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
return wrap(rawAdmin.clearCompactionQueues(serverName, queues));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ArchiveWALRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ArchiveWALResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
Expand Down Expand Up @@ -2913,6 +2915,17 @@ public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
.serverName(serverName).call();
}

/**
 * Issues the {@code ArchiveWAL} admin call against the given region server.
 * The request is the shared instance from {@code RequestConverter.buildArchiveWALRequest()};
 * the response is discarded and mapped to {@code null}.
 * @param serverName the region server to send the call to
 * @return the future produced by the admin caller
 */
@Override
public CompletableFuture<Void> archiveWAL(ServerName serverName) {
  return this.<Void> newAdminCaller()
    .action((rpcController, adminStub) ->
      this.<ArchiveWALRequest, ArchiveWALResponse, Void> adminCall(
        rpcController,
        adminStub,
        RequestConverter.buildArchiveWALRequest(),
        (service, ctrl, archiveRequest, callback) ->
          service.archiveWAL(rpcController, archiveRequest, callback),
        response -> null))
    .serverName(serverName)
    .call();
}

@Override
public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ArchiveWALRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
Expand Down Expand Up @@ -883,6 +884,20 @@ public static RollWALWriterRequest buildRollWALWriterRequest() {
return ROLL_WAL_WRITER_REQUEST;
}

/**
 * Shared request instance handed out by {@link #buildArchiveWALRequest()}. It is built once
 * from an empty builder and declared {@code final} so the reference can never be reassigned.
 * @see #buildArchiveWALRequest()
 */
private static final ArchiveWALRequest ARCHIVE_WAL_REQUEST =
  ArchiveWALRequest.newBuilder().build();

/**
 * Get the static ArchiveWALRequest instance. No new message is built per call; the same
 * {@code ARCHIVE_WAL_REQUEST} object is returned every time.
 * @return an ArchiveWALRequest
 */
public static ArchiveWALRequest buildArchiveWALRequest() {
return ARCHIVE_WAL_REQUEST;
}

/**
* @see #buildGetServerInfoRequest()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ message RollWALWriterResponse {
repeated bytes region_to_flush = 1;
}

message ArchiveWALRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be renamed to go with the updated method name ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix later. Thanks.

Copy link
Contributor Author

@bsglz bsglz Aug 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, it should keep a generic name in case we want to extend it in the future, e.g. to archive a specific log file, archive all log files, etc.
WDYT? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that makes sense, I will also remove "earliest" from the method name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ArchiveWALResponse should contain the status of archival (including error).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ArchiveWALResponse should contain the status of archival (including error).

Make sense.

Copy link
Contributor Author

@bsglz bsglz Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the existing APIs in Admin.java, such as flushRegion and compactRegion; it seems we currently don't use a status: if something goes wrong on the server side, we notify the client by throwing a ServiceException.
Although FlushRegionResponse has a flushed flag, it is not used by the client.
Maybe we should stay consistent?
Thanks for comment. @tedyu

}

// Response of the ArchiveWAL rpc. It declares no fields.
message ArchiveWALResponse {
}

// Carries the mandatory reason string of a StopServer request.
message StopServerRequest {
required string reason = 1;
}
Expand Down Expand Up @@ -351,6 +357,9 @@ service AdminService {
rpc RollWALWriter(RollWALWriterRequest)
returns(RollWALWriterResponse);

// Asks the region server to archive its earliest log file.
rpc ArchiveWAL(ArchiveWALRequest)
returns(ArchiveWALResponse);

rpc GetServerInfo(GetServerInfoRequest)
returns(GetServerInfoResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ default void postRollWALWriterRequest(
final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {}

/**
 * Called before a user request to archive the earliest log file of a region server is
 * executed. The default implementation does nothing.
 * @param ctx the environment to interact with the framework and region server.
 * @throws IOException declared so that implementations can report a failure
 */
default void preArchiveWALRequest(
final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {}

/**
 * Called after a user request to archive the earliest log file of a region server has been
 * executed. The default implementation does nothing.
 * @param ctx the environment to interact with the framework and region server.
 * @throws IOException declared so that implementations can report a failure
 */
default void postArchiveWALRequest(
final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {}

/**
* This will be called after the replication endpoint is instantiated.
* @param ctx the environment to interact with the framework and region server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ArchiveWALRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ArchiveWALResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
Expand Down Expand Up @@ -2394,6 +2396,27 @@ public RollWALWriterResponse rollWALWriter(final RpcController controller,
}
}

/**
 * Handles the ArchiveWAL admin RPC: asks the WAL roller to archive the earliest log file of
 * this region server, bracketed by the pre/post archive coprocessor hooks.
 * @param controller the RPC controller
 * @param request the request
 * @return an empty {@link ArchiveWALResponse}
 * @throws ServiceException wrapping any IOException raised while handling the request
 */
@Override
public ArchiveWALResponse archiveWAL(final RpcController controller,
    final ArchiveWALRequest request) throws ServiceException {
  try {
    checkOpen();
    requestCount.increment();
    regionServer.getRegionServerCoprocessorHost().preArchiveWALRequest();
    regionServer.getWalRoller().requestArchive();
    regionServer.getRegionServerCoprocessorHost().postArchiveWALRequest();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  // Nothing to report back: the response message has no fields.
  return ArchiveWALResponse.newBuilder().build();
}

/**
* Stop the region server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,24 @@ public void call(RegionServerObserver observer) throws IOException {
});
}

/**
 * Runs the {@code preArchiveWALRequest} hook of the region server observers.
 * When no coprocessor environments are loaded, {@code null} is passed to
 * {@code execOperation} instead of an operation.
 * @throws IOException propagated from {@code execOperation}
 */
public void preArchiveWALRequest() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.preArchiveWALRequest(this);
      }
    };
  }
  execOperation(operation);
}

/**
 * Runs the {@code postArchiveWALRequest} hook of the region server observers.
 * When no coprocessor environments are loaded, {@code null} is passed to
 * {@code execOperation} instead of an operation.
 * @throws IOException propagated from {@code execOperation}
 */
public void postArchiveWALRequest() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.postArchiveWALRequest(this);
      }
    };
  }
  execOperation(operation);
}

public void preReplicateLogEntries()
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand Down Expand Up @@ -718,7 +721,7 @@ private void cleanOldLogs() throws IOException {
if (logsToArchive != null) {
for (Pair<Path, Long> logAndSize : logsToArchive) {
this.totalLogSize.addAndGet(-logAndSize.getSecond());
archiveLogFile(logAndSize.getFirst());
moveLogFileToArchiveDir(logAndSize.getFirst());
this.walFile2Props.remove(logAndSize.getFirst());
}
}
Expand All @@ -732,7 +735,7 @@ public static Path getWALArchivePath(Path archiveDir, Path p) {
return new Path(archiveDir, p.getName());
}

private void archiveLogFile(final Path p) throws IOException {
private void moveLogFileToArchiveDir(final Path p) throws IOException {
Path newPath = getWALArchivePath(this.walArchiveDir, p);
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
Expand Down Expand Up @@ -872,6 +875,33 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
}
}

/**
 * Archives the earliest log file tracked by this WAL instance.
 * <p>
 * Does nothing when {@code getNumRolledLogFiles()} reports fewer than one file. Otherwise the
 * first entry of {@code walFile2Props} is picked, every region returned by
 * {@code sequenceIdAccounting.findLower(...)} for that file is flushed, the file is moved to
 * the archive directory, and only then are {@code totalLogSize} and {@code walFile2Props}
 * updated.
 * <p>
 * NOTE(review): no lock is taken in this method, while cleanOldLogs() updates the same
 * {@code totalLogSize} / {@code walFile2Props} bookkeeping; confirm the two cannot race.
 * @param services used to look up the regions that have to be flushed
 * @throws RegionException if a region that needs flushing is not returned by {@code services}
 * @throws IOException if moving the file to the archive directory fails; presumably also when
 *           a flush fails (TODO confirm)
 */
public void archive(RegionServerServices services) throws IOException{
// Nothing has been rolled yet, so there is no file that could be archived.
if (getNumRolledLogFiles() < 1) {
return;
}
// get the earliest log of this WAL instance
// NOTE(review): the entry is dereferenced below without a null check; this relies on the
// guard above agreeing with the content of walFile2Props. Confirm.
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
// flush regions if necessary
Map<byte[], List<byte[]>> regions =
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
if (regions != null) {
for (Map.Entry<byte[], List<byte[]>> entry : regions.entrySet()) {
String encodedRegionName = Bytes.toString(entry.getKey());
HRegion r = (HRegion) services.getRegion(encodedRegionName);
// A region that cannot be looked up cannot be flushed, so the archive request is aborted.
if (r == null) {
throw new RegionException("Failed to flush of " + encodedRegionName +
" when archive manually, because it is not online on rs");
}
// The map value is passed as the first flush argument (presumably the column families
// to flush; verify against findLower).
r.flushcache(entry.getValue(), false, FlushLifeCycleTracker.DUMMY);
}
}

// move the log file to archive dir
moveLogFileToArchiveDir(firstWALEntry.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if IOException is thrown from this call ?
Would totalLogSize be correct ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, the move action should be called first.
BTW, this part copied from existing code, maybe need a follow-on jira to fix it too.
Thanks.

// Bookkeeping is updated only after the move returned normally, so an exception from the
// move leaves totalLogSize and walFile2Props untouched.
this.totalLogSize.addAndGet(-firstWALEntry.getValue().logSize);
this.walFile2Props.remove(firstWALEntry.getKey());
}

// public only until class moves to o.a.h.h.wal
/** @return the size of log files in use */
public long getLogFileSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
Expand Down Expand Up @@ -100,6 +101,14 @@ public void requestRollAll() {
}
}

/**
 * Archives the earliest log file of every WAL tracked by this roller, while holding this
 * roller's monitor.
 * <p>
 * {@code WAL.archive} needs a {@link RegionServerServices}; the roller's {@code abortable} is
 * used for that. When the roller is owned by something that is not a
 * {@link RegionServerServices}, a descriptive {@link IOException} is thrown instead of a bare
 * {@link ClassCastException}.
 * @throws IOException if the owner cannot act as {@link RegionServerServices}, or if archiving
 *           one of the WALs fails
 */
public void requestArchive() throws IOException {
  synchronized (this) {
    for (WAL wal : wals.keySet()) {
      // Checked per WAL so that a roller without any WAL still returns without complaint,
      // exactly as before. A null owner is passed through unchanged.
      if (this.abortable != null && !(this.abortable instanceof RegionServerServices)) {
        throw new IOException("Cannot archive " + wal + ": "
          + this.abortable.getClass().getName() + " does not implement RegionServerServices");
      }
      wal.archive((RegionServerServices) this.abortable);
    }
  }
}

protected AbstractWALRoller(String name, Configuration conf, T abortable) {
super(name);
this.abortable = abortable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand Down Expand Up @@ -143,6 +144,10 @@ public Map<byte[], List<byte[]>> rollWriter(boolean force) {
return rollWriter();
}

@Override
public void archive(RegionServerServices services) {
// Intentionally empty: this implementation performs no archiving and ignores services.
}

@Override
public void shutdown() {
if(closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
Expand Down Expand Up @@ -83,6 +84,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
*/
Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException;

/**
 * Archive the earliest log file.
 * @param services region server services handed to the implementation; AbstractFSWAL uses it
 *          to look up the regions it has to flush first, DisabledWAL ignores it
 * @throws IOException if the implementation fails to archive the file
 */
void archive(RegionServerServices services) throws IOException;

/**
* Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
* Extant edits are left in place in backing storage to be replayed later.
Expand Down
Loading