Skip to content

Commit

Permalink
CURATOR-688. Fix SharedCount/SharedValue not update after Stat.versio…
Browse files Browse the repository at this point in the history
…n overflowed (#478)

Co-authored-by: Kezhu Wang <kezhuw@apache.org>
Co-authored-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
3 people authored Jun 4, 2024
1 parent 5af5404 commit a9a8020
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.framework.recipes.shared;

import org.apache.zookeeper.data.Stat;

/**
* Exception to alert overflowed {@link Stat#getVersion()} {@code -1} which is not suitable in
* {@link SharedValue#trySetValue(VersionedValue, byte[])} and {@link SharedCount#trySetCount(VersionedValue, int)}.
*
* <p>In case of this exception, clients have to choose:
* <ul>
* <li>Take their own risk to do a blind set.</li>
* <li>Update ZooKeeper cluster to solve <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-4743">ZOOKEEPER-4743</a>.</li>
* </ul>
*/
public class IllegalTrySetVersionException extends IllegalArgumentException {
@Override
public String getMessage() {
return "overflowed Stat.version -1 is not suitable for trySet(a.k.a. compare-and-set ZooKeeper::setData)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.zookeeper.data.Stat;

/**
* Manages a shared integer. All clients watching the same path will have the up-to-date
Expand Down Expand Up @@ -60,7 +61,7 @@ public int getCount() {
@Override
public VersionedValue<Integer> getVersionedValue() {
VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
return localValue.mapValue(SharedCount::fromBytes);
}

/**
Expand Down Expand Up @@ -102,11 +103,11 @@ public boolean trySetCount(int newCount) throws Exception {
* @param newCount the new value to attempt
* @return true if the change attempt was successful, false if not. If the change
* was not successful, {@link #getCount()} will return the updated value
* @throws IllegalTrySetVersionException if {@link Stat#getVersion()} overflowed to {@code -1}
* @throws Exception ZK errors, interruptions, etc.
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception {
VersionedValue<byte[]> previousCopy =
new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
VersionedValue<byte[]> previousCopy = previous.mapValue(SharedCount::toBytes);
return sharedValue.trySetValue(previousCopy, toBytes(newCount));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* value (considering ZK's normal consistency guarantees).
*/
public class SharedValue implements Closeable, SharedValueReader {
private static final int NO_ZXID = -1;
private static final int UNINITIALIZED_VERSION = -1;

private final Logger log = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -101,8 +102,8 @@ public SharedValue(CuratorFramework client, String path, byte[] seedValue) {
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
this.watcher = new SharedValueCuratorWatcher();
currentValue = new AtomicReference<VersionedValue<byte[]>>(
new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
currentValue = new AtomicReference<>(
new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}

@VisibleForTesting
Expand All @@ -112,8 +113,8 @@ protected SharedValue(WatcherRemoveCuratorFramework client, String path, byte[]
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
// inject watcher for testing
this.watcher = watcher;
currentValue = new AtomicReference<VersionedValue<byte[]>>(
new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
currentValue = new AtomicReference<>(
new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}

@Override
Expand All @@ -125,12 +126,11 @@ public byte[] getValue() {
@Override
public VersionedValue<byte[]> getVersionedValue() {
VersionedValue<byte[]> localCopy = currentValue.get();
return new VersionedValue<byte[]>(
localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length));
return localCopy.mapValue(bytes -> Arrays.copyOf(bytes, bytes.length));
}

/**
* Change the shared value value irrespective of its previous state
* Change the shared value irrespective of its previous state
*
* @param newValue new value
* @throws Exception ZK errors, interruptions, etc.
Expand All @@ -139,7 +139,7 @@ public void setValue(byte[] newValue) throws Exception {
Preconditions.checkState(state.get() == State.STARTED, "not started");

Stat result = client.setData().forPath(path, newValue);
updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length));
updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length));
}

/**
Expand Down Expand Up @@ -171,19 +171,25 @@ public boolean trySetValue(byte[] newValue) throws Exception {
* @param newValue the new value to attempt
* @return true if the change attempt was successful, false if not. If the change
* was not successful, {@link #getValue()} will return the updated value
* @throws IllegalTrySetVersionException if {@link Stat#getVersion()} overflowed to {@code -1}
* @throws Exception ZK errors, interruptions, etc.
*/
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception {
Preconditions.checkState(state.get() == State.STARTED, "not started");

VersionedValue<byte[]> current = currentValue.get();
if (previous.getVersion() != current.getVersion() || !Arrays.equals(previous.getValue(), current.getValue())) {
// Omit comparing of getVersion here, so we can test the exception case.
// This affects no correctness as construction of VersionedValue is private.
if (previous.getZxid() != current.getZxid() || !Arrays.equals(previous.getValue(), current.getValue())) {
return false;
}
if (previous.getVersion() == -1) {
throw new IllegalTrySetVersionException();
}

try {
Stat result = client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length));
updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length));
return true;
} catch (KeeperException.BadVersionException ignore) {
// ignore
Expand All @@ -193,14 +199,13 @@ public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) thr
return false;
}

private void updateValue(int version, byte[] bytes) {
private void updateValue(long zxid, int version, byte[] bytes) {
while (true) {
VersionedValue<byte[]> current = currentValue.get();
if (current.getVersion() >= version) {
// A newer version was concurrently set.
if (current.getZxid() >= zxid) {
return;
}
if (currentValue.compareAndSet(current, new VersionedValue<byte[]>(version, bytes))) {
if (currentValue.compareAndSet(current, new VersionedValue<>(zxid, version, bytes))) {
// Successfully set.
return;
}
Expand Down Expand Up @@ -248,14 +253,14 @@ private void readValue() throws Exception {
Stat localStat = new Stat();
byte[] bytes =
client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
updateValue(localStat.getVersion(), bytes);
updateValue(localStat.getMzxid(), localStat.getVersion(), bytes);
}

private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
updateValue(event.getStat().getVersion(), event.getData());
updateValue(event.getStat().getMzxid(), event.getStat().getVersion(), event.getData());
notifyListeners();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,47 @@
package org.apache.curator.framework.recipes.shared;

import com.google.common.base.Preconditions;
import java.util.function.Function;
import org.apache.zookeeper.data.Stat;

/**
* POJO for a version and a value
* POJO for versioned value.
*
* <p>Client must never construct this but get through {@link SharedValue#getVersionedValue()}
* or {@link SharedCount#getVersionedValue()}.
*/
public class VersionedValue<T> {
private final long zxid;
private final int version;
private final T value;

/**
* @param version the version
* @param value the value (cannot be null)
*/
VersionedValue(int version, T value) {
VersionedValue(long zxid, int version, T value) {
this.zxid = zxid;
this.version = version;
this.value = Preconditions.checkNotNull(value, "value cannot be null");
}

/**
* It is {@link Stat#getMzxid()} of the corresponding node.
*/
public long getZxid() {
return zxid;
}

/**
* It is {@link Stat#getVersion()} of the corresponding node.
*
* <p>It is known that this will overflow and hence not monotonic.
*/
public int getVersion() {
return version;
}

public T getValue() {
return value;
}

<R> VersionedValue<R> mapValue(Function<T, R> f) {
return new VersionedValue<>(zxid, version, f.apply(value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.junit.jupiter.api.Test;

@SuppressWarnings("deprecation")
public class TestSharedCount extends CuratorTestBase {
@Test
public void testMultiClients() throws Exception {
Expand Down Expand Up @@ -206,13 +208,20 @@ public void testSimpleVersioned() throws Exception {
assertEquals(count.getCount(), 10);

// Wrong value
assertFalse(count.trySetCount(new VersionedValue<Integer>(3, 20), 7));
assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 3, 20), 7));
// Wrong version
assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 10), 7));
assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 10, 10), 7));
assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid() + 1, 3, 10), 7));

// Server changed
client.setData().forPath("/count", SharedCount.toBytes(88));
assertFalse(count.trySetCount(current, 234));

assertThrows(IllegalTrySetVersionException.class, () -> {
VersionedValue<Integer> cached = count.getVersionedValue();
VersionedValue<Integer> illegal = new VersionedValue<>(cached.getZxid(), -1, cached.getValue());
count.trySetCount(illegal, 20);
});
} finally {
CloseableUtils.closeQuietly(count);
CloseableUtils.closeQuietly(client);
Expand Down

0 comments on commit a9a8020

Please sign in to comment.