From 84c3b1969d732934bd1ae94bb8863edc1d3fc906 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 22 Oct 2024 12:09:32 +0800 Subject: [PATCH] [fix][standalone] Correctly delete bookie registration znode Signed-off-by: Zixuan Liu --- .../zookeeper/LocalBookkeeperEnsemble.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index de3077959a444..939998f6f8054 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -297,22 +297,16 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { } int bookiePort = portManager.get(); - + String bookieId = "bk" + i + "test"; // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully - String registrationZnode = String.format("/ledgers/available/%s:%d", - baseConf.getAdvertisedAddress(), bookiePort); - if (zkc.exists(registrationZnode, null) != null) { - try { - zkc.delete(registrationZnode, -1); - } catch (NoNodeException nne) { - // Ignore if z-node was just expired - } - } + deleteBookieRegistrationZnode( + String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort)); + deleteBookieRegistrationZnode(String.format("/ledgers/available/%s", bookieId)); bsConfs[i] = new ServerConfiguration(baseConf); // override settings bsConfs[i].setBookiePort(bookiePort); - bsConfs[i].setBookieId("bk" + i + "test"); + bsConfs[i].setBookieId(bookieId); String zkServers = "127.0.0.1:" + zkPort; String metadataServiceUriStr = "zk://" + zkServers + "/ledgers"; @@ -327,6 +321,16 @@ private void runBookies(ServerConfiguration baseConf) throws Exception { } } + private void deleteBookieRegistrationZnode(String registrationZnode) throws InterruptedException, KeeperException { + if (zkc.exists(registrationZnode, null) != null) { + try { + zkc.delete(registrationZnode, -1); + } catch (NoNodeException nne) { + // Ignore if z-node was just expired + } + } + } + public void runStreamStorage(CompositeConfiguration conf) throws Exception { String zkServers = "127.0.0.1:" + zkPort; String metadataServiceUriStr = "zk://" + zkServers + "/ledgers";