Skip to content

Commit 79936be

Browse files
committed
[FLINK-22636][zk] Group job specific zNodes under /jobs zNode
In order to better clean up job specific HA services, this commit changes the layout of the zNode structure so that the JobMaster leader, checkpoints and checkpoint counter is now grouped below the jobs/ zNode. Moreover, this commit groups the leaders of the cluster components (Dispatcher, ResourceManager, RestServer) under /leader/process/latch and /leader/process/connection-info. This closes #15893.
1 parent 927a21c commit 79936be

File tree

26 files changed

+382
-387
lines changed

26 files changed

+382
-387
lines changed

docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,36 +38,12 @@
3838
<td>Integer</td>
3939
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
4040
</tr>
41-
<tr>
42-
<td><h5>high-availability.zookeeper.path.checkpoint-counter</h5></td>
43-
<td style="word-wrap: break-word;">"/checkpoint-counter"</td>
44-
<td>String</td>
45-
<td>ZooKeeper root path (ZNode) for checkpoint counters.</td>
46-
</tr>
47-
<tr>
48-
<td><h5>high-availability.zookeeper.path.checkpoints</h5></td>
49-
<td style="word-wrap: break-word;">"/checkpoints"</td>
50-
<td>String</td>
51-
<td>ZooKeeper root path (ZNode) for completed checkpoints.</td>
52-
</tr>
5341
<tr>
5442
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
5543
<td style="word-wrap: break-word;">"/jobgraphs"</td>
5644
<td>String</td>
5745
<td>ZooKeeper root path (ZNode) for job graphs</td>
5846
</tr>
59-
<tr>
60-
<td><h5>high-availability.zookeeper.path.latch</h5></td>
61-
<td style="word-wrap: break-word;">"/leaderlatch"</td>
62-
<td>String</td>
63-
<td>Defines the znode of the leader latch which is used to elect the leader.</td>
64-
</tr>
65-
<tr>
66-
<td><h5>high-availability.zookeeper.path.leader</h5></td>
67-
<td style="word-wrap: break-word;">"/leader"</td>
68-
<td>String</td>
69-
<td>Defines the znode of the leader which contains the URL to the leader and the current leader session ID.</td>
70-
</tr>
7147
<tr>
7248
<td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
7349
<td style="word-wrap: break-word;">"/mesos-workers"</td>

docs/layouts/shortcodes/generated/high_availability_configuration.html

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -62,36 +62,12 @@
6262
<td>Integer</td>
6363
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
6464
</tr>
65-
<tr>
66-
<td><h5>high-availability.zookeeper.path.checkpoint-counter</h5></td>
67-
<td style="word-wrap: break-word;">"/checkpoint-counter"</td>
68-
<td>String</td>
69-
<td>ZooKeeper root path (ZNode) for checkpoint counters.</td>
70-
</tr>
71-
<tr>
72-
<td><h5>high-availability.zookeeper.path.checkpoints</h5></td>
73-
<td style="word-wrap: break-word;">"/checkpoints"</td>
74-
<td>String</td>
75-
<td>ZooKeeper root path (ZNode) for completed checkpoints.</td>
76-
</tr>
7765
<tr>
7866
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
7967
<td style="word-wrap: break-word;">"/jobgraphs"</td>
8068
<td>String</td>
8169
<td>ZooKeeper root path (ZNode) for job graphs</td>
8270
</tr>
83-
<tr>
84-
<td><h5>high-availability.zookeeper.path.latch</h5></td>
85-
<td style="word-wrap: break-word;">"/leaderlatch"</td>
86-
<td>String</td>
87-
<td>Defines the znode of the leader latch which is used to elect the leader.</td>
88-
</tr>
89-
<tr>
90-
<td><h5>high-availability.zookeeper.path.leader</h5></td>
91-
<td style="word-wrap: break-word;">"/leader"</td>
92-
<td>String</td>
93-
<td>Defines the znode of the leader which contains the URL to the leader and the current leader session ID.</td>
94-
</tr>
9571
<tr>
9672
<td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
9773
<td style="word-wrap: break-word;">"/mesos-workers"</td>

flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,7 @@ public final class ConfigConstants {
10131013
public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
10141014
"high-availability.zookeeper.path.namespace";
10151015

1016-
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
1016+
/** @deprecated no longer used. */
10171017
@PublicEvolving @Deprecated
10181018
public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";
10191019

@@ -1026,14 +1026,14 @@ public final class ConfigConstants {
10261026
public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
10271027
"high-availability.zookeeper.path.jobgraphs";
10281028

1029-
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
1029+
/** @deprecated no longer used. */
10301030
@PublicEvolving @Deprecated
10311031
public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
10321032

10331033
/**
10341034
* ZooKeeper root path (ZNode) for completed checkpoints.
10351035
*
1036-
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
1036+
* @deprecated no longer used.
10371037
*/
10381038
@PublicEvolving @Deprecated
10391039
public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
@@ -1042,7 +1042,7 @@ public final class ConfigConstants {
10421042
/**
10431043
* ZooKeeper root path (ZNode) for checkpoint counters.
10441044
*
1045-
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
1045+
* @deprecated no longer used.
10461046
*/
10471047
@PublicEvolving @Deprecated
10481048
public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
@@ -1691,21 +1691,19 @@ public final class ConfigConstants {
16911691
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */
16921692
@Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";
16931693

1694-
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
1694+
/** @deprecated no longer used. */
16951695
@Deprecated public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
16961696

1697-
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
1697+
/** @deprecated no longer used. */
16981698
@Deprecated public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
16991699

17001700
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
17011701
@Deprecated public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
17021702

1703-
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
1703+
/** @deprecated no longer used. */
17041704
@Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
17051705

1706-
/**
1707-
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}
1708-
*/
1706+
/** @deprecated no longer used. */
17091707
@Deprecated
17101708
public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
17111709

flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,6 @@ public class HighAvailabilityOptions {
115115
.withDescription(
116116
"The root path under which Flink stores its entries in ZooKeeper.");
117117

118-
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
119-
public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
120-
key("high-availability.zookeeper.path.latch")
121-
.defaultValue("/leaderlatch")
122-
.withDeprecatedKeys("recovery.zookeeper.path.latch")
123-
.withDescription(
124-
"Defines the znode of the leader latch which is used to elect the leader.");
125-
126118
/** ZooKeeper root path (ZNode) for job graphs. */
127119
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
128120
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
@@ -131,31 +123,6 @@ public class HighAvailabilityOptions {
131123
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
132124
.withDescription("ZooKeeper root path (ZNode) for job graphs");
133125

134-
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
135-
public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
136-
key("high-availability.zookeeper.path.leader")
137-
.defaultValue("/leader")
138-
.withDeprecatedKeys("recovery.zookeeper.path.leader")
139-
.withDescription(
140-
"Defines the znode of the leader which contains the URL to the leader and the current"
141-
+ " leader session ID.");
142-
143-
/** ZooKeeper root path (ZNode) for completed checkpoints. */
144-
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
145-
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
146-
key("high-availability.zookeeper.path.checkpoints")
147-
.defaultValue("/checkpoints")
148-
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
149-
.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");
150-
151-
/** ZooKeeper root path (ZNode) for checkpoint counters. */
152-
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
153-
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
154-
key("high-availability.zookeeper.path.checkpoint-counter")
155-
.defaultValue("/checkpoint-counter")
156-
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
157-
.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");
158-
159126
/** ZooKeeper root path (ZNode) for Mesos workers. */
160127
@PublicEvolving
161128
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,20 +112,20 @@ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
112112
kubeClient,
113113
configuration,
114114
ioExecutor,
115-
this::getLeaderNameForJobManager,
115+
this::getLeaderPathForJobManager,
116116
lockIdentity);
117117
}
118118

119119
@Override
120120
public JobGraphStore createJobGraphStore() throws Exception {
121121
return KubernetesUtils.createJobGraphStore(
122-
configuration, kubeClient, getLeaderNameForDispatcher(), lockIdentity);
122+
configuration, kubeClient, getLeaderPathForDispatcher(), lockIdentity);
123123
}
124124

125125
@Override
126126
public RunningJobsRegistry createRunningJobsRegistry() {
127127
return new KubernetesRunningJobsRegistry(
128-
kubeClient, getLeaderNameForDispatcher(), lockIdentity);
128+
kubeClient, getLeaderPathForDispatcher(), lockIdentity);
129129
}
130130

131131
@Override
@@ -144,25 +144,25 @@ public void internalCleanup() throws Exception {
144144

145145
@Override
146146
public void internalCleanupJobData(JobID jobID) throws Exception {
147-
kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
147+
kubeClient.deleteConfigMap(getLeaderPathForJobManager(jobID)).get();
148148
}
149149

150150
@Override
151-
protected String getLeaderNameForResourceManager() {
151+
protected String getLeaderPathForResourceManager() {
152152
return getLeaderName(RESOURCE_MANAGER_NAME);
153153
}
154154

155155
@Override
156-
protected String getLeaderNameForDispatcher() {
156+
protected String getLeaderPathForDispatcher() {
157157
return getLeaderName(DISPATCHER_NAME);
158158
}
159159

160-
public String getLeaderNameForJobManager(final JobID jobID) {
160+
public String getLeaderPathForJobManager(final JobID jobID) {
161161
return getLeaderName(jobID.toString() + NAME_SEPARATOR + JOB_MANAGER_NAME);
162162
}
163163

164164
@Override
165-
protected String getLeaderNameForRestServer() {
165+
protected String getLeaderPathForRestServer() {
166166
return getLeaderName(REST_SERVER_NAME);
167167
}
168168

flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
9494
new VoidBlobStore());
9595
JobID jobID = new JobID();
9696
String configMapName =
97-
kubernetesHaServices.getLeaderNameForJobManager(jobID);
97+
kubernetesHaServices.getLeaderPathForJobManager(jobID);
9898
final KubernetesConfigMap configMap =
9999
new TestingFlinkKubeClient.MockKubernetesConfigMap(
100100
configMapName);

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.flink.runtime.checkpoint;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.JobStatus;
2223
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
24+
import org.apache.flink.runtime.util.ZooKeeperUtils;
2325

2426
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
2527
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.shared.SharedCount;
@@ -76,14 +78,11 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
7678
* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
7779
*
7880
* @param client Curator ZooKeeper client
79-
* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
8081
*/
8182
public ZooKeeperCheckpointIDCounter(
82-
CuratorFramework client,
83-
String counterPath,
84-
LastStateConnectionStateListener connectionStateListener) {
83+
CuratorFramework client, LastStateConnectionStateListener connectionStateListener) {
8584
this.client = checkNotNull(client, "Curator client");
86-
this.counterPath = checkNotNull(counterPath, "Counter path");
85+
this.counterPath = ZooKeeperUtils.getCheckpointIdCounterPath();
8786
this.sharedCount = new SharedCount(client, counterPath, 1);
8887
this.connectionStateListener = connectionStateListener;
8988
}
@@ -176,4 +175,9 @@ private void checkConnectionState() {
176175
}
177176
});
178177
}
178+
179+
@VisibleForTesting
180+
String getPath() {
181+
return counterPath;
182+
}
179183
}

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,17 @@ public CompletedCheckpointStore createCheckpointStore(
5151
throws Exception {
5252

5353
return ZooKeeperUtils.createCompletedCheckpoints(
54-
client, config, jobId, maxNumberOfCheckpointsToRetain, executor);
54+
ZooKeeperUtils.useNamespaceAndEnsurePath(
55+
client, ZooKeeperUtils.getPathForJob(jobId)),
56+
config,
57+
maxNumberOfCheckpointsToRetain,
58+
executor);
5559
}
5660

5761
@Override
5862
public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
59-
return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID);
63+
return ZooKeeperUtils.createCheckpointIDCounter(
64+
ZooKeeperUtils.useNamespaceAndEnsurePath(
65+
client, ZooKeeperUtils.getPathForJob(jobID)));
6066
}
6167
}

0 commit comments

Comments
 (0)