Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.exception;

import org.apache.fluss.annotation.PublicEvolving;

/**
* Thrown when a request refers to a tablet server that is not available, for example when the
* coordinator is asked to act on a tablet server id it does not currently know about.
*
* @since 0.8
*/
@PublicEvolving
public class TabletServerNotAvailableException extends ApiException {
/**
* Creates the exception with the given detail message.
*
* @param message human-readable description of the unavailable tablet server; forwarded
*     unchanged to {@link ApiException}
*/
public TabletServerNotAvailableException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
import org.apache.fluss.rpc.protocol.ApiKeys;
Expand Down Expand Up @@ -78,4 +80,9 @@ CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot(
@RPC(api = ApiKeys.LAKE_TIERING_HEARTBEAT)
CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
LakeTieringHeartbeatRequest request);

/**
* Requests a controlled shutdown of the tablet server identified in the request.
*
* @param request carries the id and the epoch of the tablet server that wants to shut down
* @return a future that holds the response to the controlled shutdown request
*/
@RPC(api = ApiKeys.CONTROLLED_SHUTDOWN)
CompletableFuture<ControlledShutdownResponse> controlledShutdown(
ControlledShutdownRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public enum ApiKeys {
CREATE_ACLS(1039, 0, 0, PUBLIC),
LIST_ACLS(1040, 0, 0, PUBLIC),
DROP_ACLS(1041, 0, 0, PUBLIC),
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);

private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
Expand Down
8 changes: 8 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,14 @@ message LakeTieringHeartbeatResponse {
repeated PbHeartbeatRespForTable failed_table_resp = 5;
}

// Request to start a controlled shutdown of a tablet server.
message ControlledShutdownRequest {
// Id of the tablet server that is shutting down.
required int32 tablet_server_id = 1;
// Epoch of the tablet server that is shutting down.
required int32 tablet_server_epoch = 2;
}

// Response to a ControlledShutdownRequest.
message ControlledShutdownResponse {
// Buckets whose leader is still the shutting-down tablet server; empty when none remain.
repeated PbTableBucket remaining_leader_buckets = 1;
}

// --------------- Inner classes ----------------
message PbApiVersion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ protected static void startServer(ServerBase server) {
public void start() throws Exception {
try {
addShutDownHook();

// at first, we need to initialize the file system
pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf);
FileSystem.initialize(conf, pluginManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class CoordinatorContext {
private final Map<TableBucketReplica, Integer> failDeleteNumbers = new HashMap<>();

private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
private final Set<Integer> shuttingDownTabletServers = new HashSet<>();

// a map from the table bucket to the state of the bucket.
private final Map<TableBucket, BucketState> bucketStates = new HashMap<>();
Expand Down Expand Up @@ -114,6 +115,24 @@ public Map<Integer, ServerInfo> getLiveTabletServers() {
return liveTabletServers;
}

/**
 * Returns the ids of the registered tablet servers that are not marked as shutting down.
 *
 * <p>The result is a newly created set, so callers may modify it without affecting this
 * context.
 *
 * @return ids present in the live tablet server map minus the shutting-down ids
 */
public Set<Integer> liveTabletServerSet() {
    // Start from every registered server id, then drop the ones that are shutting down.
    Set<Integer> servingServerIds = new HashSet<>(this.liveTabletServers.keySet());
    servingServerIds.removeAll(this.shuttingDownTabletServers);
    return servingServerIds;
}

/**
* Returns the ids of the tablet servers that are currently marked as shutting down.
*
* <p>The backing set itself is returned, not a copy, so additions and removals made through
* the returned reference change the state held by this context.
*/
public Set<Integer> shuttingDownTabletServers() {
return shuttingDownTabletServers;
}

/**
* Returns the ids of all tablet servers registered in the live tablet server map, whether or
* not they are also marked as shutting down.
*
* <p>The returned set is the key-set view of that map, not a copy.
*/
public Set<Integer> liveOrShuttingDownTabletServers() {
return liveTabletServers.keySet();
}

@VisibleForTesting
public void setLiveTabletServers(List<ServerInfo> servers) {
liveTabletServers.clear();
Expand All @@ -136,8 +155,8 @@ public void removeLiveTabletServer(int serverId) {
this.liveTabletServers.remove(serverId);
}

public boolean isReplicaAndServerOnline(int serverId, TableBucket tableBucket) {
return liveTabletServers.containsKey(serverId)
public boolean isReplicaOnline(int serverId, TableBucket tableBucket) {
return liveTabletServerSet().contains(serverId)
&& !replicasOnOffline
.getOrDefault(serverId, Collections.emptySet())
.contains(tableBucket);
Expand Down Expand Up @@ -636,5 +655,6 @@ public void resetContext() {
clearTablesState();
// clear the live tablet servers
liveTabletServers.clear();
shuttingDownTabletServers.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidUpdateVersionException;
import org.apache.fluss.exception.TabletServerNotAvailableException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
Expand All @@ -38,13 +39,15 @@
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
import org.apache.fluss.server.coordinator.event.CreatePartitionEvent;
Expand Down Expand Up @@ -72,6 +75,7 @@
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
import org.apache.fluss.server.metadata.ServerInfo;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.LakeTableSnapshot;
Expand Down Expand Up @@ -104,6 +108,7 @@

import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
import static org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
Expand Down Expand Up @@ -529,6 +534,11 @@ public void process(CoordinatorEvent event) {
completeFromCallable(
commitLakeTableSnapshotEvent.getRespCallback(),
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
} else if (event instanceof ControlledShutdownEvent) {
ControlledShutdownEvent controlledShutdownEvent = (ControlledShutdownEvent) event;
completeFromCallable(
controlledShutdownEvent.getRespCallback(),
() -> tryProcessControlledShutdown(controlledShutdownEvent));
} else if (event instanceof AccessContextEvent) {
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
processAccessContext(accessContextEvent);
Expand Down Expand Up @@ -865,6 +875,7 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
LOG.info("Tablet server failure callback for {}.", tabletServerId);
coordinatorContext.removeOfflineBucketInServer(tabletServerId);
coordinatorContext.removeLiveTabletServer(tabletServerId);
coordinatorContext.shuttingDownTabletServers().remove(tabletServerId);
coordinatorChannelManager.removeTabletServer(tabletServerId);

// Here, we will first update alive tabletServer info for all tabletServers and
Expand Down Expand Up @@ -1165,6 +1176,73 @@ private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
return response;
}

/**
 * Handles a controlled shutdown request of a tablet server.
 *
 * <p>The server is marked as shutting down, a leader election with the {@code
 * CONTROLLED_SHUTDOWN_ELECTION} strategy is requested for every bucket it currently leads, and
 * the replicas it only follows are moved to {@code OfflineReplica}.
 *
 * @param controlledShutdownEvent the event carrying the tablet server id and epoch
 * @return a response listing the buckets whose leader is still the given tablet server
 * @throws TabletServerNotAvailableException if the tablet server id is not registered in the
 *     coordinator context
 */
private ControlledShutdownResponse tryProcessControlledShutdown(
        ControlledShutdownEvent controlledShutdownEvent) {
    final int serverId = controlledShutdownEvent.getTabletServerId();

    // TODO here we need to check tabletServerEpoch, avoid to receive controlled shutdown
    // request from an old tabletServer. Trace by https://github.com/alibaba/fluss/issues/1153
    final int serverEpoch = controlledShutdownEvent.getTabletServerEpoch();

    LOG.info(
            "Try to process controlled shutdown for tabletServer: {} of tabletServer epoch: {}",
            serverId,
            serverEpoch);

    // Reject requests from servers the coordinator does not know about.
    if (!coordinatorContext.liveOrShuttingDownTabletServers().contains(serverId)) {
        throw new TabletServerNotAvailableException(
                "TabletServer" + serverId + " is not available.");
    }

    Set<Integer> shuttingDownServers = coordinatorContext.shuttingDownTabletServers();
    shuttingDownServers.add(serverId);
    LOG.debug("All shutting down tabletServers: {}", shuttingDownServers);
    LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerSet());

    // Split the replicas hosted on the server into buckets it leads and replicas it follows.
    Set<TableBucket> ledBuckets = new HashSet<>();
    Set<TableBucketReplica> followerReplicas = new HashSet<>();
    for (TableBucketReplica replica : coordinatorContext.replicasOnTabletServer(serverId)) {
        TableBucket bucket = replica.getTableBucket();

        // Only act on buckets that have an assignment, a known leader-and-isr, and are not
        // queued for deletion.
        boolean actionable =
                !coordinatorContext.getAssignment(bucket).isEmpty()
                        && coordinatorContext.getBucketLeaderAndIsr(bucket).isPresent()
                        && !coordinatorContext.isToBeDeleted(bucket);
        if (!actionable) {
            continue;
        }

        if (replica.getReplica()
                == coordinatorContext.getBucketLeaderAndIsr(bucket).get().leader()) {
            ledBuckets.add(bucket);
        } else {
            followerReplicas.add(replica);
        }
    }

    // Move leadership away from the shutting down server.
    tableBucketStateMachine.handleStateChange(
            ledBuckets, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);

    // TODO need send stop request to the leader?

    // For replicas the server only follows, take them offline.
    replicaStateMachine.handleStateChanges(followerReplicas, OfflineReplica);

    // Report the buckets that are still led by the shutting down server after the leader
    // migration above.
    ControlledShutdownResponse response = new ControlledShutdownResponse();
    response.addAllRemainingLeaderBuckets(
            coordinatorContext.getBucketsWithLeaderIn(serverId).stream()
                    .map(ServerRpcMessageUtils::fromTableBucket)
                    .collect(Collectors.toList()));
    return response;
}

private void validateFencedEvent(FencedCoordinatorEvent event) {
TableBucket tb = event.getTableBucket();
if (coordinatorContext.getTablePathById(tb.getTableId()) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void addNotifyLeaderRequestForTabletServers(
List<Integer> bucketReplicas,
LeaderAndIsr leaderAndIsr) {
tabletServers.stream()
.filter(s -> s >= 0)
.filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s))
.forEach(
id -> {
Map<TableBucket, PbNotifyLeaderAndIsrReqForBucket>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateAclsResponse;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
Expand Down Expand Up @@ -86,6 +88,7 @@
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
import org.apache.fluss.server.coordinator.event.EventManager;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
Expand Down Expand Up @@ -575,6 +578,20 @@ public CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
return CompletableFuture.completedFuture(heartbeatResponse);
}

/**
 * Enqueues a {@link ControlledShutdownEvent} built from the request and returns the future
 * that is attached to that event as its response callback.
 */
@Override
public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
        ControlledShutdownRequest request) {
    CompletableFuture<ControlledShutdownResponse> responseFuture = new CompletableFuture<>();
    ControlledShutdownEvent shutdownEvent =
            new ControlledShutdownEvent(
                    request.getTabletServerId(),
                    request.getTabletServerEpoch(),
                    responseFuture);
    // The future is handed to the event; this method itself never completes it.
    eventManagerSupplier.get().put(shutdownEvent);
    return responseFuture;
}

private void validateHeartbeatRequest(
PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.event;

import org.apache.fluss.rpc.messages.ControlledShutdownResponse;

import java.util.concurrent.CompletableFuture;

/**
* An event for controlled shutdown of TabletServer.
*
* <p>Carries the id and epoch of the tablet server that requested the shutdown together with
* the future through which the response is delivered. All fields are set once in the
* constructor.
*/
public class ControlledShutdownEvent implements CoordinatorEvent {
// Id of the tablet server that requested the controlled shutdown.
private final int tabletServerId;
// Epoch of the tablet server that requested the controlled shutdown.
private final int tabletServerEpoch;
// Future used to hand the response (or a failure) back to the requester.
private final CompletableFuture<ControlledShutdownResponse> respCallback;

/**
* Creates the event.
*
* @param tabletServerId id of the tablet server that requested the shutdown
* @param tabletServerEpoch epoch of that tablet server
* @param respCallback future used to deliver the response
*/
public ControlledShutdownEvent(
int tabletServerId,
int tabletServerEpoch,
CompletableFuture<ControlledShutdownResponse> respCallback) {
this.tabletServerId = tabletServerId;
this.tabletServerEpoch = tabletServerEpoch;
this.respCallback = respCallback;
}

/** Returns the id of the tablet server that requested the shutdown. */
public int getTabletServerId() {
return tabletServerId;
}

/** Returns the epoch of the tablet server that requested the shutdown. */
public int getTabletServerEpoch() {
return tabletServerEpoch;
}

/** Returns the future through which the response is delivered. */
public CompletableFuture<ControlledShutdownResponse> getRespCallback() {
return respCallback;
}
}
Loading