Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][test] Catch exception when update data in mockZookeeper #16473

Merged
merged 3 commits into from
Jul 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 90 additions & 69 deletions testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr


executor.execute(() -> {
lock();
try {
lock();

if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
Expand Down Expand Up @@ -394,7 +394,7 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
KeeperState.SyncConnected,
parent)));
}
} catch (Exception ex) {
} catch (Throwable ex) {
log.error("create path : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
Expand Down Expand Up @@ -429,28 +429,33 @@ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperExce
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
cb.processResult(failure.get().intValue(), path, ctx, null, null);
return;
} else if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
return;
}

MockZNode value;
lock();
try {
value = tree.get(path);
} finally {
unlockIfLocked();
}
checkReadOpDelay();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
cb.processResult(failure.get().intValue(), path, ctx, null, null);
return;
} else if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
return;
}

if (value == null) {
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
} else {
cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
MockZNode value;
lock();
try {
value = tree.get(path);
} finally {
unlockIfLocked();
}

if (value == null) {
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
} else {
cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
}
} catch (Throwable ex) {
log.error("get data : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
}
});
}
Expand All @@ -459,8 +464,8 @@ public void getData(final String path, boolean watch, final DataCallback cb, fin
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
lock();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand All @@ -485,6 +490,9 @@ public void getData(final String path, final Watcher watcher, final DataCallback
unlockIfLocked();
cb.processResult(0, path, ctx, value.getContent(), stat);
}
} catch (Throwable ex) {
log.error("get data : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
} finally {
unlockIfLocked();
}
Expand All @@ -494,9 +502,9 @@ public void getData(final String path, final Watcher watcher, final DataCallback
@Override
public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
executor.execute(() -> {
lock();
List<String> children = Lists.newArrayList();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand Down Expand Up @@ -532,11 +540,14 @@ public void getChildren(final String path, final Watcher watcher, final Children
if (watcher != null) {
watchers.put(path, watcher);
}
cb.processResult(0, path, ctx, children);
} catch (Throwable ex) {
log.error("get children : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}

cb.processResult(0, path, ctx, children);
});
}

Expand Down Expand Up @@ -606,8 +617,8 @@ public List<String> getChildren(String path, boolean watch) throws KeeperExcepti
public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
executor.execute(() -> {
Set<String> children = new TreeSet<>();
lock();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand All @@ -633,10 +644,13 @@ public void getChildren(final String path, boolean watcher, final Children2Callb
String child = relativePath.split("/", 2)[0];
children.add(child);
});
cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
} catch (Throwable ex) {
log.error("get children : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
} finally {
unlockIfLocked();
}
cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
});

}
Expand Down Expand Up @@ -705,8 +719,8 @@ public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
@Override
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
executor.execute(() -> {
lock();
try {
lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
if (failure.isPresent()) {
unlockIfLocked();
Expand All @@ -729,6 +743,9 @@ public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
}
} catch (Throwable ex) {
log.error("exist : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}
Expand Down Expand Up @@ -805,56 +822,60 @@ public void setData(final String path, final byte[] data, int version, final Sta
}

executor.execute(() -> {
final Set<Watcher> toNotify = Sets.newHashSet();
Stat stat;
lock();
try {
final Set<Watcher> toNotify = Sets.newHashSet();
Stat stat;
lock();
try {
Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
if (failure.isPresent()) {
unlockIfLocked();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
unlockIfLocked();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}

Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
if (failure.isPresent()) {
unlockIfLocked();
cb.processResult(failure.get().intValue(), path, ctx, null);
return;
} else if (stopped) {
unlockIfLocked();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
return;
}
if (!tree.containsKey(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
return;
}

if (!tree.containsKey(path)) {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
return;
}
MockZNode mockZNode = tree.get(path);
int currentVersion = mockZNode.getVersion();

MockZNode mockZNode = tree.get(path);
int currentVersion = mockZNode.getVersion();
// Check version
if (version != -1 && version != currentVersion) {
log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
unlockIfLocked();
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
return;
}

// Check version
if (version != -1 && version != currentVersion) {
log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
log.debug("[{}] Updating -- current version: {}", path, currentVersion);
MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
tree.put(path, newZNode);
stat = createStatForZNode(newZNode);
} finally {
unlockIfLocked();
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
return;
}
cb.processResult(0, path, ctx, stat);

log.debug("[{}] Updating -- current version: {}", path, currentVersion);
MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
tree.put(path, newZNode);
stat = createStatForZNode(newZNode);
} finally {
unlockIfLocked();
}
cb.processResult(0, path, ctx, stat);
toNotify.addAll(watchers.get(path));
watchers.removeAll(path);

toNotify.addAll(watchers.get(path));
watchers.removeAll(path);
for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
}

for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
triggerPersistentWatches(path, null, EventType.NodeDataChanged);
} catch (Throwable ex) {
log.error("Update data : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
}

triggerPersistentWatches(path, null, EventType.NodeDataChanged);
});
}

Expand Down Expand Up @@ -918,8 +939,8 @@ public void delete(final String path, int version) throws InterruptedException,
@Override
public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
Runnable r = () -> {
lock();
try {
lock();
final Set<Watcher> toNotifyDelete = Sets.newHashSet();
toNotifyDelete.addAll(watchers.get(path));

Expand Down Expand Up @@ -965,7 +986,7 @@ public void delete(final String path, int version, final VoidCallback cb, final
parent)));
triggerPersistentWatches(path, parent, EventType.NodeDeleted);
}
} catch (Exception ex) {
} catch (Throwable ex) {
log.error("delete path : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx);
} finally {
Expand Down