diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java new file mode 100644 index 0000000000..320daeff64 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown when the tabletServer is not available. 
+ * + * @since 0.8 + */ +@PublicEvolving +public class TabletServerNotAvailableException extends ApiException { + public TabletServerNotAvailableException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java index a089a324e1..a78c9bdbdb 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java @@ -26,6 +26,8 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; +import org.apache.fluss.rpc.messages.ControlledShutdownRequest; +import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.protocol.ApiKeys; @@ -78,4 +80,9 @@ CompletableFuture commitLakeTableSnapshot( @RPC(api = ApiKeys.LAKE_TIERING_HEARTBEAT) CompletableFuture lakeTieringHeartbeat( LakeTieringHeartbeatRequest request); + + /** Perform a controlled shutdown of the tabletServer with the given tabletServerId. 
*/ + @RPC(api = ApiKeys.CONTROLLED_SHUTDOWN) + CompletableFuture controlledShutdown( + ControlledShutdownRequest request); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 0bc2d494a8..8526581aef 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -70,7 +70,8 @@ public enum ApiKeys { CREATE_ACLS(1039, 0, 0, PUBLIC), LIST_ACLS(1040, 0, 0, PUBLIC), DROP_ACLS(1041, 0, 0, PUBLIC), - LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE); + LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE), + CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 19e4010955..200b8b520b 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -532,6 +532,14 @@ message LakeTieringHeartbeatResponse { repeated PbHeartbeatRespForTable failed_table_resp = 5; } +message ControlledShutdownRequest { + required int32 tablet_server_id = 1; + required int32 tablet_server_epoch = 2; +} + +message ControlledShutdownResponse { + repeated PbTableBucket remaining_leader_buckets = 1; +} // --------------- Inner classes ---------------- message PbApiVersion { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java index 99c1e105fb..4acbafd099 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java @@ -115,6 +115,7 @@ protected static void startServer(ServerBase server) { public void start() throws Exception { try { addShutDownHook(); + // at first, we need to initialize the file system pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(conf); FileSystem.initialize(conf, pluginManager); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index aa372f56a4..4cb9889673 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -67,6 +67,7 @@ public class CoordinatorContext { private final Map failDeleteNumbers = new HashMap<>(); private final Map liveTabletServers = new HashMap<>(); + private final Set shuttingDownTabletServers = new HashSet<>(); // a map from the table bucket to the state of the bucket. private final Map bucketStates = new HashMap<>(); @@ -114,6 +115,24 @@ public Map getLiveTabletServers() { return liveTabletServers; } + public Set liveTabletServerSet() { + Set liveTabletServers = new HashSet<>(); + for (Integer brokerId : this.liveTabletServers.keySet()) { + if (!shuttingDownTabletServers.contains(brokerId)) { + liveTabletServers.add(brokerId); + } + } + return liveTabletServers; + } + + public Set shuttingDownTabletServers() { + return shuttingDownTabletServers; + } + + public Set liveOrShuttingDownTabletServers() { + return liveTabletServers.keySet(); + } + @VisibleForTesting public void setLiveTabletServers(List servers) { liveTabletServers.clear(); @@ -136,8 +155,8 @@ public void removeLiveTabletServer(int serverId) { this.liveTabletServers.remove(serverId); } - public boolean isReplicaAndServerOnline(int serverId, TableBucket tableBucket) { - return liveTabletServers.containsKey(serverId) + public boolean isReplicaOnline(int serverId, TableBucket tableBucket) { + return liveTabletServerSet().contains(serverId) && !replicasOnOffline .getOrDefault(serverId, Collections.emptySet()) .contains(tableBucket); @@ -636,5 +655,6 @@ public void resetContext() { 
clearTablesState(); // clear the live tablet servers liveTabletServers.clear(); + shuttingDownTabletServers.clear(); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 66d52d9fc4..bab477d207 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidUpdateVersionException; +import org.apache.fluss.exception.TabletServerNotAvailableException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; @@ -38,6 +39,7 @@ import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; +import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.server.coordinator.event.AccessContextEvent; @@ -45,6 +47,7 @@ import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; +import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import 
org.apache.fluss.server.coordinator.event.CreatePartitionEvent; @@ -72,6 +75,7 @@ import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.utils.ServerRpcMessageUtils; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.BucketAssignment; import org.apache.fluss.server.zk.data.LakeTableSnapshot; @@ -104,6 +108,7 @@ import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket; import static org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION; import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica; import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica; import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted; @@ -529,6 +534,11 @@ public void process(CoordinatorEvent event) { completeFromCallable( commitLakeTableSnapshotEvent.getRespCallback(), () -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent)); + } else if (event instanceof ControlledShutdownEvent) { + ControlledShutdownEvent controlledShutdownEvent = (ControlledShutdownEvent) event; + completeFromCallable( + controlledShutdownEvent.getRespCallback(), + () -> tryProcessControlledShutdown(controlledShutdownEvent)); } else if (event instanceof AccessContextEvent) { AccessContextEvent accessContextEvent = (AccessContextEvent) event; processAccessContext(accessContextEvent); @@ -865,6 +875,7 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent LOG.info("Tablet server failure callback for {}.", tabletServerId); coordinatorContext.removeOfflineBucketInServer(tabletServerId); 
coordinatorContext.removeLiveTabletServer(tabletServerId); + coordinatorContext.shuttingDownTabletServers().remove(tabletServerId); coordinatorChannelManager.removeTabletServer(tabletServerId); // Here, we will first update alive tabletServer info for all tabletServers and @@ -1165,6 +1176,73 @@ private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot( return response; } + private ControlledShutdownResponse tryProcessControlledShutdown( + ControlledShutdownEvent controlledShutdownEvent) { + ControlledShutdownResponse response = new ControlledShutdownResponse(); + + // TODO here we need to check tabletServerEpoch, avoid to receive controlled shutdown + // request from an old tabletServer. Trace by https://github.com/alibaba/fluss/issues/1153 + int tabletServerEpoch = controlledShutdownEvent.getTabletServerEpoch(); + + int tabletServerId = controlledShutdownEvent.getTabletServerId(); + LOG.info( + "Try to process controlled shutdown for tabletServer: {} of tabletServer epoch: {}", + controlledShutdownEvent.getTabletServerId(), + tabletServerEpoch); + + if (!coordinatorContext.liveOrShuttingDownTabletServers().contains(tabletServerId)) { + throw new TabletServerNotAvailableException( + "TabletServer" + tabletServerId + " is not available."); + } + + coordinatorContext.shuttingDownTabletServers().add(tabletServerId); + LOG.debug( + "All shutting down tabletServers: {}", + coordinatorContext.shuttingDownTabletServers()); + LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerSet()); + + List replicasToActOn = + coordinatorContext.replicasOnTabletServer(tabletServerId).stream() + .filter( + replica -> { + TableBucket tableBucket = replica.getTableBucket(); + return !coordinatorContext.getAssignment(tableBucket).isEmpty() + && coordinatorContext + .getBucketLeaderAndIsr(tableBucket) + .isPresent() + && !coordinatorContext.isToBeDeleted(tableBucket); + }) + .collect(Collectors.toList()); + + Set bucketsLedByServer = new HashSet<>(); + 
Set replicasFollowedByServer = new HashSet<>(); + for (TableBucketReplica replica : replicasToActOn) { + TableBucket tableBucket = replica.getTableBucket(); + if (replica.getReplica() + == coordinatorContext.getBucketLeaderAndIsr(tableBucket).get().leader()) { + bucketsLedByServer.add(tableBucket); + } else { + replicasFollowedByServer.add(replica); + } + } + + tableBucketStateMachine.handleStateChange( + bucketsLedByServer, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION); + + // TODO need send stop request to the leader? + + // If the tabletServer is a follower, updates the isr in ZK and notifies the current leader. + replicaStateMachine.handleStateChanges(replicasFollowedByServer, OfflineReplica); + + // Return the list of buckets that are still being managed by the controlled shutdown + // tabletServer after leader migration. + response.addAllRemainingLeaderBuckets( + coordinatorContext.getBucketsWithLeaderIn(tabletServerId).stream() + .map(ServerRpcMessageUtils::fromTableBucket) + .collect(Collectors.toList())); + return response; + } + private void validateFencedEvent(FencedCoordinatorEvent event) { TableBucket tb = event.getTableBucket(); if (coordinatorContext.getTablePathById(tb.getTableId()) == null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 504b2b4396..de6778d35c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -211,7 +211,7 @@ public void addNotifyLeaderRequestForTabletServers( List bucketReplicas, LeaderAndIsr leaderAndIsr) { tabletServers.stream() - .filter(s -> s >= 0) + .filter(s -> s >= 0 && !coordinatorContext.shuttingDownTabletServers().contains(s)) .forEach( id -> { Map diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 5f87937a29..dd3eb62abc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -49,6 +49,8 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; +import org.apache.fluss.rpc.messages.ControlledShutdownRequest; +import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateAclsResponse; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; @@ -86,6 +88,7 @@ import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; +import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent; import org.apache.fluss.server.coordinator.event.EventManager; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.LakeTieringTableInfo; @@ -575,6 +578,20 @@ public CompletableFuture lakeTieringHeartbeat( return CompletableFuture.completedFuture(heartbeatResponse); } + @Override + public CompletableFuture controlledShutdown( + ControlledShutdownRequest request) { + CompletableFuture response = new CompletableFuture<>(); + eventManagerSupplier + .get() + .put( + new ControlledShutdownEvent( + request.getTabletServerId(), + request.getTabletServerEpoch(), + response)); + return response; + } + private void validateHeartbeatRequest( PbHeartbeatReqForTable heartbeatReqForTable, int 
currentEpoch) { if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java new file mode 100644 index 0000000000..7f93d08054 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.rpc.messages.ControlledShutdownResponse; + +import java.util.concurrent.CompletableFuture; + +/** An event for controlled shutdown of TabletServer. 
*/ +public class ControlledShutdownEvent implements CoordinatorEvent { + private final int tabletServerId; + private final int tabletServerEpoch; + private final CompletableFuture respCallback; + + public ControlledShutdownEvent( + int tabletServerId, + int tabletServerEpoch, + CompletableFuture respCallback) { + this.tabletServerId = tabletServerId; + this.tabletServerEpoch = tabletServerEpoch; + this.respCallback = respCallback; + } + + public int getTabletServerId() { + return tabletServerId; + } + + public int getTabletServerEpoch() { + return tabletServerEpoch; + } + + public CompletableFuture getRespCallback() { + return respCallback; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java index d0f6b1835d..c7c1aa07a4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java @@ -17,21 +17,100 @@ package org.apache.fluss.server.coordinator.statemachine; +import org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** The algorithms to elect the replica leader. */ public class ReplicaLeaderElectionAlgorithms { - public static Optional defaultReplicaLeaderElection( - List assignments, List aliveReplicas, List isr) { + + /** + * Init replica leader election when the bucket is newly created. 
+ * + * @param assignments the assignments + * @param aliveReplicas the alive replicas + * @param coordinatorEpoch the coordinator epoch + * @return the election result + */ + public static Optional initReplicaLeaderElection( + List assignments, List aliveReplicas, int coordinatorEpoch) { + // currently, we always use the first replica in assignment, which also in aliveReplicas and + // isr as the leader replica. + for (int assignment : assignments) { + if (aliveReplicas.contains(assignment)) { + return Optional.of( + new ElectionResult( + aliveReplicas, + new LeaderAndIsr( + assignment, 0, aliveReplicas, coordinatorEpoch, 0))); + } + } + + return Optional.empty(); + } + + /** + * Default replica leader election, like electing leader while leader offline. + * + * @param assignments the assignments + * @param aliveReplicas the alive replicas + * @param leaderAndIsr the original leaderAndIsr + * @return the election result + */ + public static Optional defaultReplicaLeaderElection( + List assignments, List aliveReplicas, LeaderAndIsr leaderAndIsr) { // currently, we always use the first replica in assignment, which also in aliveReplicas and // isr as the leader replica. + List isr = leaderAndIsr.isr(); for (int assignment : assignments) { if (aliveReplicas.contains(assignment) && isr.contains(assignment)) { - return Optional.of(assignment); + return Optional.of( + new ElectionResult( + aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr))); } } return Optional.empty(); } + + /** + * Controlled shutdown replica leader election. 
+ * + * @param assignments the assignments + * @param aliveReplicas the alive replicas + * @param leaderAndIsr the original leaderAndIsr + * @param shutdownTabletServers the shutdown tabletServers + * @return the election result + */ + public static Optional controlledShutdownReplicaLeaderElection( + List assignments, + List aliveReplicas, + LeaderAndIsr leaderAndIsr, + Set shutdownTabletServers) { + List originIsr = leaderAndIsr.isr(); + Set isrSet = new HashSet<>(originIsr); + for (Integer id : assignments) { + if (aliveReplicas.contains(id) + && isrSet.contains(id) + && !shutdownTabletServers.contains(id)) { + Set newAliveReplicas = new HashSet<>(aliveReplicas); + newAliveReplicas.removeAll(shutdownTabletServers); + List newIsr = + originIsr.stream() + .filter(replica -> !shutdownTabletServers.contains(replica)) + .collect(Collectors.toList()); + return Optional.of( + new ElectionResult( + new ArrayList<>(newAliveReplicas), + leaderAndIsr.newLeaderAndIsr(id, newIsr))); + } + } + return Optional.empty(); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java new file mode 100644 index 0000000000..faff47a42d --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.statemachine; + +/** The strategies to elect the replica leader. */ +public enum ReplicaLeaderElectionStrategy { + DEFAULT_ELECTION, + CONTROLLED_SHUTDOWN_ELECTION +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java index be77a43e51..235f4151d8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java @@ -107,7 +107,7 @@ private Tuple2, Set> initializeRepli for (Integer replica : replicas) { TableBucketReplica tableBucketReplica = new TableBucketReplica(tableBucket, replica); - if (coordinatorContext.isReplicaAndServerOnline(replica, tableBucket)) { + if (coordinatorContext.isReplicaOnline(replica, tableBucket)) { coordinatorContext.putReplicaState( tableBucketReplica, ReplicaState.OnlineReplica); onlineReplicas.add(tableBucketReplica); @@ -419,7 +419,7 @@ private Map doRemoveReplicaFromIsr( TableBucket tableBucket = tableBucketReplica.getTableBucket(); int replicaId = tableBucketReplica.getReplica(); - LeaderAndIsr leaderAndIsr = null; + LeaderAndIsr leaderAndIsr; if (toUpdateLeaderAndIsrList.get(tableBucket) != null) { leaderAndIsr = toUpdateLeaderAndIsrList.get(tableBucket); } else { @@ -451,7 +451,10 @@ private Map doRemoveReplicaFromIsr( : leaderAndIsr.isr().stream() 
.filter(id -> id != replicaId) .collect(Collectors.toList()); - LeaderAndIsr adjustLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr); + LeaderAndIsr adjustLeaderAndIsr = + newLeader == LeaderAndIsr.NO_LEADER + ? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr) + : leaderAndIsr.newLeaderAndIsr(newIsr); adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr); toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java index d1b9958cb1..33e278f443 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java @@ -40,6 +40,12 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.controlledShutdownReplicaLeaderElection; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.initReplicaLeaderElection; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.DEFAULT_ELECTION; + /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. 
*/ @@ -85,7 +91,7 @@ private void initializeBucketState() { .map( leaderAndIsr -> { // ONLINE if the leader is alive, otherwise, it's OFFLINE - if (coordinatorContext.isReplicaAndServerOnline( + if (coordinatorContext.isReplicaOnline( leaderAndIsr.leader(), tableBucket)) { return BucketState.OnlineBucket; } else { @@ -115,6 +121,13 @@ public void shutdown() { } public void handleStateChange(Set tableBuckets, BucketState targetState) { + handleStateChange(tableBuckets, targetState, DEFAULT_ELECTION); + } + + public void handleStateChange( + Set tableBuckets, + BucketState targetState, + ReplicaLeaderElectionStrategy replicaLeaderElectionStrategy) { try { coordinatorRequestBatch.newBatch(); @@ -123,7 +136,7 @@ public void handleStateChange(Set tableBuckets, BucketState targetS batchHandleOnlineChangeAndInitLeader(tableBuckets); } else { for (TableBucket tableBucket : tableBuckets) { - doHandleStateChange(tableBucket, targetState); + doHandleStateChange(tableBucket, targetState, replicaLeaderElectionStrategy); } } coordinatorRequestBatch.sendRequestToTabletServers( @@ -175,8 +188,12 @@ public void handleStateChange(Set tableBuckets, BucketState targetS * * @param tableBucket The table bucket that is to do state change * @param targetState the target state that is to change to + * @param replicaLeaderElectionStrategy the strategy to choose a new leader */ - private void doHandleStateChange(TableBucket tableBucket, BucketState targetState) { + private void doHandleStateChange( + TableBucket tableBucket, + BucketState targetState, + ReplicaLeaderElectionStrategy replicaLeaderElectionStrategy) { coordinatorContext.putBucketStateIfNotExists(tableBucket, BucketState.NonExistentBucket); if (!checkValidTableBucketStateChange(tableBucket, targetState)) { return; @@ -224,7 +241,8 @@ private void doHandleStateChange(TableBucket tableBucket, BucketState targetStat // current state is Online or Offline // not new bucket, we then need to update leader/epoch for the bucket Optional 
optionalElectionResult = - electNewLeaderForTableBuckets(tableBucket); + electNewLeaderForTableBuckets( + tableBucket, replicaLeaderElectionStrategy); if (!optionalElectionResult.isPresent()) { logFailedStateChange(tableBucket, currentState, targetState); } else { @@ -389,10 +407,7 @@ private Optional doInitElectionForBucket( // filter out the live servers List liveServers = assignedServers.stream() - .filter( - (server) -> - coordinatorContext.isReplicaAndServerOnline( - server, tableBucket)) + .filter((server) -> coordinatorContext.isReplicaOnline(server, tableBucket)) .collect(Collectors.toList()); // todo, consider this case, may reassign with other servers? if (liveServers.isEmpty()) { @@ -413,23 +428,16 @@ private Optional doInitElectionForBucket( } // For the case that the table bucket has been initialized, we use all the live assigned // servers as inSyncReplica set. - List isr = liveServers; - Optional leaderOpt = - ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection( - assignedServers, liveServers, isr); - if (!leaderOpt.isPresent()) { + Optional resultOpt = + initReplicaLeaderElection( + assignedServers, liveServers, coordinatorContext.getCoordinatorEpoch()); + if (!resultOpt.isPresent()) { LOG.error( "The leader election for table bucket {} is empty.", stringifyBucket(tableBucket)); return Optional.empty(); } - int leader = leaderOpt.get(); - - // Register the initial leader and isr. 
- LeaderAndIsr leaderAndIsr = - new LeaderAndIsr(leader, 0, isr, coordinatorContext.getCoordinatorEpoch(), 0); - - return Optional.of(new ElectionResult(liveServers, leaderAndIsr)); + return resultOpt; } private List tryRegisterLeaderAndIsrOneByOne( @@ -449,7 +457,8 @@ private List tryRegisterLeaderAndIsrOneByOne( return registerSuccessList; } - private Optional electNewLeaderForTableBuckets(TableBucket tableBucket) { + private Optional electNewLeaderForTableBuckets( + TableBucket tableBucket, ReplicaLeaderElectionStrategy electionStrategy) { LeaderAndIsr leaderAndIsr; try { leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get(); @@ -469,7 +478,7 @@ private Optional electNewLeaderForTableBuckets(TableBucket table } // re-election Optional optionalElectionResult = - leaderForOffline(tableBucket, leaderAndIsr); + electLeader(tableBucket, leaderAndIsr, electionStrategy); if (!optionalElectionResult.isPresent()) { LOG.error( "The result of elect leader for table bucket {} is empty.", @@ -564,19 +573,24 @@ private String stringifyBucket(TableBucket tableBucket) { } /** - * Elect a new leader for new or offline bucket, it'll always elect one from the live replicas - * in isr set. + * Elect a new leader for bucket, it'll always elect one from the live replicas in isr set. + * + *

The election cases include: + * + *

    + *
  1. new or offline bucket + *
  2. tabletServer controlled shutdown + *
*/ - private Optional leaderForOffline( - TableBucket tableBucket, LeaderAndIsr leaderAndIsr) { + private Optional electLeader( + TableBucket tableBucket, + LeaderAndIsr leaderAndIsr, + ReplicaLeaderElectionStrategy electionStrategy) { List assignment = coordinatorContext.getAssignment(tableBucket); // filter out the live servers List liveReplicas = assignment.stream() - .filter( - replica -> - coordinatorContext.isReplicaAndServerOnline( - replica, tableBucket)) + .filter(replica -> coordinatorContext.isReplicaOnline(replica, tableBucket)) .collect(Collectors.toList()); // we'd like use the first live replica as the new leader if (liveReplicas.isEmpty()) { @@ -584,29 +598,27 @@ private Optional leaderForOffline( return Optional.empty(); } - Optional leaderOpt = - ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection( - assignment, liveReplicas, leaderAndIsr.isr()); - if (!leaderOpt.isPresent()) { + Optional resultOpt = Optional.empty(); + if (electionStrategy == DEFAULT_ELECTION) { + resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas, leaderAndIsr); + } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) { + Set shuttingDownTabletServers = coordinatorContext.shuttingDownTabletServers(); + resultOpt = + controlledShutdownReplicaLeaderElection( + assignment, liveReplicas, leaderAndIsr, shuttingDownTabletServers); + } + + if (!resultOpt.isPresent()) { LOG.error( "The leader election for table bucket {} is empty.", stringifyBucket(tableBucket)); return Optional.empty(); } - - // get the updated leader and isr - LeaderAndIsr newLeaderAndIsr = - new LeaderAndIsr( - leaderOpt.get(), - leaderAndIsr.leaderEpoch() + 1, - leaderAndIsr.isr(), - coordinatorContext.getCoordinatorEpoch(), - leaderAndIsr.bucketEpoch() + 1); - - return Optional.of(new ElectionResult(liveReplicas, newLeaderAndIsr)); + return resultOpt; } - private static class ElectionResult { + /** The result of leader election. 
*/ + public static class ElectionResult { private final List liveReplicas; private final LeaderAndIsr leaderAndIsr; @@ -614,5 +626,13 @@ public ElectionResult(List liveReplicas, LeaderAndIsr leaderAndIsr) { this.liveReplicas = liveReplicas; this.leaderAndIsr = leaderAndIsr; } + + public List getLiveReplicas() { + return liveReplicas; + } + + public LeaderAndIsr getLeaderAndIsr() { + return leaderAndIsr; + } } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 0e593c2c83..63121dbc33 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -24,11 +24,14 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.exception.InvalidServerRackInfoException; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.RpcServer; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.ControlledShutdownRequest; +import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.metrics.ClientMetricGroup; import org.apache.fluss.rpc.netty.server.RequestsMetrics; import org.apache.fluss.server.ServerBase; @@ -68,6 +71,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket; /** * Tablet server implementation. 
The tablet server is responsible to manage the log tablet and kv @@ -79,6 +83,10 @@ public class TabletServer extends ServerBase { private static final Logger LOG = LoggerFactory.getLogger(TabletServer.class); + // TODO, maybe need to make it configurable + private static final int CONTROLLED_SHUTDOWN_MAX_RETRIES = 3; + private static final long CONTROLLED_SHUTDOWN_RETRY_INTERVAL_MS = 1000L; + private final int serverId; /** @@ -144,6 +152,9 @@ public class TabletServer extends ServerBase { @Nullable private Authorizer authorizer; + @GuardedBy("lock") + private CoordinatorGateway coordinatorGateway; + public TabletServer(Configuration conf) { this(conf, SystemClock.getInstance()); } @@ -206,7 +217,7 @@ protected void startServices() throws Exception { new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" + serverId); this.rpcClient = RpcClient.create(conf, clientMetricGroup, true); - CoordinatorGateway coordinatorGateway = + this.coordinatorGateway = GatewayClientProxy.createGatewayProxy( () -> metadataCache.getCoordinatorServer(interListenerName), rpcClient, @@ -261,6 +272,9 @@ protected void startServices() throws Exception { @Override protected CompletableFuture closeAsync(Result result) { if (isShutDown.compareAndSet(false, true)) { + + controlledShutDown(); + CompletableFuture serviceShutdownFuture = stopServices(); serviceShutdownFuture.whenComplete( @@ -408,6 +422,60 @@ CompletableFuture stopServices() { } } + private void controlledShutDown() { + LOG.info("Starting controlled shutdown."); + + // We request the CoordinatorServer to do a controlled shutdown. On failure, we backoff for + // a period of time and try again for a number of retries. If all the attempt fails, we + // simply force the shutdown. 
+ boolean shutdownSucceeded = false; + int remainingRetries = CONTROLLED_SHUTDOWN_MAX_RETRIES; + while (!shutdownSucceeded && remainingRetries > 0) { + remainingRetries--; + + ControlledShutdownRequest controlledShutdownRequest = + new ControlledShutdownRequest() + .setTabletServerId(serverId) + .setTabletServerEpoch(-1); // TODO, set correct tabletServer epoch. + try { + ControlledShutdownResponse response = + coordinatorGateway.controlledShutdown(controlledShutdownRequest).get(); + if (response.getRemainingLeaderBucketsCount() > 0) { + List remainingLeaderBuckets = new ArrayList<>(); + response.getRemainingLeaderBucketsList() + .forEach( + pbTableBucket -> + remainingLeaderBuckets.add( + toTableBucket(pbTableBucket))); + LOG.warn( + "TabletServer {} is still the leader for the following buckets: {} after Controlled Shutdown", + serverId, + remainingLeaderBuckets); + } else { + shutdownSucceeded = true; + } + } catch (Exception e) { + LOG.warn("Failed to do controlled shutdown: {}", e.getMessage()); + // do nothing and retry. 
+ } + + if (!shutdownSucceeded && remainingRetries > 0) { + try { + Thread.sleep(CONTROLLED_SHUTDOWN_RETRY_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + LOG.info("Retrying controlled shutdown ({} retries remaining).", remainingRetries); + } + } + + if (!shutdownSucceeded) { + LOG.warn( + "Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed."); + } + } + @Override protected String getServerName() { return SERVER_NAME; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 823c2bb529..88126f2669 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -218,6 +218,17 @@ public static TableBucket toTableBucket(PbTableBucket protoTableBucket) { protoTableBucket.getBucketId()); } + public static PbTableBucket fromTableBucket(TableBucket tableBucket) { + PbTableBucket pbTableBucket = + new PbTableBucket() + .setTableId(tableBucket.getTableId()) + .setBucketId(tableBucket.getBucket()); + if (tableBucket.getPartitionId() != null) { + pbTableBucket.setPartitionId(tableBucket.getPartitionId()); + } + return pbTableBucket; + } + public static ServerNode toServerNode(PbServerNode pbServerNode, ServerType serverType) { return new ServerNode( pbServerNode.getNodeId(), diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java index 6b6f8bc7a3..2512481389 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java @@ -70,8 +70,27 @@ public LeaderAndIsr( this.bucketEpoch = bucketEpoch; } + /** + * Create a new 
LeaderAndIsr with the given leader and isr, which means the leader changes. + * + * @param newLeader the new leader replica id + * @param newIsr the new isr + * @return the new LeaderAndIsr + */ public LeaderAndIsr newLeaderAndIsr(int newLeader, List newIsr) { - return new LeaderAndIsr(newLeader, leaderEpoch, newIsr, coordinatorEpoch, bucketEpoch + 1); + return new LeaderAndIsr( + newLeader, leaderEpoch + 1, newIsr, coordinatorEpoch, bucketEpoch + 1); + } + + /** + * Create a new LeaderAndIsr with the given isr, which means only the isr changes, but the + * leader remains the same. + * + * @param newIsr the new isr + * @return the new LeaderAndIsr + */ + public LeaderAndIsr newLeaderAndIsr(List newIsr) { + return new LeaderAndIsr(leader, leaderEpoch, newIsr, coordinatorEpoch, bucketEpoch + 1); } public int leader() { @@ -98,14 +117,6 @@ public int bucketEpoch() { return bucketEpoch; } - public boolean equalsAllowStalePartitionEpoch(LeaderAndIsr other) { - return leader == other.leader - && leaderEpoch == other.leaderEpoch - && coordinatorEpoch == other.coordinatorEpoch - && isr.equals(other.isr) - && bucketEpoch <= other.bucketEpoch; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 09a833e41c..79047d8b79 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -315,8 +315,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { client.registerTabletServer(newlyServerId, tabletServerRegistration); // retry until the tablet server register event is been handled - retryVerifyContext( - ctx -> assertThat(ctx.getLiveTabletServers()).containsKey(newlyServerId)); + 
retryVerifyContext(ctx -> assertThat(ctx.liveTabletServerSet()).contains(newlyServerId)); initCoordinatorChannel(); // verify the context has the exact tablet server @@ -360,7 +359,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { // retry until the server has been removed from coordinator context retryVerifyContext( - ctx -> assertThat(ctx.getLiveTabletServers()).doesNotContainKey(newlyServerId)); + ctx -> assertThat(ctx.liveTabletServerSet()).doesNotContain(newlyServerId)); // check replica state // all replicas should be online but the replica in the down server @@ -397,8 +396,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { // assume the server that comes again zookeeperClient.registerTabletServer(newlyServerId, tabletServerRegistration); // retry until the server has been added to coordinator context - retryVerifyContext( - ctx -> assertThat(ctx.getLiveTabletServers()).containsKey(newlyServerId)); + retryVerifyContext(ctx -> assertThat(ctx.liveTabletServerSet()).contains(newlyServerId)); // make sure the bucket that remains in offline should be online again // since the server become online @@ -812,7 +810,6 @@ void testProcessAdjustIsr() throws Exception { assertThat(ctx.getBucketLeaderAndIsr(tableBucket)) .contains( leaderAndIsr.newLeaderAndIsr( - leaderAndIsr.leader(), leaderAndIsr.isr())))); // verify the response diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java index 9d7337ae78..4636a341df 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java @@ -43,7 +43,7 @@ public static void makeSendLeaderAndStopRequestAlwaysSuccess( TestCoordinatorChannelManager testCoordinatorChannelManager) { Map gateways = makeTabletServerGateways( - 
coordinatorContext.getLiveTabletServers().keySet(), Collections.emptySet()); + coordinatorContext.liveTabletServerSet(), Collections.emptySet()); testCoordinatorChannelManager.setGateways(gateways); } @@ -52,8 +52,7 @@ public static void makeSendLeaderAndStopRequestFailContext( TestCoordinatorChannelManager testCoordinatorChannelManager, Set failServers) { Map gateways = - makeTabletServerGateways( - coordinatorContext.getLiveTabletServers().keySet(), failServers); + makeTabletServerGateways(coordinatorContext.liveTabletServerSet(), failServers); testCoordinatorChannelManager.setGateways(gateways); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index 1f7509738a..38e3f9ea10 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -30,6 +30,8 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; +import org.apache.fluss.rpc.messages.ControlledShutdownRequest; +import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateAclsResponse; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; @@ -296,6 +298,12 @@ public CompletableFuture lakeTieringHeartbeat( throw new UnsupportedOperationException(); } + @Override + public CompletableFuture controlledShutdown( + ControlledShutdownRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture listAcls(ListAclsRequest request) { throw new UnsupportedOperationException(); diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java new file mode 100644 index 0000000000..a530a19293 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.statemachine; + +import org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.controlledShutdownReplicaLeaderElection; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.initReplicaLeaderElection; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ReplicaLeaderElectionAlgorithms}. */ +public class ReplicaLeaderElectionAlgorithmsTest { + + @Test + void testInitReplicaLeaderElection() { + List assignments = Arrays.asList(2, 4); + List liveReplicas = Collections.singletonList(4); + + Optional leaderElectionResultOpt = + initReplicaLeaderElection(assignments, liveReplicas, 0); + assertThat(leaderElectionResultOpt.isPresent()).isTrue(); + ElectionResult leaderElectionResult = leaderElectionResultOpt.get(); + assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(4); + assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(4); + assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(4); + } + + @Test + void testDefaultReplicaLeaderElection() { + List assignments = Arrays.asList(2, 4); + List liveReplicas = Arrays.asList(2, 4); + LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(4, 0, Arrays.asList(2, 4), 0, 0); + + Optional leaderElectionResultOpt = + defaultReplicaLeaderElection(assignments, liveReplicas, originLeaderAndIsr); + 
assertThat(leaderElectionResultOpt.isPresent()).isTrue(); + ElectionResult leaderElectionResult = leaderElectionResultOpt.get(); + assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(2, 4); + assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(2); + assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(2, 4); + } + + @Test + void testControlledShutdownReplicaLeaderElection() { + List assignments = Arrays.asList(2, 4); + List liveReplicas = Arrays.asList(2, 4); + LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(2, 0, Arrays.asList(2, 4), 0, 0); + Set shutdownTabletServers = Collections.singleton(2); + + Optional leaderElectionResultOpt = + controlledShutdownReplicaLeaderElection( + assignments, liveReplicas, originLeaderAndIsr, shutdownTabletServers); + assertThat(leaderElectionResultOpt.isPresent()).isTrue(); + ElectionResult leaderElectionResult = leaderElectionResultOpt.get(); + assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(4); + assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(4); + assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(4); + } + + @Test + void testControlledShutdownReplicaLeaderElectionLastIsrShuttingDown() { + List assignments = Arrays.asList(2, 4); + List liveReplicas = Arrays.asList(2, 4); + LeaderAndIsr originLeaderAndIsr = + new LeaderAndIsr(2, 0, Collections.singletonList(2), 0, 0); + Set shutdownTabletServers = Collections.singleton(2); + + Optional leaderElectionResultOpt = + controlledShutdownReplicaLeaderElection( + assignments, liveReplicas, originLeaderAndIsr, shutdownTabletServers); + assertThat(leaderElectionResultOpt).isEmpty(); + } + + @Test + void testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown() { + List assignments = Arrays.asList(2, 4); + List liveReplicas = Arrays.asList(2, 4); + LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(2, 0, 
Arrays.asList(2, 4), 0, 0); + Set shutdownTabletServers = new HashSet<>(Arrays.asList(2, 4)); + + Optional leaderElectionResultOpt = + controlledShutdownReplicaLeaderElection( + assignments, liveReplicas, originLeaderAndIsr, shutdownTabletServers); + assertThat(leaderElectionResultOpt).isEmpty(); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java index 4c3cf1ab4e..454ec5de4b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java @@ -223,7 +223,7 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { replicaStateMachine.handleStateChanges(replicas, OfflineReplica); leaderAndIsr = coordinatorContext.getBucketLeaderAndIsr(tableBucket).get(); assertThat(leaderAndIsr) - .isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 0, Arrays.asList(2), 0, 3)); + .isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 3, Arrays.asList(2), 0, 3)); } @Test @@ -274,7 +274,7 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { assertThat(leaderAndIsr) .isEqualTo( new LeaderAndIsr( - LeaderAndIsr.NO_LEADER, 0, Collections.singletonList(0), 0, 3)); + LeaderAndIsr.NO_LEADER, 1, Collections.singletonList(0), 0, 3)); } private void toReplicaDeletionStartedState( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index 9009765e4f..6cee33fbe8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -53,14 +53,19 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; +import static org.apache.fluss.server.coordinator.CoordinatorTestUtils.createServers; +import static org.apache.fluss.server.coordinator.CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess; import static org.apache.fluss.server.coordinator.statemachine.BucketState.NewBucket; import static org.apache.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket; import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket; import static org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket; +import static org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; @@ -121,9 +126,8 @@ void testStartup() throws Exception { coordinatorContext.putTablePath(t1Id, TablePath.of("db1", "t1")); coordinatorContext.putTablePath(t2Id, TablePath.of("db1", "t2")); - coordinatorContext.setLiveTabletServers( - CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 3))); - CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess( + coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0, 1, 3))); + makeSendLeaderAndStopRequestAlwaysSuccess( coordinatorContext, testCoordinatorChannelManager); // set assignments coordinatorContext.updateBucketReplicaAssignment(t1b0, Arrays.asList(0, 1)); @@ -203,9 +207,8 @@ void testStateChangeToOnline() throws Exception { 
assertThat(coordinatorContext.getBucketState(tableBucket)).isEqualTo(NewBucket); // now, we set 3 live servers - coordinatorContext.setLiveTabletServers( - CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 2))); - CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess( + coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0, 1, 2))); + makeSendLeaderAndStopRequestAlwaysSuccess( coordinatorContext, testCoordinatorChannelManager); // change to online again @@ -217,8 +220,7 @@ void testStateChangeToOnline() throws Exception { // case2: assuming the leader replica fail(we remove it to server list), // we need elect another replica, - coordinatorContext.setLiveTabletServers( - CoordinatorTestUtils.createServers(Arrays.asList(1, 2))); + coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(1, 2))); tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), OnlineBucket); // check state is online @@ -230,8 +232,7 @@ void testStateChangeToOnline() throws Exception { // case4: the leader replica fail, but non replicas is available coordinatorContext.putBucketState(tableBucket, OfflineBucket); - coordinatorContext.setLiveTabletServers( - CoordinatorTestUtils.createServers(Collections.emptyList())); + coordinatorContext.setLiveTabletServers(createServers(Collections.emptyList())); tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), OnlineBucket); // the state will still be offline assertThat(coordinatorContext.getBucketState(tableBucket)).isEqualTo(OfflineBucket); @@ -266,8 +267,7 @@ void testStateChangeToOnline() throws Exception { coordinatorContext, coordinatorRequestBatch, zookeeperClient); eventManager.start(); - coordinatorContext.setLiveTabletServers( - CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 2))); + coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0, 1, 2))); CoordinatorTestUtils.makeSendLeaderAndStopRequestFailContext( coordinatorContext, 
testCoordinatorChannelManager, Sets.newHashSet(0, 2)); // init a table bucket assignment to coordinator context @@ -319,6 +319,61 @@ void testStateChangeForDropTable() { assertThat(coordinatorContext.getBucketState(tableBucket0)).isNull(); } + @Test + void testStateChangeForTabletServerControlledShutdown() { + TableBucketStateMachine tableBucketStateMachine = createTableBucketStateMachine(); + long tableId = 7; + TablePath fakeTablePath = TablePath.of("db1", "t2"); + TableBucket tb = new TableBucket(tableId, 0); + + // init coordinator context. + coordinatorContext.putTableInfo( + TableInfo.of( + fakeTablePath, + tableId, + 0, + DATA1_TABLE_DESCRIPTOR, + System.currentTimeMillis(), + System.currentTimeMillis())); + coordinatorContext.putTablePath(tableId, fakeTablePath); + coordinatorContext.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 2)); + coordinatorContext.putBucketState(tb, NewBucket); + + List aliveServers = Arrays.asList(0, 1, 2); + coordinatorContext.setLiveTabletServers(createServers(aliveServers)); + makeSendLeaderAndStopRequestAlwaysSuccess( + coordinatorContext, testCoordinatorChannelManager); + + // check state is online. + tableBucketStateMachine.handleStateChange(Collections.singleton(tb), OnlineBucket); + assertThat(coordinatorContext.getBucketState(tb)).isEqualTo(OnlineBucket); + assertThat(coordinatorContext.liveTabletServerSet()) + .containsExactlyInAnyOrderElementsOf(aliveServers); + assertThat(coordinatorContext.shuttingDownTabletServers()).isEmpty(); + assertThat(coordinatorContext.liveOrShuttingDownTabletServers()) + .containsExactlyInAnyOrderElementsOf(aliveServers); + + int oldLeader = coordinatorContext.getBucketLeaderAndIsr(tb).get().leader(); + aliveServers = + aliveServers.stream().filter(s -> s != oldLeader).collect(Collectors.toList()); + + // trigger controlled shutdown for oldLeader. 
+ coordinatorContext.shuttingDownTabletServers().add(oldLeader); + assertThat(coordinatorContext.liveTabletServerSet()) + .containsExactlyInAnyOrderElementsOf(aliveServers); + assertThat(coordinatorContext.shuttingDownTabletServers()) + .containsExactlyInAnyOrder(oldLeader); + assertThat(coordinatorContext.liveOrShuttingDownTabletServers()) + .containsExactlyInAnyOrder(0, 1, 2); + + // handle state change for controlled shutdown. + tableBucketStateMachine.handleStateChange( + Collections.singleton(tb), OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION); + assertThat(coordinatorContext.getBucketState(tb)).isEqualTo(OnlineBucket); + assertThat(coordinatorContext.getBucketLeaderAndIsr(tb).get().leader()) + .isNotEqualTo(oldLeader); + } + private TableBucketStateMachine createTableBucketStateMachine() { return new TableBucketStateMachine( coordinatorContext, coordinatorRequestBatch, zookeeperClient); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java similarity index 77% rename from fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java rename to fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java index 8388259a58..2102df6d1f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java @@ -51,9 +51,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** The ITCase for tablet server failover. */ -class TabletServerFailOverITCase { - +/** The ITCase for tabletServer shutdown (controlled shutdown). 
*/ +public class TabletServerShutdownITCase { @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = FlussClusterExtension.builder().setNumOfTabletServers(3).build(); @@ -114,7 +113,42 @@ void testIOExceptionShouldStopTabletServer(boolean isLogTable) throws Exception } @Test - void testKillServers() throws Exception { + void testControlledShutdown() throws Exception { + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(Schema.newBuilder().column("a", DataTypes.INT()).build()) + .distributedBy(1) + .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3) + .build(); + TablePath tablePath = TablePath.of("test_shutdown", "test_controlled_shutdown"); + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + TableBucket tb = new TableBucket(tableId, 0); + + LeaderAndIsr leaderAndIsr = FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb); + int leader = leaderAndIsr.leader(); + + // Stop the tabletServer that hosts the leader. + FLUSS_CLUSTER_EXTENSION.stopTabletServer(leader); + ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + + // the leader should be removed from isr, and a new leader should be elected. + retry( + Duration.ofMinutes(1), + () -> + assertThat(zkClient.getLeaderAndIsr(tb)) + .map(LeaderAndIsr::leader) + .isNotEqualTo(leader)); + + // restart the shutdown server + FLUSS_CLUSTER_EXTENSION.startTabletServer(leader, true); + } + + @Test + void testControlledShutdownRetriesFailover() throws Exception { + // This case tests the scenario where the controlled shutdown request is retried but + // fails because no new leader can be elected. In this case the controlled shutdown + // finally falls back to an uncontrolled shutdown. 
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); TableDescriptor tableDescriptor = TableDescriptor.builder() @@ -122,7 +156,7 @@ void testKillServers() throws Exception { .distributedBy(1) .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 2) .build(); - TablePath tablePath = TablePath.of("test_failover", "test_kill_servers"); + TablePath tablePath = TablePath.of("test_failover", "test_controlled_shutdown_failed"); long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); TableBucket tb = new TableBucket(tableId, 0); @@ -132,20 +166,22 @@ void testKillServers() throws Exception { isr.remove(Integer.valueOf(leader)); int follower = isr.get(0); - // let's kil follower + // Let's kill the follower. It will go through a controlled shutdown. FLUSS_CLUSTER_EXTENSION.stopTabletServer(follower); ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); // the follower should be removed from isr LeaderAndIsr expectedLeaderAndIsr1 = - leaderAndIsr.newLeaderAndIsr(leader, Collections.singletonList(leader)); + leaderAndIsr.newLeaderAndIsr(Collections.singletonList(leader)); retry( Duration.ofMinutes(1), () -> assertThat(zkClient.getLeaderAndIsr(tb).get()) .isEqualTo(expectedLeaderAndIsr1)); - // kill the leader again + // Kill the leader. As only one replica is left, no new leader can be elected when we send + // the controlled shutdown request for the leader, so the controlled shutdown finally falls + // back to an uncontrolled shutdown. FLUSS_CLUSTER_EXTENSION.stopTabletServer(leader); // should be no leader