From a9a8020918c345fb6c3c0a099a27189cbd7cf819 Mon Sep 17 00:00:00 2001 From: Hexiaoqiao Date: Tue, 4 Jun 2024 10:23:19 +0800 Subject: [PATCH] CURATOR-688. Fix SharedCount/SharedValue not update after Stat.version overflowed (#478) Co-authored-by: Kezhu Wang Co-authored-by: tison Signed-off-by: tison --- .../shared/IllegalTrySetVersionException.java | 39 +++++++++++++++++++ .../framework/recipes/shared/SharedCount.java | 7 ++-- .../framework/recipes/shared/SharedValue.java | 37 ++++++++++-------- .../recipes/shared/VersionedValue.java | 31 ++++++++++++--- .../recipes/shared/TestSharedCount.java | 13 ++++++- 5 files changed, 100 insertions(+), 27 deletions(-) create mode 100644 curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java new file mode 100644 index 000000000..58e833992 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.recipes.shared; + +import org.apache.zookeeper.data.Stat; + +/** + * Exception to alert overflowed {@link Stat#getVersion()} {@code -1} which is not suitable in + * {@link SharedValue#trySetValue(VersionedValue, byte[])} and {@link SharedCount#trySetCount(VersionedValue, int)}. + * + *

In case of this exception, clients have to choose: + *

    + *
  • Take their own risk to do a blind set.
  • + *
  • Update ZooKeeper cluster to solve ZOOKEEPER-4743.
  • + *
+ */ +public class IllegalTrySetVersionException extends IllegalArgumentException { + @Override + public String getMessage() { + return "overflowed Stat.version -1 is not suitable for trySet(a.k.a. compare-and-set ZooKeeper::setData)"; + } +} diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java index c401a3b05..2ecc528e4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java @@ -30,6 +30,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionState; +import org.apache.zookeeper.data.Stat; /** * Manages a shared integer. All clients watching the same path will have the up-to-date @@ -60,7 +61,7 @@ public int getCount() { @Override public VersionedValue getVersionedValue() { VersionedValue localValue = sharedValue.getVersionedValue(); - return new VersionedValue(localValue.getVersion(), fromBytes(localValue.getValue())); + return localValue.mapValue(SharedCount::fromBytes); } /** @@ -102,11 +103,11 @@ public boolean trySetCount(int newCount) throws Exception { * @param newCount the new value to attempt * @return true if the change attempt was successful, false if not. If the change * was not successful, {@link #getCount()} will return the updated value + * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} overflowed to {@code -1} * @throws Exception ZK errors, interruptions, etc. */ public boolean trySetCount(VersionedValue previous, int newCount) throws Exception { - VersionedValue previousCopy = - new VersionedValue(previous.getVersion(), toBytes(previous.getValue())); + VersionedValue previousCopy = previous.mapValue(SharedCount::toBytes); return sharedValue.trySetValue(previousCopy, toBytes(newCount)); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index 50e50edd0..ddb96fcc2 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -48,6 +48,7 @@ * value (considering ZK's normal consistency guarantees). */ public class SharedValue implements Closeable, SharedValueReader { + private static final int NO_ZXID = -1; private static final int UNINITIALIZED_VERSION = -1; private final Logger log = LoggerFactory.getLogger(getClass()); @@ -101,8 +102,8 @@ public SharedValue(CuratorFramework client, String path, byte[] seedValue) { this.path = PathUtils.validatePath(path); this.seedValue = Arrays.copyOf(seedValue, seedValue.length); this.watcher = new SharedValueCuratorWatcher(); - currentValue = new AtomicReference>( - new VersionedValue(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); + currentValue = new AtomicReference<>( + new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); } @VisibleForTesting @@ -112,8 +113,8 @@ protected SharedValue(WatcherRemoveCuratorFramework client, String path, byte[] this.seedValue = Arrays.copyOf(seedValue, seedValue.length); // inject watcher for testing this.watcher = watcher; - currentValue = new AtomicReference>( - new VersionedValue(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); + currentValue = new AtomicReference<>( + new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length))); } @Override @@ -125,12 +126,11 @@ public byte[] getValue() { @Override public VersionedValue getVersionedValue() { VersionedValue localCopy = currentValue.get(); - return new VersionedValue( - localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length)); + return localCopy.mapValue(bytes -> Arrays.copyOf(bytes, bytes.length)); } /** - * Change the shared value value irrespective of its previous state + * Change the shared value irrespective of its previous state * * @param newValue new value * @throws Exception ZK errors, interruptions, etc. @@ -139,7 +139,7 @@ public void setValue(byte[] newValue) throws Exception { Preconditions.checkState(state.get() == State.STARTED, "not started"); Stat result = client.setData().forPath(path, newValue); - updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length)); + updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length)); } /** @@ -171,19 +171,25 @@ public boolean trySetValue(byte[] newValue) throws Exception { * @param newValue the new value to attempt * @return true if the change attempt was successful, false if not. If the change * was not successful, {@link #getValue()} will return the updated value + * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} overflowed to {@code -1} * @throws Exception ZK errors, interruptions, etc. */ public boolean trySetValue(VersionedValue previous, byte[] newValue) throws Exception { Preconditions.checkState(state.get() == State.STARTED, "not started"); VersionedValue current = currentValue.get(); - if (previous.getVersion() != current.getVersion() || !Arrays.equals(previous.getValue(), current.getValue())) { + // Omit comparing of getVersion here, so we can test the exception case. + // This affects no correctness as construction of VersionedValue is private. + if (previous.getZxid() != current.getZxid() || !Arrays.equals(previous.getValue(), current.getValue())) { return false; } + if (previous.getVersion() == -1) { + throw new IllegalTrySetVersionException(); + } try { Stat result = client.setData().withVersion(previous.getVersion()).forPath(path, newValue); - updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length)); + updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length)); return true; } catch (KeeperException.BadVersionException ignore) { // ignore @@ -193,14 +199,13 @@ public boolean trySetValue(VersionedValue previous, byte[] newValue) thr return false; } - private void updateValue(int version, byte[] bytes) { + private void updateValue(long zxid, int version, byte[] bytes) { while (true) { VersionedValue current = currentValue.get(); - if (current.getVersion() >= version) { - // A newer version was concurrently set. + if (current.getZxid() >= zxid) { return; } - if (currentValue.compareAndSet(current, new VersionedValue(version, bytes))) { + if (currentValue.compareAndSet(current, new VersionedValue<>(zxid, version, bytes))) { // Successfully set. return; } @@ -248,14 +253,14 @@ private void readValue() throws Exception { Stat localStat = new Stat(); byte[] bytes = client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path); - updateValue(localStat.getVersion(), bytes); + updateValue(localStat.getMzxid(), localStat.getVersion(), bytes); } private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if (event.getResultCode() == KeeperException.Code.OK.intValue()) { - updateValue(event.getStat().getVersion(), event.getData()); + updateValue(event.getStat().getMzxid(), event.getStat().getVersion(), event.getData()); notifyListeners(); } } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java index 64d9780b7..64f097657 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java @@ -20,23 +20,38 @@ package org.apache.curator.framework.recipes.shared; import com.google.common.base.Preconditions; +import java.util.function.Function; +import org.apache.zookeeper.data.Stat; /** - * POJO for a version and a value + * POJO for versioned value. + * + *

Client must never construct this but get through {@link SharedValue#getVersionedValue()} + * or {@link SharedCount#getVersionedValue()}. */ public class VersionedValue { + private final long zxid; private final int version; private final T value; - /** - * @param version the version - * @param value the value (cannot be null) - */ - VersionedValue(int version, T value) { + VersionedValue(long zxid, int version, T value) { + this.zxid = zxid; this.version = version; this.value = Preconditions.checkNotNull(value, "value cannot be null"); } + /** + * It is {@link Stat#getMzxid()} of the corresponding node. + */ + public long getZxid() { + return zxid; + } + + /** + * It is {@link Stat#getVersion()} of the corresponding node. + * + *

It is known that this will overflow and hence not monotonic. + */ public int getVersion() { return version; } @@ -44,4 +59,8 @@ public int getVersion() { public T getValue() { return value; } + + VersionedValue mapValue(Function f) { + return new VersionedValue<>(zxid, version, f.apply(value)); + } } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index b7b56c06a..6e843883d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -51,6 +52,7 @@ import org.apache.zookeeper.WatchedEvent; import org.junit.jupiter.api.Test; +@SuppressWarnings("deprecation") public class TestSharedCount extends CuratorTestBase { @Test public void testMultiClients() throws Exception { @@ -206,13 +208,20 @@ public void testSimpleVersioned() throws Exception { assertEquals(count.getCount(), 10); // Wrong value - assertFalse(count.trySetCount(new VersionedValue(3, 20), 7)); + assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 3, 20), 7)); // Wrong version - assertFalse(count.trySetCount(new VersionedValue(10, 10), 7)); + assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 10, 10), 7)); + assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid() + 1, 3, 10), 7)); // Server changed client.setData().forPath("/count", SharedCount.toBytes(88)); assertFalse(count.trySetCount(current, 234)); + + assertThrows(IllegalTrySetVersionException.class, () -> { + VersionedValue cached = count.getVersionedValue(); + VersionedValue illegal = new VersionedValue<>(cached.getZxid(), -1, cached.getValue()); + count.trySetCount(illegal, 20); + }); } finally { CloseableUtils.closeQuietly(count); CloseableUtils.closeQuietly(client);