forked from perezrathke/zookeeper-leader-election
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathZooZNodeDeletionMonitor.java
153 lines (136 loc) · 4.41 KB
/
ZooZNodeDeletionMonitor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/**
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
// See http://zookeeper.apache.org/doc/current/javaExample.html#ch_Introduction
public class ZooZNodeDeletionMonitor implements Watcher, StatCallback {
// The ZooKeeper API handle
ZooKeeper hZooKeeper = null;
// The ZNode that we're monitoring
String zNodePath = null;
// True if we're dead, False otherwise
boolean isZooKeeperSessionClosed = false;
// The listener object interested in changes to our monitored znode
ZooZNodeDeletionMonitorListener listener = null;
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface ZooZNodeDeletionMonitorListener {
/**
* The existence status of the node has changed.
*/
void onZNodeDeleted();
/**
* The ZooKeeper session is no longer valid.
*/
void onZooKeeperSessionClosed();
}
/**
* @return handle to ZooKeeper API
*/
private ZooKeeper getZooKeeper() {
assert null != hZooKeeper;
return hZooKeeper;
}
/**
* @return Path to zNode that we're monitoring for existence
*/
private final String getZNodePath() {
return zNodePath;
}
/**
* @return True if we're dead, False otherwise
*/
public boolean getIsZooKeeperSessionClosed() {
return isZooKeeperSessionClosed;
}
/**
* @return Listener to callback when data changes
*/
private ZooZNodeDeletionMonitorListener getListener() {
assert null != listener;
return listener;
}
/**
* Constructor
*/
public ZooZNodeDeletionMonitor(ZooKeeper hZooKeeper, String zNodePath, ZooZNodeDeletionMonitorListener listener) {
this.hZooKeeper = hZooKeeper;
this.zNodePath = zNodePath;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
checkZNodeExistsAsync();
}
/**
* Utility function to close monitor and inform listener on session close
*/
private void onZooKeeperSessionClosed() {
isZooKeeperSessionClosed = true;
getListener().onZooKeeperSessionClosed();
}
/**
* Utility function to check on znode existence
*/
private void checkZNodeExistsAsync() {
getZooKeeper().exists( getZNodePath(), true /*bWatch*/, this /*AsyncCallback.StatCallback*/, null /*Object ctx*/) ;
}
/**
* Watcher callback
*/
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
onZooKeeperSessionClosed();
break;
}
} else {
if (path != null && path.equals(getZNodePath())) {
// Something has changed on the node, let's find out
checkZNodeExistsAsync();
}
}
}
/**
* AsyncCallback.StatCallback
*/
@SuppressWarnings("deprecation")
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println( "ZooZNodeDeletionMonitor::processResult:: rc is " + rc );
if ( rc == Code.NoNode /*Code.NONODE.ordinal()*/ ) {
getListener().onZNodeDeleted();
}
else if ( rc == Code.Ok /*Code.OK.ordinal()*/ ) {
// put logging or handle this case separately (zNode exists)
}
else if (
( rc == Code.SessionExpired /*Code.SESSIONEXPIRED.ordinal()*/ )
|| ( rc == Code.NoAuth /*Code.NOAUTH.ordinal()*/ )
) {
onZooKeeperSessionClosed();
}
else {
// Retry errors
checkZNodeExistsAsync();
}
}
}