Skip to content

Commit

Permalink
[fix][test] Catch exception when update data in mockZookeeper (#16473)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4df2593)
  • Loading branch information
mattisonchao authored and codelipenghui committed Jul 15, 2022
1 parent 4868463 commit 1eab46f
Showing 1 changed file with 94 additions and 67 deletions.
161 changes: 94 additions & 67 deletions testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr


executor.execute(() -> {
lock();
try {
lock();

if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
Expand Down Expand Up @@ -394,6 +394,9 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
KeeperState.SyncConnected,
parent)));
}
} catch (Throwable ex) {
log.error("create path : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}
Expand Down Expand Up @@ -426,28 +429,33 @@ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperExce
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
cb.processResult(failure.get().intValue(), path, ctx, null, null);
return;
} else if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
return;
}

MockZNode value;
lock();
try {
value = tree.get(path);
} finally {
unlockIfLocked();
}
checkReadOpDelay();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
cb.processResult(failure.get().intValue(), path, ctx, null, null);
return;
} else if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
return;
}

if (value == null) {
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
} else {
cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
MockZNode value;
lock();
try {
value = tree.get(path);
} finally {
unlockIfLocked();
}

if (value == null) {
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
} else {
cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
}
} catch (Throwable ex) {
log.error("get data : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
}
});
}
Expand All @@ -456,8 +464,8 @@ public void getData(final String path, boolean watch, final DataCallback cb, fin
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
lock();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand All @@ -482,6 +490,9 @@ public void getData(final String path, final Watcher watcher, final DataCallback
unlockIfLocked();
cb.processResult(0, path, ctx, value.getContent(), stat);
}
} catch (Throwable ex) {
log.error("get data : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
} finally {
unlockIfLocked();
}
Expand All @@ -491,9 +502,9 @@ public void getData(final String path, final Watcher watcher, final DataCallback
@Override
public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
executor.execute(() -> {
lock();
List<String> children = Lists.newArrayList();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand Down Expand Up @@ -529,11 +540,14 @@ public void getChildren(final String path, final Watcher watcher, final Children
if (watcher != null) {
watchers.put(path, watcher);
}
cb.processResult(0, path, ctx, children);
} catch (Throwable ex) {
log.error("get children : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}

cb.processResult(0, path, ctx, children);
});
}

Expand Down Expand Up @@ -603,8 +617,8 @@ public List<String> getChildren(String path, boolean watch) throws KeeperExcepti
public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
executor.execute(() -> {
Set<String> children = new TreeSet<>();
lock();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand All @@ -630,10 +644,13 @@ public void getChildren(final String path, boolean watcher, final Children2Callb
String child = relativePath.split("/", 2)[0];
children.add(child);
});
cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
} catch (Throwable ex) {
log.error("get children : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
} finally {
unlockIfLocked();
}
cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
});

}
Expand Down Expand Up @@ -702,8 +719,8 @@ public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
@Override
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
executor.execute(() -> {
lock();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand All @@ -726,6 +743,9 @@ public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
}
} catch (Throwable ex) {
log.error("exist : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}
Expand Down Expand Up @@ -802,56 +822,60 @@ public void setData(final String path, final byte[] data, int version, final Sta
}

executor.execute(() -> {
final Set<Watcher> toNotify = Sets.newHashSet();
Stat stat;
lock();
try {
final Set<Watcher> toNotify = Sets.newHashSet();
Stat stat;
lock();
try {
Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
if (failure.isPresent()) {
unlockIfLocked();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
unlockIfLocked();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}

Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
if (failure.isPresent()) {
unlockIfLocked();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
unlockIfLocked();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}
if (!tree.containsKey(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
return;
}

if (!tree.containsKey(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
return;
}
MockZNode mockZNode = tree.get(path);
int currentVersion = mockZNode.getVersion();

MockZNode mockZNode = tree.get(path);
int currentVersion = mockZNode.getVersion();
// Check version
if (version != -1 && version != currentVersion) {
log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
unlockIfLocked();
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
return;
}

// Check version
if (version != -1 && version != currentVersion) {
log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
log.debug("[{}] Updating -- current version: {}", path, currentVersion);
MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
tree.put(path, newZNode);
stat = createStatForZNode(newZNode);
} finally {
unlockIfLocked();
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
return;
}
cb.processResult(0, path, ctx, stat);

log.debug("[{}] Updating -- current version: {}", path, currentVersion);
MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
tree.put(path, newZNode);
stat = createStatForZNode(newZNode);
} finally {
unlockIfLocked();
}
cb.processResult(0, path, ctx, stat);
toNotify.addAll(watchers.get(path));
watchers.removeAll(path);

toNotify.addAll(watchers.get(path));
watchers.removeAll(path);
for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
}

for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
triggerPersistentWatches(path, null, EventType.NodeDataChanged);
} catch (Throwable ex) {
log.error("Update data : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
}

triggerPersistentWatches(path, null, EventType.NodeDataChanged);
});
}

Expand Down Expand Up @@ -915,8 +939,8 @@ public void delete(final String path, int version) throws InterruptedException,
@Override
public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
Runnable r = () -> {
lock();
try {
lock();
final Set<Watcher> toNotifyDelete = Sets.newHashSet();
toNotifyDelete.addAll(watchers.get(path));

Expand Down Expand Up @@ -962,6 +986,9 @@ public void delete(final String path, int version, final VoidCallback cb, final
parent)));
triggerPersistentWatches(path, parent, EventType.NodeDeleted);
}
} catch (Throwable ex) {
log.error("delete path : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx);
} finally {
unlockIfLocked();
}
Expand Down

0 comments on commit 1eab46f

Please sign in to comment.