diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 9f4b603e98..3125c806f3 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -338,6 +338,12 @@ public class ConfigOptions { + " (“50100,50101”), ranges (“50100-50200”) or a combination of both." + "This option is deprecated. Please use bind.listeners instead, which provides a more flexible configuration for multiple ports"); + public static final ConfigOption COORDINATOR_ID = + key("coordinator.id") + .intType() + .noDefaultValue() + .withDescription("The id for the coordinator server."); + /** * @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE} * instead. diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java b/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java new file mode 100644 index 0000000000..b6201c6159 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when the Coordinator leader epoch is invalid. */ +public class CoordinatorEpochFencedException extends RuntimeException { + public CoordinatorEpochFencedException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 60eb942e8b..dc60ef6d68 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -36,6 +36,7 @@ public class MetricNames { // metrics for coordinator server // -------------------------------------------------------------------------------------------- public static final String ACTIVE_COORDINATOR_COUNT = "activeCoordinatorCount"; + public static final String ALIVE_COORDINATOR_COUNT = "aliveCoordinatorCount"; public static final String ACTIVE_TABLET_SERVER_COUNT = "activeTabletServerCount"; public static final String OFFLINE_BUCKET_COUNT = "offlineBucketCount"; public static final String TABLE_COUNT = "tableCount"; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index e811e01456..805748f2ac 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -55,6 +55,7 @@ public class CoordinatorContext { private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class); public static final int INITIAL_COORDINATOR_EPOCH = 0; + public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0; // for simplicity, we just use retry time, may consider make it 
a configurable value // and use combine retry times and retry delay @@ -67,6 +68,7 @@ public class CoordinatorContext { // a success deletion. private final Map failDeleteNumbers = new HashMap<>(); + private final Set liveCoordinatorServers = new HashSet<>(); private final Map liveTabletServers = new HashMap<>(); private final Set shuttingDownTabletServers = new HashSet<>(); @@ -108,6 +110,7 @@ public class CoordinatorContext { private ServerInfo coordinatorServerInfo = null; private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; + private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION; public CoordinatorContext() {} @@ -115,6 +118,32 @@ public int getCoordinatorEpoch() { return coordinatorEpoch; } + public int getCoordinatorEpochZkVersion() { + return coordinatorEpochZkVersion; + } + + public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) { + this.coordinatorEpoch = newEpoch; + this.coordinatorEpochZkVersion = newZkVersion; + } + + public Set getLiveCoordinatorServers() { + return liveCoordinatorServers; + } + + public void setLiveCoordinatorServers(Set servers) { + liveCoordinatorServers.clear(); + liveCoordinatorServers.addAll(servers); + } + + public void addLiveCoordinatorServer(int serverId) { + this.liveCoordinatorServers.add(serverId); + } + + public void removeLiveCoordinatorServer(int serverId) { + this.liveCoordinatorServers.remove(serverId); + } + public Map getLiveTabletServers() { return liveTabletServers; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 471c62f7dc..d771b77ae2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -63,18 +63,21 @@ import 
org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.coordinator.event.CreatePartitionEvent; import org.apache.fluss.server.coordinator.event.CreateTableEvent; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorServerEvent; import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; import org.apache.fluss.server.coordinator.event.DropPartitionEvent; import org.apache.fluss.server.coordinator.event.DropTableEvent; import org.apache.fluss.server.coordinator.event.EventProcessor; import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent; +import org.apache.fluss.server.coordinator.event.NewCoordinatorServerEvent; import org.apache.fluss.server.coordinator.event.NewTabletServerEvent; import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent; import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent; import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; +import org.apache.fluss.server.coordinator.event.watcher.CoordinatorServerChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine; @@ -112,6 +115,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -151,6 +155,7 @@ public class CoordinatorEventProcessor implements EventProcessor { private final LakeTableTieringManager lakeTableTieringManager; private final TableChangeWatcher tableChangeWatcher; private final 
CoordinatorChannelManager coordinatorChannelManager; + private final CoordinatorServerChangeWatcher coordinatorServerChangeWatcher; private final TabletServerChangeWatcher tabletServerChangeWatcher; private final CoordinatorMetadataCache serverMetadataCache; private final CoordinatorRequestBatch coordinatorRequestBatch; @@ -202,6 +207,8 @@ public CoordinatorEventProcessor( tableBucketStateMachine, new RemoteStorageCleaner(conf, ioExecutor), ioExecutor); + this.coordinatorServerChangeWatcher = + new CoordinatorServerChangeWatcher(zooKeeperClient, coordinatorEventManager); this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager); this.tabletServerChangeWatcher = new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager); @@ -230,6 +237,7 @@ public CoordinatorEventManager getCoordinatorEventManager() { public void startup() { coordinatorContext.setCoordinatorServerInfo(getCoordinatorServerInfo()); // start watchers first so that we won't miss node in zk; + coordinatorServerChangeWatcher.start(); tabletServerChangeWatcher.start(); tableChangeWatcher.start(); LOG.info("Initializing coordinator context."); @@ -267,7 +275,7 @@ public void shutdown() { private ServerInfo getCoordinatorServerInfo() { try { return zooKeeperClient - .getCoordinatorAddress() + .getCoordinatorLeaderAddress() .map( coordinatorAddress -> // TODO we set id to 0 as that CoordinatorServer don't support @@ -295,6 +303,12 @@ public int getCoordinatorEpoch() { private void initCoordinatorContext() throws Exception { long start = System.currentTimeMillis(); + // get all coordinator servers + int[] currentCoordinatorServers = zooKeeperClient.getCoordinatorServerList(); + coordinatorContext.setLiveCoordinatorServers( + Arrays.stream(currentCoordinatorServers).boxed().collect(Collectors.toSet())); + LOG.info("Load coordinator servers success when initializing coordinator context."); + // get all tablet server's int[] currentServers = 
zooKeeperClient.getSortedTabletServerList(); List tabletServerInfos = new ArrayList<>(); @@ -509,6 +523,7 @@ private void onShutdown() { tableManager.shutdown(); // then stop watchers + coordinatorServerChangeWatcher.stop(); tableChangeWatcher.stop(); tabletServerChangeWatcher.stop(); } @@ -531,6 +546,10 @@ public void process(CoordinatorEvent event) { (NotifyLeaderAndIsrResponseReceivedEvent) event); } else if (event instanceof DeleteReplicaResponseReceivedEvent) { processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event); + } else if (event instanceof NewCoordinatorServerEvent) { + processNewCoordinatorServer((NewCoordinatorServerEvent) event); + } else if (event instanceof DeadCoordinatorServerEvent) { + processDeadCoordinatorServer((DeadCoordinatorServerEvent) event); } else if (event instanceof NewTabletServerEvent) { processNewTabletServer((NewTabletServerEvent) event); } else if (event instanceof DeadTabletServerEvent) { @@ -868,6 +887,29 @@ private void onReplicaBecomeOffline(Set offlineReplicas) { replicaStateMachine.handleStateChanges(offlineReplicas, OfflineReplica); } + private void processNewCoordinatorServer(NewCoordinatorServerEvent newCoordinatorServerEvent) { + int coordinatorServerId = newCoordinatorServerEvent.getServerId(); + if (coordinatorContext.getLiveCoordinatorServers().contains(coordinatorServerId)) { + return; + } + + // process new coordinator server + LOG.info("New coordinator server callback for coordinator server {}", coordinatorServerId); + + coordinatorContext.addLiveCoordinatorServer(coordinatorServerId); + } + + private void processDeadCoordinatorServer( + DeadCoordinatorServerEvent deadCoordinatorServerEvent) { + int coordinatorServerId = deadCoordinatorServerEvent.getServerId(); + if (!coordinatorContext.getLiveCoordinatorServers().contains(coordinatorServerId)) { + return; + } + // process dead coordinator server + LOG.info("Coordinator server failure callback for {}.", coordinatorServerId); + 
coordinatorContext.removeLiveCoordinatorServer(coordinatorServerId); + } + private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) { // NOTE: we won't need to detect bounced tablet servers like Kafka as we won't // miss the event of tablet server un-register and register again since we can @@ -890,7 +932,7 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) { // it may happen during coordinator server initiation, the watcher watch a new tablet // server register event and put it to event manager, but after that, the coordinator // server read - // all tablet server nodes registered which contain the tablet server a; in this case, + // all tablet server nodes registered which contain the tablet server; in this case, // we can ignore it. return; } @@ -1139,7 +1181,8 @@ private List tryProcessAdjustIsr( } try { - zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList); + zooKeeperClient.batchUpdateLeaderAndIsr( + newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion()); newLeaderAndIsrList.forEach( (tableBucket, newLeaderAndIsr) -> result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr))); @@ -1150,7 +1193,10 @@ private List tryProcessAdjustIsr( TableBucket tableBucket = entry.getKey(); LeaderAndIsr newLeaderAndIsr = entry.getValue(); try { - zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr); + zooKeeperClient.updateLeaderAndIsr( + tableBucket, + newLeaderAndIsr, + coordinatorContext.getCoordinatorEpochZkVersion()); } catch (Exception e) { LOG.error("Error when register leader and isr.", e); result.add( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java new file mode 100644 index 0000000000..ca78f6aa8f --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java @@ 
-0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.exception.CoordinatorEpochFencedException; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Used by the coordinator server. Coordinator servers use a ZK node to elect a leader. 
*/ +public class CoordinatorLeaderElection implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class); + + private final int serverId; + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final LeaderLatch leaderLatch; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + + public CoordinatorLeaderElection( + ZooKeeperClient zkClient, int serverId, CoordinatorContext coordinatorContext) { + this.serverId = serverId; + this.zkClient = zkClient; + this.coordinatorContext = coordinatorContext; + this.leaderLatch = + new LeaderLatch( + zkClient.getCuratorClient(), + ZkData.CoordinatorElectionZNode.path(), + String.valueOf(serverId)); + } + + /** + * Registers the leadership listener on the latch and starts the leader election. When this + * server is elected, the listener fences the leadership through ZooKeeper, stores the returned + * coordinator epoch in the context and then runs {@code initLeaderServices}. + * + * @param initLeaderServices callback that starts the leader-only services + */ + public void startElectLeader(Runnable initLeaderServices) { + leaderLatch.addListener( + new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("Coordinator server {} has become the leader.", serverId); + isLeader.set(true); + try { + // to avoid split-brain + Optional optionalEpoch = + zkClient.fenceBecomeCoordinatorLeader(serverId); + if (optionalEpoch.isPresent()) { + coordinatorContext.setCoordinatorEpochAndZkVersion( + optionalEpoch.get(), + coordinatorContext.getCoordinatorEpochZkVersion() + 1); + initLeaderServices.run(); + } else { + throw new CoordinatorEpochFencedException( + "Fenced to become coordinator leader."); + } + } catch (Exception e) { + relinquishLeadership(); + // keep the original failure as the cause so the reason for fencing is not lost + CoordinatorEpochFencedException fenced = + new CoordinatorEpochFencedException( + "Fenced to become coordinator leader."); + fenced.initCause(e); + throw fenced; + } + } + + @Override + public void notLeader() { + relinquishLeadership(); + LOG.warn("Coordinator server {} has lost the leadership.", serverId); + isLeader.set(false); + } + }); + + try { + leaderLatch.start(); + LOG.info("Coordinator server {} started leader election.", serverId); + + // todo: Currently, we await the leader latch and do nothing until it becomes leader. 
+ // Later we can make it as a hot backup server to continuously synchronize metadata from + // Zookeeper, which save time from initializing context + // leaderLatch.await(); + + } catch (Exception e) { + LOG.error("Failed to start LeaderLatch for server {}", serverId, e); + throw new RuntimeException("Leader election start failed", e); + } + } + + @Override + public void close() { + LOG.info("Closing LeaderLatch for server {}.", serverId); + if (leaderLatch != null) { + try { + leaderLatch.close(); + } catch (Exception e) { + LOG.error("Failed to close LeaderLatch for server {}.", serverId, e); + } + } + } + + public boolean isLeader() { + return this.isLeader.get(); + } + + private void relinquishLeadership() { + isLeader.set(false); + LOG.info("Coordinator server {} has been fenced.", serverId); + + this.close(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 35527805c0..7a26a866bd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -674,6 +674,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { // tablet servers. 
return makeUpdateMetadataRequest( coordinatorContext.getCoordinatorServerInfo(), + coordinatorContext.getCoordinatorEpoch(), new HashSet<>(coordinatorContext.getLiveTabletServers().values()), tableMetadataList, partitionMetadataList); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index cd546f4479..f3545330c9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -55,7 +55,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.UUID; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -86,7 +86,7 @@ public class CoordinatorServer extends ServerBase { private final AtomicBoolean isShutDown = new AtomicBoolean(false); @GuardedBy("lock") - private String serverId; + private int serverId; @GuardedBy("lock") private MetricRegistry metricRegistry; @@ -140,10 +140,14 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private LakeCatalogDynamicLoader lakeCatalogDynamicLoader; + @GuardedBy("lock") + private CoordinatorLeaderElection coordinatorLeaderElection; + public CoordinatorServer(Configuration conf) { super(conf); validateConfigs(conf); this.terminationFuture = new CompletableFuture<>(); + this.serverId = conf.getInt(ConfigOptions.COORDINATOR_ID); } public static void main(String[] args) { @@ -155,10 +159,38 @@ public static void main(String[] args) { @Override protected void startServices() throws Exception { + this.coordinatorContext = new CoordinatorContext(); + electCoordinatorLeader(); + } + + private void electCoordinatorLeader() throws Exception { + this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, 
this); + + // Coordinator Server supports high availability. If 3 coordinator servers are alive, + // one of them will be elected as leader and the other two will be standby. + // When leader fails, one of standby coordinators will be elected as new leader. + registerCoordinatorServer(); + ZooKeeperUtils.registerZookeeperClientReInitSessionListener( + zkClient, this::registerCoordinatorServer, this); + + // standby + this.coordinatorLeaderElection = + new CoordinatorLeaderElection(zkClient, serverId, coordinatorContext); + coordinatorLeaderElection.startElectLeader( + () -> { + try { + startCoordinatorLeaderService(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + protected void startCoordinatorLeaderService() throws Exception { + synchronized (lock) { LOG.info("Initializing Coordinator services."); List endpoints = Endpoint.loadBindEndpoints(conf, ServerType.COORDINATOR); - this.serverId = UUID.randomUUID().toString(); // for metrics this.metricRegistry = MetricRegistry.create(conf, pluginManager); @@ -169,8 +201,6 @@ protected void startServices() throws Exception { endpoints.get(0).getHost(), serverId); - this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); - this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true); this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true); @@ -179,7 +209,6 @@ protected void startServices() throws Exception { dynamicConfigManager.startup(); - this.coordinatorContext = new CoordinatorContext(); this.metadataCache = new CoordinatorMetadataCache(); this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager); @@ -219,11 +248,6 @@ protected void startServices() throws Exception { serverMetricGroup)); rpcServer.start(); - registerCoordinatorLeader(); - // when init session, register coordinator server again - ZooKeeperUtils.registerZookeeperClientReInitSessionListener( - zkClient, this::registerCoordinatorLeader, this); - 
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME); this.rpcClient = RpcClient.create(conf, clientMetricGroup, true); @@ -233,6 +257,12 @@ protected void startServices() throws Exception { new AutoPartitionManager(metadataCache, metadataManager, conf); autoPartitionManager.start(); + int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE); + this.ioExecutor = + Executors.newFixedThreadPool( + ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io")); + + registerCoordinatorLeader(); // start coordinator event processor after we register coordinator leader to zk // so that the event processor can get the coordinator leader node from zk during start // up. @@ -275,6 +305,41 @@ protected CompletableFuture closeAsync(Result result) { return terminationFuture; } + private void registerCoordinatorServer() throws Exception { + long startTime = System.currentTimeMillis(); + + // we need to retry to register since although + // zkClient reconnect, the ephemeral node may still exist + // for a while time, retry to wait the ephemeral node removed + // see ZOOKEEPER-2985 + while (true) { + try { + zkClient.registerCoordinatorServer(this.serverId); + break; + } catch (KeeperException.NodeExistsException nodeExistsException) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) { + LOG.error( + "Coordinator Server register to Zookeeper exceeded total retry time of {} ms. " + + "Aborting registration attempts.", + ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS); + throw nodeExistsException; + } + + LOG.warn( + "Coordinator server already registered in Zookeeper. 
" + "retrying register after {} ms....", + ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); + try { + Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + private void registerCoordinatorLeader() throws Exception { long startTime = System.currentTimeMillis(); List bindEndpoints = rpcServer.getBindEndpoints(); @@ -447,6 +512,14 @@ CompletableFuture stopServices() { exception = ExceptionUtils.firstOrSuppressed(t, exception); } + try { + if (coordinatorLeaderElection != null) { + coordinatorLeaderElection.close(); + } + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { if (zkClient != null) { zkClient.close(); @@ -494,6 +567,11 @@ public RpcServer getRpcServer() { return rpcServer; } + @VisibleForTesting + public int getServerId() { + return serverId; + } + @VisibleForTesting public ServerMetadataCache getMetadataCache() { return metadataCache; @@ -509,6 +587,21 @@ public DynamicConfigManager getDynamicConfigManager() { } private static void validateConfigs(Configuration conf) { + Optional serverId = conf.getOptional(ConfigOptions.COORDINATOR_ID); + if (!serverId.isPresent()) { + throw new IllegalConfigurationException( + String.format( + "Configuration %s must be set.", + ConfigOptions.COORDINATOR_ID.key())); + } + + if (serverId.get() < 0) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be greater than or equal 0.", + ConfigOptions.COORDINATOR_ID.key())); + } + if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) { throw new IllegalConfigurationException( String.format( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 9d5dc34790..272c730f1a 100644 --- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -51,6 +51,7 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import org.apache.fluss.utils.function.RunnableWithException; import org.apache.fluss.utils.function.ThrowingRunnable; @@ -249,7 +250,9 @@ public void completeDeleteTable(long tableId) { // delete bucket assignments node, which will also delete the bucket state node, // so that all the zk nodes related to this table are deleted. rethrowIfIsNotNoNodeException( - () -> zookeeperClient.deleteTableAssignment(tableId), + () -> + zookeeperClient.deleteTableAssignment( + tableId, ZkVersion.MATCH_ANY_VERSION.getVersion()), String.format("Delete tablet assignment meta fail for table %s.", tableId)); } @@ -258,7 +261,9 @@ public void completeDeletePartition(long partitionId) { // delete partition assignments node, which will also delete the bucket state node, // so that all the zk nodes related to this partition are deleted. 
rethrowIfIsNotNoNodeException( - () -> zookeeperClient.deletePartitionAssignment(partitionId), + () -> + zookeeperClient.deletePartitionAssignment( + partitionId, ZkVersion.MATCH_ANY_VERSION.getVersion()), String.format("Delete tablet assignment meta fail for partition %s.", partitionId)); } @@ -311,7 +316,8 @@ public long createTable( long tableId = zookeeperClient.getTableIdAndIncrement(); if (tableAssignment != null) { // register table assignment - zookeeperClient.registerTableAssignment(tableId, tableAssignment); + zookeeperClient.registerTableAssignment( + tableId, tableAssignment, ZkVersion.MATCH_ANY_VERSION.getVersion()); } // register the table zookeeperClient.registerTable( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java index c32b713593..5f4fc51f5e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java @@ -62,6 +62,7 @@ public final class CoordinatorEventManager implements EventManager { private Histogram eventQueueTime; // Coordinator metrics moved from CoordinatorEventProcessor + private volatile int aliveCoordinatorServerCount; private volatile int tabletServerCount; private volatile int offlineBucketCount; private volatile int tableCount; @@ -87,6 +88,8 @@ private void registerMetrics() { // Register coordinator metrics coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () -> 1); + coordinatorMetricGroup.gauge( + MetricNames.ALIVE_COORDINATOR_COUNT, () -> aliveCoordinatorServerCount); coordinatorMetricGroup.gauge( MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> tabletServerCount); coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount); @@ -103,6 +106,7 @@ private void 
updateMetricsViaAccessContext() { AccessContextEvent accessContextEvent = new AccessContextEvent<>( context -> { + int coordinatorServerCount = context.getLiveCoordinatorServers().size(); int tabletServerCount = context.getLiveTabletServers().size(); int tableCount = context.allTables().size(); int bucketCount = context.bucketLeaderAndIsr().size(); @@ -135,6 +139,7 @@ private void updateMetricsViaAccessContext() { } return new MetricsData( + coordinatorServerCount, tabletServerCount, tableCount, bucketCount, @@ -148,6 +153,7 @@ private void updateMetricsViaAccessContext() { // Wait for the result and update local metrics try { MetricsData metricsData = accessContextEvent.getResultFuture().get(); + this.aliveCoordinatorServerCount = metricsData.coordinatorServerCount; this.tabletServerCount = metricsData.tabletServerCount; this.tableCount = metricsData.tableCount; this.bucketCount = metricsData.bucketCount; @@ -270,6 +276,7 @@ public QueuedEvent(CoordinatorEvent event, long enqueueTimeMs) { } private static class MetricsData { + private final int coordinatorServerCount; private final int tabletServerCount; private final int tableCount; private final int bucketCount; @@ -278,12 +285,14 @@ private static class MetricsData { private final int replicasToDeleteCount; public MetricsData( + int coordinatorServerCount, int tabletServerCount, int tableCount, int bucketCount, int partitionCount, int offlineBucketCount, int replicasToDeleteCount) { + this.coordinatorServerCount = coordinatorServerCount; this.tabletServerCount = tabletServerCount; this.tableCount = tableCount; this.bucketCount = bucketCount; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorServerEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorServerEvent.java new file mode 100644 index 0000000000..2ffc0110cc --- /dev/null +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorServerEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import java.util.Objects; + +/** An event for coordinator server became dead. 
*/ +public class DeadCoordinatorServerEvent implements CoordinatorEvent { + + private final int serverId; + + public DeadCoordinatorServerEvent(int serverId) { + this.serverId = serverId; + } + + public int getServerId() { + return serverId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeadCoordinatorServerEvent that = (DeadCoordinatorServerEvent) o; + return serverId == that.serverId; + } + + @Override + public int hashCode() { + return Objects.hash(serverId); + } + + @Override + public String toString() { + return "DeadCoordinatorServerEvent{" + "serverId=" + serverId + '}'; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorServerEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorServerEvent.java new file mode 100644 index 0000000000..fd53bc412d --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorServerEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.event; + +import java.util.Objects; + +/** An event for new coordinator server. */ +public class NewCoordinatorServerEvent implements CoordinatorEvent { + + private final int serverId; + + public NewCoordinatorServerEvent(int serverId) { + this.serverId = serverId; + } + + public int getServerId() { + return serverId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NewCoordinatorServerEvent that = (NewCoordinatorServerEvent) o; + return serverId == that.serverId; + } + + @Override + public int hashCode() { + return Objects.hash(serverId); + } + + @Override + public String toString() { + return "NewCoordinatorServerEvent{" + "serverId=" + serverId + '}'; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java new file mode 100644 index 0000000000..f0008d5186 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcher.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorServerEvent; +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.NewCoordinatorServerEvent; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A watcher for coordinator server changes (added/removed) in ZooKeeper. 
*/ +public class CoordinatorServerChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServerChangeWatcher.class); + private final CuratorCache curatorCache; + + private volatile boolean running; + + private final EventManager eventManager; + + public CoordinatorServerChangeWatcher( + ZooKeeperClient zooKeeperClient, EventManager eventManager) { + this.curatorCache = + CuratorCache.build( + zooKeeperClient.getCuratorClient(), ZkData.CoordinatorIdsZNode.path()); + this.eventManager = eventManager; + this.curatorCache.listenable().addListener(new CoordinatorServerChangeListener()); + } + + public void start() { + running = true; + curatorCache.start(); + } + + public void stop() { + if (!running) { + return; + } + running = false; + LOG.info("Stopping CoordinatorServerChangeWatcher"); + curatorCache.close(); + } + + private final class CoordinatorServerChangeListener implements CuratorCacheListener { + + @Override + public void event(Type type, ChildData oldData, ChildData newData) { + if (newData != null) { + LOG.debug("Received {} event (path: {})", type, newData.getPath()); + } else { + LOG.debug("Received {} event", type); + } + + switch (type) { + case NODE_CREATED: + { + if (newData != null && newData.getData().length > 0) { + int serverId = getServerIdFromEvent(newData); + LOG.info("Received CHILD_ADDED event for server {}.", serverId); + eventManager.put(new NewCoordinatorServerEvent(serverId)); + } + break; + } + case NODE_DELETED: + { + if (oldData != null && oldData.getData().length > 0) { + int serverId = getServerIdFromEvent(oldData); + LOG.info("Received CHILD_REMOVED event for server {}.", serverId); + eventManager.put(new DeadCoordinatorServerEvent(serverId)); + } + break; + } + default: + break; + } + } + } + + private int getServerIdFromEvent(ChildData data) { + try { + return Integer.parseInt(ZKPaths.getNodeFromPath(data.getPath())); + } catch (NumberFormatException e) { + throw new FlussRuntimeException( + 
"Invalid server id in zookeeper path: " + data.getPath(), e); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java index a8f4dcb153..0bee350eff 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java @@ -62,7 +62,7 @@ public void stop() { return; } running = false; - LOG.info("Stopping TableChangeWatcher"); + LOG.info("Stopping TabletServerChangeWatcher"); curatorCache.close(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java index 7494ab203e..f127ad727e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java @@ -467,7 +467,8 @@ private Map doRemoveReplicaFromIsr( toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr); } try { - zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList); + zooKeeperClient.batchUpdateLeaderAndIsr( + toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion()); toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr); return adjustedLeaderAndIsr; } catch (Exception e) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java index 224ba5db87..f8939998eb 100644 --- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java @@ -314,7 +314,10 @@ private Optional initLeaderForTableBuckets( ElectionResult electionResult = optionalElectionResult.get(); LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr; try { - zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zooKeeperClient.registerLeaderAndIsr( + tableBucket, + leaderAndIsr, + coordinatorContext.getCoordinatorEpochZkVersion()); } catch (Exception e) { LOG.error( "Fail to create state node for table bucket {} in zookeeper.", @@ -382,7 +385,8 @@ public void batchHandleOnlineChangeAndInitLeader(Set tableBuckets) if (!tableBucketLeadAndIsrInfos.isEmpty()) { try { zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition( - tableBucketLeadAndIsrInfos); + tableBucketLeadAndIsrInfos, + coordinatorContext.getCoordinatorEpochZkVersion()); registerSuccessList.addAll(tableBucketLeadAndIsrInfos); } catch (Exception e) { LOG.error( @@ -457,7 +461,10 @@ private List tryRegisterLeaderAndIsrOneByOne( List registerSuccessList = new ArrayList<>(); for (RegisterTableBucketLeadAndIsrInfo info : registerList) { try { - zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr()); + zooKeeperClient.registerLeaderAndIsr( + info.getTableBucket(), + info.getLeaderAndIsr(), + coordinatorContext.getCoordinatorEpochZkVersion()); registerSuccessList.add(info); } catch (Exception e) { LOG.error( @@ -499,7 +506,10 @@ private Optional electNewLeaderForTableBuckets( } ElectionResult electionResult = optionalElectionResult.get(); try { - zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr); + zooKeeperClient.updateLeaderAndIsr( + tableBucket, + electionResult.leaderAndIsr, + coordinatorContext.getCoordinatorEpochZkVersion()); } catch (Exception e) { LOG.error( "Fail to update bucket 
LeaderAndIsr for table bucket {}.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java index 770a900014..2f013a7312 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/ServerMetricUtils.java @@ -72,7 +72,7 @@ public class ServerMetricUtils { @VisibleForTesting static final String METRIC_GROUP_MEMORY = "memory"; public static CoordinatorMetricGroup createCoordinatorGroup( - MetricRegistry registry, String clusterId, String hostname, String serverId) { + MetricRegistry registry, String clusterId, String hostname, int serverId) { CoordinatorMetricGroup coordinatorMetricGroup = new CoordinatorMetricGroup(registry, clusterId, hostname, serverId); createAndInitializeStatusMetricGroup(coordinatorMetricGroup); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java index 4045fbae78..5425c16788 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java @@ -47,13 +47,13 @@ public class CoordinatorMetricGroup extends AbstractMetricGroup { protected final String clusterId; protected final String hostname; - protected final String serverId; + protected final int serverId; private final Map, CoordinatorEventMetricGroup> eventMetricGroups = MapUtils.newConcurrentHashMap(); public CoordinatorMetricGroup( - MetricRegistry registry, String clusterId, String hostname, String serverId) { + MetricRegistry registry, String clusterId, String hostname, int serverId) { super(registry, new String[] {clusterId, hostname, NAME}, null); this.clusterId = clusterId; 
this.hostname = hostname; @@ -69,7 +69,7 @@ protected String getGroupName(CharacterFilter filter) { protected final void putVariables(Map variables) { variables.put("cluster_id", clusterId); variables.put("host", hostname); - variables.put("server_id", serverId); + variables.put("server_id", String.valueOf(serverId)); } public CoordinatorEventMetricGroup getOrAddEventTypeMetricGroup( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 8ee46545b0..f459ee6ed8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -399,6 +399,7 @@ public static MetadataResponse buildMetadataResponse( public static UpdateMetadataRequest makeUpdateMetadataRequest( @Nullable ServerInfo coordinatorServer, + @Nullable Integer coordinatorEpoch, Set aliveTableServers, List tableMetadataList, List partitionMetadataList) { @@ -441,6 +442,9 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest( updateMetadataRequest.addAllTableMetadatas(pbTableMetadataList); updateMetadataRequest.addAllPartitionMetadatas(pbPartitionMetadataList); + if (coordinatorEpoch != null) { + updateMetadataRequest.setCoordinatorEpoch(coordinatorEpoch); + } return updateMetadataRequest; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java new file mode 100644 index 0000000000..b198f30f08 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk; + +/** Class for coordinator epoch and coordinator epoch zk version. */ +public class ZkEpoch { + private final int coordinatorEpoch; + private final int coordinatorEpochZkVersion; + + public ZkEpoch(int coordinatorEpoch, int coordinatorEpochZkVersion) { + this.coordinatorEpoch = coordinatorEpoch; + this.coordinatorEpochZkVersion = coordinatorEpochZkVersion; + } + + public int getCoordinatorEpoch() { + return coordinatorEpoch; + } + + public int getCoordinatorEpochZkVersion() { + return coordinatorEpochZkVersion; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 40bffe8171..7e667567d0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -31,6 +31,7 @@ import org.apache.fluss.security.acl.Resource; import org.apache.fluss.security.acl.ResourceType; import org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls; +import org.apache.fluss.server.coordinator.CoordinatorContext; import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo; import org.apache.fluss.server.metadata.BucketMetadata; import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetChildrenRequest; @@ -56,7 +57,6 @@ import 
org.apache.fluss.server.zk.data.ZkData.BucketSnapshotIdZNode; import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotsZNode; import org.apache.fluss.server.zk.data.ZkData.ConfigEntityZNode; -import org.apache.fluss.server.zk.data.ZkData.CoordinatorZNode; import org.apache.fluss.server.zk.data.ZkData.DatabaseZNode; import org.apache.fluss.server.zk.data.ZkData.DatabasesZNode; import org.apache.fluss.server.zk.data.ZkData.LakeTableZNode; @@ -77,6 +77,7 @@ import org.apache.fluss.server.zk.data.ZkData.TableZNode; import org.apache.fluss.server.zk.data.ZkData.TablesZNode; import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; @@ -116,11 +117,19 @@ import static java.util.stream.Collectors.toMap; import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName; +import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** * This class includes methods for write/read various metadata (leader address, tablet server * registration, table assignment, table, schema) in Zookeeper. + * + *

In some methods, 'expectedZkVersion' is used to execute an epoch Zookeeper version check. We + * use the following principles to judge whether it's necessary to execute the epoch Zookeeper + * version check. If all conditions are met, we need to execute the check. 1. The method + * creates/modifies/deletes a Zk node. 2. It's executed by the coordinator server. 3. It is about + * metadata (table/partition/leaderAndIsr) rather than server info or ACL info. 4. The Zk node is + * persistent rather than ephemeral. */ @Internal public class ZooKeeperClient implements AutoCloseable { @@ -133,6 +142,7 @@ public class ZooKeeperClient implements AutoCloseable { private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private final CuratorFramework zkClient; + private final ZooKeeperOp zkOp; private final ZkSequenceIDCounter tableIdCounter; private final ZkSequenceIDCounter partitionIdCounter; private final ZkSequenceIDCounter writerIdCounter; @@ -145,6 +155,7 @@ public ZooKeeperClient( Configuration configuration) { this.curatorFrameworkWrapper = curatorFrameworkWrapper; this.zkClient = curatorFrameworkWrapper.asCuratorFramework(); + this.zkOp = new ZooKeeperOp(zkClient); this.tableIdCounter = new ZkSequenceIDCounter(zkClient, TableSequenceIdZNode.path()); this.partitionIdCounter = new ZkSequenceIDCounter(zkClient, PartitionSequenceIdZNode.path()); @@ -168,20 +179,104 @@ public Optional getOrEmpty(String path) throws Exception { // Coordinator server // -------------------------------------------------------------------------------------------- - /** Register a coordinator leader server to ZK. */ + /** Register a coordinator server to ZK. 
*/ + public void registerCoordinatorServer(int coordinatorId) throws Exception { + String path = ZkData.CoordinatorIdZNode.path(coordinatorId); + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); + LOG.info("Registered Coordinator server {} at path {}.", coordinatorId, path); + } + + /** + * Become coordinator leader. This method is a step after electCoordinatorLeader() and before + * registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator + * epoch and coordinator epoch zk version. + */ + public Optional fenceBecomeCoordinatorLeader(int coordinatorId) throws Exception { + try { + ensureEpochZnodeExists(); + + try { + ZkEpoch getEpoch = getCurrentEpoch(); + int currentEpoch = getEpoch.getCoordinatorEpoch(); + int currentVersion = getEpoch.getCoordinatorEpochZkVersion(); + int newEpoch = currentEpoch + 1; + LOG.info( + "Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}", + coordinatorId, + currentEpoch, + currentVersion, + newEpoch); + + // atomically update epoch + zkClient.setData() + .withVersion(currentVersion) + .forPath( + ZkData.CoordinatorEpochZNode.path(), + ZkData.CoordinatorEpochZNode.encode(newEpoch)); + + return Optional.of(newEpoch); + } catch (KeeperException.BadVersionException e) { + // Other coordinator leader has updated epoch. + // If this happens, it means our fence is in effect. + LOG.info("Coordinator leader {} failed to update epoch.", coordinatorId); + } + } catch (KeeperException.NodeExistsException e) { + } + return Optional.empty(); + } + + /** Register a coordinator leader to ZK. 
*/ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) throws Exception { - String path = CoordinatorZNode.path(); + String path = ZkData.CoordinatorLeaderZNode.path(); zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) - .forPath(path, CoordinatorZNode.encode(coordinatorAddress)); - LOG.info("Registered leader {} at path {}.", coordinatorAddress, path); + .forPath(path, ZkData.CoordinatorLeaderZNode.encode(coordinatorAddress)); + LOG.info("Registered Coordinator leader {} at path {}.", coordinatorAddress, path); } /** Get the leader address registered in ZK. */ - public Optional getCoordinatorAddress() throws Exception { - Optional bytes = getOrEmpty(CoordinatorZNode.path()); - return bytes.map(CoordinatorZNode::decode); + public Optional getCoordinatorLeaderAddress() throws Exception { + Optional bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path()); + return bytes.map( + data -> + // maybe an empty node when a leader is elected but not registered + data.length == 0 ? null : ZkData.CoordinatorLeaderZNode.decode(data)); + } + + /** Gets the list of coordinator server Ids. */ + public int[] getCoordinatorServerList() throws Exception { + List coordinatorServers = getChildren(ZkData.CoordinatorIdsZNode.path()); + return coordinatorServers.stream().mapToInt(Integer::parseInt).toArray(); + } + + /** Ensure epoch znode exists. */ + public void ensureEpochZnodeExists() throws Exception { + String path = ZkData.CoordinatorEpochZNode.path(); + if (zkClient.checkExists().forPath(path) == null) { + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath( + path, + ZkData.CoordinatorEpochZNode.encode( + CoordinatorContext.INITIAL_COORDINATOR_EPOCH - 1)); + } catch (KeeperException.NodeExistsException e) { + } + } + } + + /** Get epoch now in ZK. 
*/ + public ZkEpoch getCurrentEpoch() throws Exception { + Stat currentStat = new Stat(); + byte[] bytes = + zkClient.getData() + .storingStatIn(currentStat) + .forPath(ZkData.CoordinatorEpochZNode.path()); + int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes); + int currentVersion = currentStat.getVersion(); + return new ZkEpoch(currentEpoch, currentVersion); } // -------------------------------------------------------------------------------------------- @@ -237,13 +332,13 @@ public int[] getSortedTabletServerList() throws Exception { // -------------------------------------------------------------------------------------------- /** Register table assignment to ZK. */ - public void registerTableAssignment(long tableId, TableAssignment tableAssignment) - throws Exception { + public void registerTableAssignment( + long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception { String path = TableIdZNode.path(tableId); - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, TableIdZNode.encode(tableAssignment)); + byte[] data = TableIdZNode.encode(tableAssignment); + + createRecursiveWithEpochCheck(path, data, expectedZkVersion, false); + LOG.info("Registered table assignment {} for table id {}.", tableAssignment, tableId); } @@ -292,22 +387,27 @@ public Map getPartitionsAssignments(Collection "partition assignment"); } - public void updateTableAssignment(long tableId, TableAssignment tableAssignment) - throws Exception { + public void updateTableAssignment( + long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception { String path = TableIdZNode.path(tableId); - zkClient.setData().forPath(path, TableIdZNode.encode(tableAssignment)); + byte[] data = TableIdZNode.encode(tableAssignment); + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); 
LOG.info("Updated table assignment {} for table id {}.", tableAssignment, tableId); } - public void deleteTableAssignment(long tableId) throws Exception { + public void deleteTableAssignment(long tableId, int expectedZkVersion) throws Exception { String path = TableIdZNode.path(tableId); - zkClient.delete().deletingChildrenIfNeeded().forPath(path); + deleteRecursiveWithEpochCheck(path, expectedZkVersion, false); LOG.info("Deleted table assignment for table id {}.", tableId); } - public void deletePartitionAssignment(long partitionId) throws Exception { + public void deletePartitionAssignment(long partitionId, int expectedZkVersion) + throws Exception { String path = PartitionIdZNode.path(partitionId); - zkClient.delete().deletingChildrenIfNeeded().forPath(path); + deleteRecursiveWithEpochCheck(path, expectedZkVersion, false); LOG.info("Deleted table assignment for partition id {}.", partitionId); } @@ -316,18 +416,20 @@ public void deletePartitionAssignment(long partitionId) throws Exception { // -------------------------------------------------------------------------------------------- /** Register bucket LeaderAndIsr to ZK. 
*/ - public void registerLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr) + public void registerLeaderAndIsr( + TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion) throws Exception { + String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr)); + byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); + + createRecursiveWithEpochCheck(path, data, expectedZkVersion, false); LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); } public void batchRegisterLeaderAndIsrForTablePartition( - List registerList) throws Exception { + List registerList, int expectedZkVersion) + throws Exception { if (registerList.isEmpty()) { return; } @@ -363,12 +465,14 @@ public void batchRegisterLeaderAndIsrForTablePartition( ops.add(parentNodeCreate); ops.add(currentNodeCreate); if (ops.size() == MAX_BATCH_SIZE) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); ops.clear(); } } if (!ops.isEmpty()) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); } LOG.info( "Batch registered leadAndIsr for tableId: {}, partitionId: {}, partitionName: {} in Zookeeper.", @@ -400,14 +504,21 @@ public Map getLeaderAndIsrs(Collection t "leader and isr"); } - public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr) + public void updateLeaderAndIsr( + TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion) throws Exception { String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr)); + byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); + + CuratorOp updateOp = 
zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); } - public void batchUpdateLeaderAndIsr(Map leaderAndIsrList) + public void batchUpdateLeaderAndIsr( + Map leaderAndIsrList, int expectedZkVersion) throws Exception { if (leaderAndIsrList.isEmpty()) { return; @@ -417,25 +528,27 @@ public void batchUpdateLeaderAndIsr(Map leaderAndIsrL for (Map.Entry entry : leaderAndIsrList.entrySet()) { TableBucket tableBucket = entry.getKey(); LeaderAndIsr leaderAndIsr = entry.getValue(); - LOG.info("Batch Update {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); String path = LeaderAndIsrZNode.path(tableBucket); byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data); ops.add(updateOp); if (ops.size() == MAX_BATCH_SIZE) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); ops.clear(); } } if (!ops.isEmpty()) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); } } - public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception { + public void deleteLeaderAndIsr(TableBucket tableBucket, int expectedZkVersion) + throws Exception { String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.delete().forPath(path); + deleteRecursiveWithEpochCheck(path, expectedZkVersion, false); LOG.info("Deleted LeaderAndIsr for bucket {} in Zookeeper.", tableBucket); } @@ -1573,4 +1686,103 @@ public static Map> processGetChildrenResponses( } return result; } + + /** + * create a node (recursively if parent path not exists) with Zk epoch version check. 
+ * + * @param path the path to create + * @param data the data to write + * @param throwIfPathExists whether to throw exception if path exist + * @throws Exception if any error occurs + */ + public void createRecursiveWithEpochCheck( + String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists) + throws Exception { + CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT); + List ops = wrapRequestWithEpochCheck(createOp, expectedZkVersion); + + try { + // try to directly create + zkClient.transaction().forOperations(ops); + } catch (KeeperException.NodeExistsException e) { + // should not exist + if (throwIfPathExists) { + throw e; + } + } catch (KeeperException.NoNodeException e) { + // if parent does not exist, create parent first + int indexOfLastSlash = path.lastIndexOf("/"); + if (indexOfLastSlash == -1) { + throw new IllegalArgumentException("Invalid path: " + path); + } else if (indexOfLastSlash == 0) { + // root path can be directly create without fence + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + } catch (KeeperException.NodeExistsException ignored) { + } + } else { + // indexOfLastSlash > 0 + String parentPath = path.substring(0, indexOfLastSlash); + createRecursiveWithEpochCheck( + parentPath, null, expectedZkVersion, throwIfPathExists); + // After creating parent (or if parent is root), retry creating the original path + zkClient.transaction().forOperations(ops); + } + } + } + + /** + * Delete a node (and recursively delete children) with Zk epoch version check. 
+ * + * @param path the path to delete + * @param expectedZkVersion the expected coordinator epoch zk version + * @param throwIfPathNotExists whether to throw exception if path does not exist + * @throws Exception if any error occurs + */ + public void deleteRecursiveWithEpochCheck( + String path, int expectedZkVersion, boolean throwIfPathNotExists) throws Exception { + // delete children recursively + List children = getChildren(path); + for (String child : children) { + deleteRecursiveWithEpochCheck(path + "/" + child, expectedZkVersion, false); + } + + CuratorOp deleteOp = zkOp.deleteOp(path); + List ops = wrapRequestWithEpochCheck(deleteOp, expectedZkVersion); + + try { + // delete itself + zkClient.transaction().forOperations(ops); + } catch (KeeperException.NoNodeException e) { + // should exist + if (throwIfPathNotExists) { + throw e; + } + } + } + + public List wrapRequestWithEpochCheck(CuratorOp request, int expectedZkVersion) + throws Exception { + return wrapRequestsWithEpochCheck(Collections.singletonList(request), expectedZkVersion); + } + + public List wrapRequestsWithEpochCheck( + List requestList, int expectedZkVersion) throws Exception { + if (ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) { + return requestList; + } else if (expectedZkVersion >= 0) { + CuratorOp checkOp = + zkOp.checkOp(ZkData.CoordinatorEpochZNode.path(), expectedZkVersion); + return multiRequest(checkOp, requestList); + } else { + throw new IllegalArgumentException( + "Expected coordinator epoch zkVersion " + + expectedZkVersion + + " should be non-negative or equal to " + + ZkVersion.MATCH_ANY_VERSION.getVersion()); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java new file mode 100644 index 0000000000..e44670d643 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java @@ -0,0 +1,64 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk; + +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode; + +import java.util.ArrayList; +import java.util.List; + +/** This class contains some utility methods for wrap/unwrap operations for Zookeeper. 
*/ +public class ZooKeeperOp { + private final CuratorFramework zkClient; + + public ZooKeeperOp(CuratorFramework zkClient) { + this.zkClient = zkClient; + } + + public CuratorOp checkOp(String path, int expectZkVersion) throws Exception { + return zkClient.transactionOp().check().withVersion(expectZkVersion).forPath(path); + } + + public CuratorOp createOp(String path, byte[] data, CreateMode createMode) throws Exception { + return zkClient.transactionOp().create().withMode(createMode).forPath(path, data); + } + + public CuratorOp updateOp(String path, byte[] data) throws Exception { + return zkClient.transactionOp().setData().forPath(path, data); + } + + public CuratorOp deleteOp(String path) throws Exception { + return zkClient.transactionOp().delete().forPath(path); + } + + public static List multiRequest(CuratorOp op1, CuratorOp op2) { + List ops = new ArrayList<>(); + ops.add(op1); + ops.add(op2); + return ops; + } + + public static List multiRequest(CuratorOp op, List ops) { + List result = new ArrayList<>(); + result.add(op); + result.addAll(ops); + return result; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java index 5953f33666..41dc6ec2ad 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java @@ -23,20 +23,20 @@ import java.util.Objects; /** - * The address information of an active coordinator stored in {@link ZkData.CoordinatorZNode}. + * The address information of an active coordinator stored in {@link ZkData.CoordinatorLeaderZNode}. * * @see CoordinatorAddressJsonSerde for json serialization and deserialization. 
*/ public class CoordinatorAddress { - private final String id; + private final int id; private final List endpoints; - public CoordinatorAddress(String id, List endpoints) { + public CoordinatorAddress(int id, List endpoints) { this.id = id; this.endpoints = endpoints; } - public String getId() { + public int getId() { return id; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java index 9b577df5a2..c2863fe610 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java @@ -51,7 +51,7 @@ public void serialize(CoordinatorAddress coordinatorAddress, JsonGenerator gener throws IOException { generator.writeStartObject(); writeVersion(generator); - generator.writeStringField(ID, coordinatorAddress.getId()); + generator.writeNumberField(ID, coordinatorAddress.getId()); generator.writeStringField( LISTENERS, Endpoint.toListenersString(coordinatorAddress.getEndpoints())); generator.writeEndObject(); @@ -60,7 +60,7 @@ public void serialize(CoordinatorAddress coordinatorAddress, JsonGenerator gener @Override public CoordinatorAddress deserialize(JsonNode node) { int version = node.get(VERSION_KEY).asInt(); - String id = node.get(ID).asText(); + int id = node.get(ID).asInt(); List endpoints; if (version == 1) { String host = node.get(HOST).asText(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 4798623a74..e6892680ce 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -279,13 +279,44 @@ public static String path() { // 
------------------------------------------------------------------------------------------ /** - * The znode for the active coordinator. The znode path is: + * The znode for alive coordinators. The znode path is: * - *

/coordinators/active + *

/coordinators/ids + */ + public static final class CoordinatorIdsZNode { + public static String path() { + return "/coordinators/ids"; + } + } + + /** + * The znode for a registered Coordinator information. The znode path is: + * + *

/coordinators/ids/[serverId] + */ + public static final class CoordinatorIdZNode { + public static String path(int serverId) { + return CoordinatorIdsZNode.path() + "/" + serverId; + } + } + + /** + * The znode for the coordinator leader election. The znode path is: + * + *

/coordinators/election + */ + public static final class CoordinatorElectionZNode { + public static String path() { + return "/coordinators/election"; + } + } + + /** + * The znode for the active coordinator leader. The znode path is: * - *

Note: introduce standby coordinators in the future for znode "/coordinators/standby/". + *

/coordinators/leader */ - public static final class CoordinatorZNode { + public static final class CoordinatorLeaderZNode { public static String path() { return "/coordinators/active"; } @@ -300,6 +331,24 @@ public static CoordinatorAddress decode(byte[] json) { } } + /** + * The znode for the coordinator epoch. The znode path is: + * + *

/coordinators/epoch + */ + public static final class CoordinatorEpochZNode { + public static String path() { + return "/coordinators/epoch"; + } + + public static byte[] encode(int epoch) { + return String.valueOf(epoch).getBytes(); + } + + public static int decode(byte[] bytes) { + return Integer.parseInt(new String(bytes)); + } + } // ------------------------------------------------------------------------------------------ // ZNodes under "/tabletservers/" // ------------------------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java new file mode 100644 index 0000000000..26242df0f3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +/** Enum to represent the type of special Zookeeper version. 
/** Special ZooKeeper version values, each carrying the negative int that represents it. */
public enum ZkVersion {
    /** Represented by {@code -1}. */
    MATCH_ANY_VERSION(-1),

    /** Represented by {@code -2}. */
    UNKNOWN_VERSION(-2);

    private final int value;

    ZkVersion(int value) {
        this.value = value;
    }

    /** Returns the int representing this special version. */
    public int getVersion() {
        return this.value;
    }
}
ZkData.CoordinatorIdZNode.path( + server.conf.getInt(ConfigOptions.COORDINATOR_ID)) : ServerIdZNode.path(server.conf.getInt(ConfigOptions.TABLET_SERVER_ID)); long oldNodeCtime = zookeeperClient.getStat(path).get().getCtime(); @@ -143,6 +144,7 @@ protected void verifyEndpoint( } public static CoordinatorServer startCoordinatorServer(Configuration conf) throws Exception { + conf.set(ConfigOptions.COORDINATOR_ID, 0); CoordinatorServer coordinatorServer = new CoordinatorServer(conf); coordinatorServer.start(); return coordinatorServer; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java index bd4467c0a2..be937226b0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java @@ -93,6 +93,7 @@ private void checkSendRequest( // we use update metadata request to test for simplicity UpdateMetadataRequest updateMetadataRequest = makeUpdateMetadataRequest( + null, null, Collections.emptySet(), Collections.emptyList(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index dcfa6b5b3a..f15a47e874 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -166,7 +166,7 @@ static void baseBeforeAll() throws Exception { // register coordinator server zookeeperClient.registerCoordinatorLeader( new CoordinatorAddress( - "2", Endpoint.fromListenersString("CLIENT://localhost:10012"))); + 2, Endpoint.fromListenersString("CLIENT://localhost:10012"))); // register 
3 tablet servers for (int i = 0; i < 3; i++) { @@ -398,7 +398,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { BucketState t1Bucket0State = fromCtx(ctx -> ctx.getBucketState(t1Bucket0)); assertThat(t1Bucket0State).isEqualTo(OnlineBucket); // t1 bucket 1 should reelect a leader since the leader is not alive - // the bucket whose leader is in the server should be online a again, but the leadership + // the bucket whose leader is in the server should be online again, but the leadership // should change the leader for bucket2 of t1 should change since the leader fail BucketState t1Bucket1State = fromCtx(ctx -> ctx.getBucketState(t1Bucket1)); assertThat(t1Bucket1State).isEqualTo(OnlineBucket); @@ -905,7 +905,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() { zookeeperClient, serverMetadataCache, testCoordinatorChannelManager, - new CoordinatorContext(), + new TestCoordinatorContext(), autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java new file mode 100644 index 0000000000..cf36b70738 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.CoordinatorAddress; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static org.assertj.core.api.Assertions.assertThat; + +class CoordinatorServerElectionTest { + @RegisterExtension + public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + protected static ZooKeeperClient zookeeperClient; + + @BeforeAll + static void baseBeforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + } + + @Test + void testCoordinatorServerElection() throws Exception { + CoordinatorServer coordinatorServer1 = new CoordinatorServer(createConfiguration(1)); + CoordinatorServer coordinatorServer2 = new CoordinatorServer(createConfiguration(2)); + CoordinatorServer coordinatorServer3 = new 
CoordinatorServer(createConfiguration(3)); + + List coordinatorServerList = + Arrays.asList(coordinatorServer1, coordinatorServer2, coordinatorServer3); + + // start 3 coordinator servers + ExecutorService executor = Executors.newFixedThreadPool(3); + for (int i = 0; i < 3; i++) { + CoordinatorServer server = coordinatorServerList.get(i); + executor.submit( + () -> { + try { + server.start(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + // random coordinator become leader + waitUntilCoordinatorServerElected(); + + CoordinatorAddress firstLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); + + // Find the leader and try to restart it. + CoordinatorServer firstLeader = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (coordinatorServer.getServerId() == firstLeaderAddress.getId()) { + firstLeader = coordinatorServer; + break; + } + } + assertThat(firstLeader).isNotNull(); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH); + firstLeader.close(); + firstLeader.start(); + + // Then we should get another Coordinator server leader elected + waitUntilCoordinatorServerReelected(firstLeaderAddress); + CoordinatorAddress secondLeaderAddress = + zookeeperClient.getCoordinatorLeaderAddress().get(); + assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1); + + CoordinatorServer secondLeader = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (coordinatorServer.getServerId() == secondLeaderAddress.getId()) { + secondLeader = coordinatorServer; + break; + } + } + CoordinatorServer nonLeader = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (coordinatorServer.getServerId() != firstLeaderAddress.getId() + && 
coordinatorServer.getServerId() != secondLeaderAddress.getId()) { + nonLeader = coordinatorServer; + break; + } + } + // kill other 2 coordinator servers except the first one + nonLeader.close(); + secondLeader.close(); + + // the origin coordinator server should become leader again + waitUntilCoordinatorServerReelected(secondLeaderAddress); + CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); + + assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId()); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2); + } + + /** Create a configuration with Zookeeper address setting. */ + protected static Configuration createConfiguration(int serverId) { + Configuration configuration = new Configuration(); + configuration.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString()); + configuration.setString( + ConfigOptions.BIND_LISTENERS, "CLIENT://localhost:0,FLUSS://localhost:0"); + configuration.setString(ConfigOptions.ADVERTISED_LISTENERS, "CLIENT://198.168.0.1:100"); + configuration.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); + + // set to small timout to verify the case that zk session is timeout + configuration.set(ConfigOptions.ZOOKEEPER_SESSION_TIMEOUT, Duration.ofMillis(500)); + configuration.set(ConfigOptions.ZOOKEEPER_CONNECTION_TIMEOUT, Duration.ofMillis(500)); + configuration.set(ConfigOptions.ZOOKEEPER_RETRY_WAIT, Duration.ofMillis(500)); + + configuration.set(ConfigOptions.COORDINATOR_ID, serverId); + return configuration; + } + + public void waitUntilCoordinatorServerElected() { + waitUntil( + () -> { + return zookeeperClient.getCoordinatorLeaderAddress().isPresent(); + }, + Duration.ofMinutes(1), + "Fail to wait coordinator server elected"); + } + + public void waitUntilCoordinatorServerReelected(CoordinatorAddress address) { + waitUntil( + () 
-> { + return zookeeperClient.getCoordinatorLeaderAddress().isPresent() + && !zookeeperClient.getCoordinatorLeaderAddress().get().equals(address); + }, + Duration.ofMinutes(1), + "Fail to wait coordinator server reelected"); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java index bd060d6eed..21c0af8f76 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java @@ -63,6 +63,8 @@ protected Configuration getServerConfig() { ConfigOptions.BIND_LISTENERS, String.format("%s://%s:%d", DEFAULT_LISTENER_NAME, HOSTNAME, getPort())); conf.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); + conf.set(ConfigOptions.COORDINATOR_ID, 0); + return conf; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java index aee7f0d479..1a8326ae77 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java @@ -26,8 +26,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.time.Duration; import java.util.Optional; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CoordinatorServer} . 
*/ @@ -38,6 +40,7 @@ class CoordinatorServerTest extends ServerTestBase { @BeforeEach void beforeEach() throws Exception { coordinatorServer = startCoordinatorServer(createConfiguration()); + waitUntilCoordinatorServerElected(); } @AfterEach @@ -55,6 +58,7 @@ protected ServerBase getServer() { @Override protected ServerBase getStartFailServer() { Configuration configuration = createConfiguration(); + configuration.set(ConfigOptions.COORDINATOR_ID, 0); configuration.set(ConfigOptions.BIND_LISTENERS, "CLIENT://localhost:-12"); return new CoordinatorServer(configuration); } @@ -63,10 +67,25 @@ protected ServerBase getStartFailServer() { protected void checkAfterStartServer() throws Exception { assertThat(coordinatorServer.getRpcServer()).isNotNull(); // check the data put in zk after coordinator server start - Optional optCoordinatorAddr = zookeeperClient.getCoordinatorAddress(); + Optional optCoordinatorAddr = + zookeeperClient.getCoordinatorLeaderAddress(); assertThat(optCoordinatorAddr).isNotEmpty(); verifyEndpoint( optCoordinatorAddr.get().getEndpoints(), coordinatorServer.getRpcServer().getBindEndpoints()); } + + public void waitUntilCoordinatorServerElected() { + waitUntil( + () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), + Duration.ofSeconds(10), + "Fail to wait coordinator server elected"); + // Sleep 1 second to make sure coordinator server has been started and event processor + // started. 
+ try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index 8f12e43c47..dc283c0193 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -500,6 +500,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception { .updateMetadata( makeUpdateMetadataRequest( coordinatorServerInfo, + null, new HashSet<>(tabletServerInfos), Collections.emptyList(), Collections.emptyList())) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java index 3c417efe8d..c2c4f2aecd 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java @@ -36,6 +36,7 @@ import org.apache.fluss.server.zk.data.BucketAssignment; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.AfterAll; @@ -112,7 +113,7 @@ static void afterAll() { private void initTableManager() { testingEventManager = new TestingEventManager(); - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new TestCoordinatorContext(); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); @@ -169,7 +170,7 @@ void testCreateTable() throws Exception { // all 
replica should be online checkReplicaOnline(tableId, null, assignment); // clear the assignment for the table - zookeeperClient.deleteTableAssignment(tableId); + zookeeperClient.deleteTableAssignment(tableId, ZkVersion.MATCH_ANY_VERSION.getVersion()); } @Test @@ -177,7 +178,8 @@ void testDeleteTable() throws Exception { // first, create a table long tableId = zookeeperClient.getTableIdAndIncrement(); TableAssignment assignment = createAssignment(); - zookeeperClient.registerTableAssignment(tableId, assignment); + zookeeperClient.registerTableAssignment( + tableId, assignment, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.putTableInfo( TableInfo.of( @@ -216,7 +218,8 @@ void testResumeDeletionAfterRestart() throws Exception { // first, create a table long tableId = zookeeperClient.getTableIdAndIncrement(); TableAssignment assignment = createAssignment(); - zookeeperClient.registerTableAssignment(tableId, assignment); + zookeeperClient.registerTableAssignment( + tableId, assignment, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.putTableInfo( TableInfo.of( @@ -263,7 +266,8 @@ void testCreateAndDropPartition() throws Exception { // create a table long tableId = zookeeperClient.getTableIdAndIncrement(); TableAssignment assignment = TableAssignment.builder().build(); - zookeeperClient.registerTableAssignment(tableId, assignment); + zookeeperClient.registerTableAssignment( + tableId, assignment, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.putTableInfo( TableInfo.of( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java new file mode 100644 index 0000000000..b931d564ff --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.data.ZkVersion; + +/** A coordinator context for test purpose which can set epoch manually. */ +public class TestCoordinatorContext extends CoordinatorContext { + public TestCoordinatorContext() { + // When create or modify ZooKeeper node, it should check ZooKeeper epoch node version + // to ensure the coordinator is still holding the leadership. 
However, in the test + // cases, we don't register epoch node, so we skip the check process by setting + // "coordinatorEpochZkVersion" to ZkVersion.MATCH_ANY_VERSION + super(); + this.setCoordinatorEpochAndZkVersion( + INITIAL_COORDINATOR_EPOCH, ZkVersion.MATCH_ANY_VERSION.getVersion()); + } + + public TestCoordinatorContext(int coordinatorEpoch, int coordinatorEpochZkVersion) { + super(); + this.setCoordinatorEpochAndZkVersion(coordinatorEpoch, coordinatorEpochZkVersion); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcherTest.java new file mode 100644 index 0000000000..f35206c3d3 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorServerChangeWatcherTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.server.coordinator.event.CoordinatorEvent; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorServerEvent; +import org.apache.fluss.server.coordinator.event.NewCoordinatorServerEvent; +import org.apache.fluss.server.coordinator.event.TestingEventManager; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CoordinatorServerChangeWatcher} . */ +class CoordinatorServerChangeWatcherTest { + + @RegisterExtension + public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + @Test + void testServerChanges() throws Exception { + ZooKeeperClient zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + TestingEventManager eventManager = new TestingEventManager(); + CoordinatorServerChangeWatcher coordinatorServerChangeWatcher = + new CoordinatorServerChangeWatcher(zookeeperClient, eventManager); + coordinatorServerChangeWatcher.start(); + + // register new servers + List expectedEvents = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + expectedEvents.add(new NewCoordinatorServerEvent(i)); + zookeeperClient.registerCoordinatorServer(i); + } + + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + + // close it to mock the servers become down + 
zookeeperClient.close(); + + // unregister servers + for (int i = 0; i < 10; i++) { + expectedEvents.add(new DeadCoordinatorServerEvent(i)); + } + + retry( + Duration.ofMinutes(1), + () -> + assertThat(eventManager.getEvents()) + .containsExactlyInAnyOrderElementsOf(expectedEvents)); + + coordinatorServerChangeWatcher.stop(); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java index d7739028d7..6da8dfacd8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java @@ -49,7 +49,7 @@ class TabletServerChangeWatcherTest { new AllCallbackWrapper<>(new ZooKeeperExtension()); @Test - void testServetChanges() throws Exception { + void testServerChanges() throws Exception { ZooKeeperClient zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java index 454ec5de4b..51f2ba4948 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java @@ -30,6 +30,7 @@ import org.apache.fluss.server.coordinator.CoordinatorRequestBatch; import org.apache.fluss.server.coordinator.CoordinatorTestUtils; import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; +import org.apache.fluss.server.coordinator.TestCoordinatorContext; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; 
import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; @@ -37,6 +38,7 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.BeforeAll; @@ -79,7 +81,7 @@ static void baseBeforeAll() { @Test void testStartup() { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); // init coordinator server context with a table assignment TableBucket tableBucket = new TableBucket(1, 0); @@ -104,7 +106,7 @@ void testStartup() { @Test void testReplicaStateChange() { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); // test check valid replica state change @@ -133,7 +135,7 @@ void testReplicaStateChange() { @Test void testDeleteReplicaStateChange() { Map isReplicaDeleteSuccess = new HashMap<>(); - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); coordinatorContext.setLiveTabletServers( CoordinatorTestUtils.createServers(Arrays.asList(0, 1))); // use a context that will return a gateway that always get success ack @@ -190,7 +192,7 @@ void testDeleteReplicaStateChange() { @Test void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = 
createReplicaStateMachine(coordinatorContext); @@ -214,7 +216,8 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { } // put leader and isr LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr); @@ -228,7 +231,7 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { @Test void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); @@ -250,7 +253,8 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { } // put leader and isr LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index c57c9950e5..667560351e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -34,6 +34,7 @@ import org.apache.fluss.server.coordinator.LakeTableTieringManager; import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; +import org.apache.fluss.server.coordinator.TestCoordinatorContext; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metrics.group.TestingMetricGroups; @@ -41,6 +42,7 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.shaded.guava32.com.google.common.collect.Sets; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; @@ -98,7 +100,7 @@ void beforeEach() throws IOException { Configuration conf = new Configuration(); conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost"); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new TestCoordinatorContext(); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); coordinatorRequestBatch = new CoordinatorRequestBatch( @@ -140,9 +142,13 @@ void testStartup() throws Exception { // create LeaderAndIsr for t10/t11 info in zk, zookeeperClient.registerLeaderAndIsr( - new TableBucket(t1Id, 0), new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0)); + new TableBucket(t1Id, 0), + new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - new TableBucket(t1Id, 1), new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0)); + new TableBucket(t1Id, 1), + new 
LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0), + ZkVersion.MATCH_ANY_VERSION.getVersion()); // update the LeaderAndIsr to context coordinatorContext.putBucketLeaderAndIsr( t1b0, zookeeperClient.getLeaderAndIsr(new TableBucket(t1Id, 0)).get()); @@ -205,6 +211,8 @@ void testStateChangeToOnline() throws Exception { coordinatorContext.putTablePath(tableId, fakeTablePath); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketState(tableBucket, NewBucket); + coordinatorContext.setCoordinatorEpochAndZkVersion( + 0, ZkVersion.MATCH_ANY_VERSION.getVersion()); // case1: init a new leader for NewBucket to OnlineBucket tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), OnlineBucket); // non any alive servers, the state change fail diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java index 1ebb0c3e03..3c7bf8e240 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java @@ -36,6 +36,7 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.AfterAll; @@ -109,8 +110,10 @@ void testGetTableMetadataFromZk() throws Exception { LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000); LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3, 4), 200, 2000); - zookeeperClient.registerLeaderAndIsr(tableBucket0, leaderAndIsr0); - zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1); + 
zookeeperClient.registerLeaderAndIsr( + tableBucket0, leaderAndIsr0, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + tableBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion()); List tablesMetadataFromZK = metadataProvider.getTablesMetadataFromZK( @@ -170,8 +173,10 @@ void testGetPartitionMetadataFromZk() throws Exception { LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000); LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000); - zookeeperClient.registerLeaderAndIsr(partitionBucket0, leaderAndIsr0); - zookeeperClient.registerLeaderAndIsr(partitionBucket1, leaderAndIsr1); + zookeeperClient.registerLeaderAndIsr( + partitionBucket0, leaderAndIsr0, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + partitionBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion()); // Test getPartitionMetadataFromZkAsync PhysicalTablePath partitionPath = PhysicalTablePath.of(tablePath, partitionName); @@ -254,11 +259,17 @@ void testBatchGetPartitionMetadataFromZkAsync() throws Exception { TableBucket bucket3 = new TableBucket(tableId2, partitionId3, 0); zookeeperClient.registerLeaderAndIsr( - bucket1, new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000)); + bucket1, + new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - bucket2, new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000)); + bucket2, + new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - bucket3, new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000)); + bucket3, + new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); // Test getPartitionsMetadataFromZK List partitionPaths = diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java index b2e93f2bbc..4bbff888e7 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java @@ -32,7 +32,7 @@ public class TestingMetricGroups { new TabletServerMetricGroup(NOPMetricRegistry.INSTANCE, "fluss", "host", "rack", 0); public static final CoordinatorMetricGroup COORDINATOR_METRICS = - new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", "host", "0"); + new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", "host", 0); public static final TableMetricGroup TABLE_METRICS = new TableMetricGroup( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 01cf2701a4..1ee1b0c3f4 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -887,6 +887,7 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception { .updateMetadata( makeUpdateMetadataRequest( coordinatorServerInfo, + null, newTabletServerInfos, Collections.emptyList(), Collections.emptyList())) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index bab31f10b2..b028467ba1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -254,11 +254,13 @@ public void startCoordinatorServer() throws Exception { if 
(coordinatorServer == null) { // if no coordinator server exists, create a new coordinator server and start Configuration conf = new Configuration(clusterConf); + conf.set(ConfigOptions.COORDINATOR_ID, 0); conf.setString(ConfigOptions.ZOOKEEPER_ADDRESS, zooKeeperServer.getConnectString()); conf.setString(ConfigOptions.BIND_LISTENERS, coordinatorServerListeners); setRemoteDataDir(conf); coordinatorServer = new CoordinatorServer(conf); coordinatorServer.start(); + waitUntilCoordinatorServerElected(); coordinatorServerInfo = // TODO, Currently, we use 0 as coordinator server id. new ServerInfo( @@ -269,6 +271,7 @@ public void startCoordinatorServer() throws Exception { } else { // start the existing coordinator server coordinatorServer.start(); + waitUntilCoordinatorServerElected(); coordinatorServerInfo = new ServerInfo( 0, @@ -832,6 +835,26 @@ public CoordinatorServer getCoordinatorServer() { return coordinatorServer; } + /** + * Waits up to 10 seconds until ZooKeeper reports a coordinator leader address, then sleeps + * one more second so the coordinator server and its event processor can finish starting. + */ + public void waitUntilCoordinatorServerElected() { + waitUntil( + () -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(), + Duration.ofSeconds(10), + "Fail to wait coordinator server elected"); + // Sleep 1 second to make sure coordinator server has been started and event processor + // started. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // restore the interrupt flag so callers can still observe the interruption + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + // -------------------------------------------------------------------------------------------- /** Builder for {@link FlussClusterExtension}.
*/ diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index 64da4b95cb..b772b54f3b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -40,6 +40,7 @@ import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.shaded.curator5.org.apache.curator.CuratorZookeeperClient; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; @@ -104,14 +105,14 @@ static void afterAll() { void testCoordinatorLeader() throws Exception { // try to get leader address, should return empty since node leader address stored in // zk - assertThat(zookeeperClient.getCoordinatorAddress()).isEmpty(); + assertThat(zookeeperClient.getCoordinatorLeaderAddress()).isEmpty(); CoordinatorAddress coordinatorAddress = new CoordinatorAddress( - "2", Endpoint.fromListenersString("CLIENT://localhost1:10012")); + 2, Endpoint.fromListenersString("CLIENT://localhost1:10012")); // register leader address zookeeperClient.registerCoordinatorLeader(coordinatorAddress); // check get leader address - CoordinatorAddress gottenAddress = zookeeperClient.getCoordinatorAddress().get(); + CoordinatorAddress gottenAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); assertThat(gottenAddress).isEqualTo(coordinatorAddress); } @@ -161,8 +162,10 @@ void testTabletAssignments() throws Exception { .add(0, BucketAssignment.of(1, 2)) .add(1, BucketAssignment.of(3, 4, 5)) .build(); - zookeeperClient.registerTableAssignment(tableId1, tableAssignment1); - 
zookeeperClient.registerTableAssignment(tableId2, tableAssignment2); + zookeeperClient.registerTableAssignment( + tableId1, tableAssignment1, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerTableAssignment( + tableId2, tableAssignment2, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getTableAssignment(tableId1)).contains(tableAssignment1); assertThat(zookeeperClient.getTableAssignment(tableId2)).contains(tableAssignment2); assertThat(zookeeperClient.getTablesAssignments(Arrays.asList(tableId1, tableId2))) @@ -171,11 +174,12 @@ void testTabletAssignments() throws Exception { // test update TableAssignment tableAssignment3 = TableAssignment.builder().add(3, BucketAssignment.of(1, 5)).build(); - zookeeperClient.updateTableAssignment(tableId1, tableAssignment3); + zookeeperClient.updateTableAssignment( + tableId1, tableAssignment3, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getTableAssignment(tableId1)).contains(tableAssignment3); // test delete - zookeeperClient.deleteTableAssignment(tableId1); + zookeeperClient.deleteTableAssignment(tableId1, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getTableAssignment(tableId1)).isEmpty(); } @@ -188,23 +192,26 @@ void testLeaderAndIsr() throws Exception { assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty(); // try to register bucket leaderAndIsr - LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000); - LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 100, 1000); + LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 0, 1000); + LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 0, 1000); - zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1); - zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2); + zookeeperClient.registerLeaderAndIsr( + tableBucket1, leaderAndIsr1, 
ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + tableBucket2, leaderAndIsr2, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).hasValue(leaderAndIsr2); assertThat(zookeeperClient.getLeaderAndIsrs(Arrays.asList(tableBucket1, tableBucket2))) .containsValues(leaderAndIsr1, leaderAndIsr2); // test update - leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 200, 2000); - zookeeperClient.updateLeaderAndIsr(tableBucket1, leaderAndIsr1); + leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 0, 2000); + zookeeperClient.updateLeaderAndIsr( + tableBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1); // test delete - zookeeperClient.deleteLeaderAndIsr(tableBucket1); + zookeeperClient.deleteLeaderAndIsr(tableBucket1, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).isEmpty(); } @@ -217,7 +224,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep TableBucket tableBucket = isPartitionTable ? 
new TableBucket(1, 2L, i) : new TableBucket(1, i); LeaderAndIsr leaderAndIsr = - new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000); + new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000); leaderAndIsrList.add(leaderAndIsr); RegisterTableBucketLeadAndIsrInfo info = isPartitionTable @@ -228,7 +235,8 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep tableBucketInfo.add(info); } // batch create - zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(tableBucketInfo); + zookeeperClient.batchRegisterLeaderAndIsrForTablePartition( + tableBucketInfo, ZkVersion.MATCH_ANY_VERSION.getVersion()); for (int i = 0; i < 100; i++) { // each should register successful @@ -258,7 +266,8 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep entry.setValue(adjustLeaderAndIsr); }); // batch update - zookeeperClient.batchUpdateLeaderAndIsr(updateMap); + zookeeperClient.batchUpdateLeaderAndIsr( + updateMap, ZkVersion.MATCH_ANY_VERSION.getVersion()); for (int i = 0; i < 100; i++) { // each should update successful Optional optionalLeaderAndIsr = @@ -277,9 +286,10 @@ void testBatchUpdateLeaderAndIsr() throws Exception { for (int i = 0; i < totalCount; i++) { TableBucket tableBucket = new TableBucket(1, i); LeaderAndIsr leaderAndIsr = - new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000); + new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000); leaderAndIsrList.put(tableBucket, leaderAndIsr); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); } // try to batch update @@ -294,15 +304,17 @@ void testBatchUpdateLeaderAndIsr() throws Exception { old.leader() + 1, old.leaderEpoch() + 1, old.isr(), - old.coordinatorEpoch() + 1, + old.coordinatorEpoch(), old.bucketEpoch() + 1); })); - 
zookeeperClient.batchUpdateLeaderAndIsr( + updateLeaderAndIsrList, ZkVersion.MATCH_ANY_VERSION.getVersion()); for (Map.Entry entry : updateLeaderAndIsrList.entrySet()) { TableBucket tableBucket = entry.getKey(); LeaderAndIsr leaderAndIsr = entry.getValue(); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(leaderAndIsr); - zookeeperClient.deleteLeaderAndIsr(tableBucket); + zookeeperClient.deleteLeaderAndIsr( + tableBucket, ZkVersion.MATCH_ANY_VERSION.getVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty(); } } @@ -449,7 +461,8 @@ void testTableBucketSnapshot() throws Exception { table1Bucket2.getTableId(), TableAssignment.builder() .add(table1Bucket2.getBucket(), BucketAssignment.of(0, 1, 2)) - .build()); + .build(), + ZkVersion.MATCH_ANY_VERSION.getVersion()); BucketSnapshot snapshot1 = new BucketSnapshot(1L, 10L, "oss://test/cp1"); BucketSnapshot snapshot2 = new BucketSnapshot(2L, 20L, "oss://test/cp2"); zookeeperClient.registerTableBucketSnapshot(table1Bucket2, snapshot1); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerdeTest.java index d90151d838..de1cd6555a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerdeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerdeTest.java @@ -39,7 +39,7 @@ public class CoordinatorAddressJsonSerdeTest extends JsonSerdeTestBase - coordinator - - + coordinator + - activeCoordinatorCount The number of active CoordinatorServer in this cluster. Gauge + + aliveCoordinatorCount + The number of alive CoordinatorServer in this cluster. + Gauge + activeTabletServerCount The number of active TabletServer in this cluster.