From d02d1c464a3c9130a309525a5a2e43038a54c0d1 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Fri, 16 Dec 2022 13:53:27 -0800
Subject: [PATCH 01/14] Followers now claim their tokens after finishing
 assignment

---
 .../datastream/server/AssignmentToken.java    | 47 +++++---------
 .../datastream/server/Coordinator.java        |  6 ++
 .../datastream/server/zk/ZkAdapter.java       | 38 +++++++++---
 .../datastream/server/zk/TestZkAdapter.java   | 61 ++++++++++++++++++-
 4 files changed, 106 insertions(+), 46 deletions(-)

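Note: with @JsonCreator/@JsonProperty, the token deserializes through its
three-arg constructor, so the no-arg constructor and setters go away and the
fields become final, with the timestamp supplied by the caller. A minimal
round-trip sketch of that contract, assuming jackson-databind on the
classpath (the class itself serializes through JsonUtils; TokenRoundTrip is
illustration only):

    import com.fasterxml.jackson.databind.ObjectMapper;

    import com.linkedin.datastream.server.AssignmentToken;

    public class TokenRoundTrip {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        AssignmentToken token =
            new AssignmentToken("leader-host", "follower-host", System.currentTimeMillis());
        String json = mapper.writeValueAsString(token);                       // via annotated getters
        AssignmentToken parsed = mapper.readValue(json, AssignmentToken.class); // via @JsonCreator
        System.out.println(parsed.getIssuedBy() + " -> " + parsed.getIssuedFor());
      }
    }
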
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java b/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java
index 20c77db61..5305e87fb 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java
@@ -5,6 +5,9 @@
  */
 package com.linkedin.datastream.server;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import com.linkedin.datastream.common.JsonUtils;
 
 
@@ -13,24 +16,20 @@
  * they handled assignment change
  */
 public final class AssignmentToken {
-  private String _issuedBy;
-  private String _issuedFor;
-  private long  _timestamp;
+  private final String _issuedBy;
+  private final String _issuedFor;
+  private final long  _timestamp;
 
   /**
    * Constructor for {@link AssignmentToken}
    */
-  public AssignmentToken(String issuedBy, String issuedFor) {
+  @JsonCreator
+  public AssignmentToken(@JsonProperty("issuedBy") String issuedBy,
+      @JsonProperty("issuedFor") String issuedFor,
+      @JsonProperty("timestamp") long timestamp) {
     _issuedBy = issuedBy;
     _issuedFor = issuedFor;
-    _timestamp = System.currentTimeMillis();
-  }
-
-  /**
-   * Default constructor for {@link AssignmentToken}, required for json ser/de
-   */
-  public AssignmentToken() {
-
+    _timestamp = timestamp;
   }
 
   /**
@@ -50,6 +49,7 @@ public String toJson() {
   /**
    * Gets the name of the leader host that issued the token
    */
+  @JsonProperty("issuedBy")
   public String getIssuedBy() {
     return _issuedBy;
   }
@@ -57,6 +57,7 @@ public String getIssuedBy() {
   /**
    * Gets the name of the host for which the token was issued
    */
+  @JsonProperty("issuedFor")
   public String getIssuedFor() {
     return _issuedFor;
   }
@@ -64,28 +65,8 @@ public String getIssuedFor() {
   /**
    * Gets the timestamp (in UNIX epoch format) for when the token was issued
    */
+  @JsonProperty("timestamp")
   public long getTimestamp() {
     return _timestamp;
   }
-
-  /**
-   * Sets the name of the leader host that issued the token
-   */
-  public void setIssuedBy(String issuedBy) {
-    _issuedBy = issuedBy;
-  }
-
-  /**
-   * Sets the name of the host for which the token was issued
-   */
-  public void setIssuedFor(String issuedFor) {
-    _issuedFor = issuedFor;
-  }
-
-  /**
-   * Sets the timestamp (in UNIX epoch format) for when the token was issued
-   */
-  public void setTimestamp(long timestamp) {
-    _timestamp = timestamp;
-  }
 }
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 8958de117..b89dd1a6d 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -710,6 +710,12 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
       retryHandleAssignmentChange(isDatastreamUpdate);
     }
 
+    if (_config.getEnableAssignmentTokens()) {
+      List<DatastreamGroup> stoppingDatastreamGroups =
+          fetchDatastreamGroupsWithStatus(Collections.singletonList(DatastreamStatus.STOPPING));
+      _adapter.claimAssignmentTokensOfInstance(currentAssignment, stoppingDatastreamGroups, _adapter.getInstanceName());
+    }
+
     _log.info("END: Coordinator::handleAssignmentChange, Duration: {} milliseconds",
         System.currentTimeMillis() - startAt);
     _metrics.updateMeter(CoordinatorMetrics.Meter.NUM_ASSIGNMENT_CHANGES, 1);
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
index 455c5e71b..bf804bf56 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
@@ -780,7 +780,6 @@ private Map<DatastreamGroup, Set<String>> getStoppingDatastreamGroupInstances(
    */
   private void issueAssignmentTokensForStoppingDatastreams(List<DatastreamGroup> stoppingDatastreamGroups,
       Map<DatastreamGroup, Set<String>> stoppingDgInstances) {
-    String hostname = getLocalHostName();
     for (DatastreamGroup stoppingGroup : stoppingDatastreamGroups) {
       for (Datastream stoppingStream : stoppingGroup.getDatastreams()) {
         String path = KeyBuilder.datastream(_cluster, stoppingStream.getName());
@@ -798,7 +797,7 @@ private void issueAssignmentTokensForStoppingDatastreams(List<DatastreamGroup> s
         for (String instance : instances) {
           String assignmentTokenPath = KeyBuilder.datastreamAssignmentTokenForInstance(_cluster,
               stoppingStream.getName(), instance);
-          AssignmentToken token = new AssignmentToken(hostname, instance);
+          AssignmentToken token = new AssignmentToken(_instanceName, instance, System.currentTimeMillis());
           _zkclient.create(assignmentTokenPath, token.toJson(), CreateMode.PERSISTENT);
         }
       }
@@ -806,16 +805,35 @@ private void issueAssignmentTokensForStoppingDatastreams(List<DatastreamGroup> s
   }
 
   /**
-   * Gets the name of the local host
+   * Claims the assignment tokens for the given instance's successfully stopped tasks
+   * @param currentAssignment Current assignment per connector type
+   * @param stoppingDatastreamGroups List of stopping datastream groups
+   * @param instance Instance name
    */
-  private String getLocalHostName() {
-    String hostname = "localhost";
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException ex) {
-      LOG.warn("Unable to obtain hostname for leader");
+  public void claimAssignmentTokensOfInstance(Map<String, List<DatastreamTask>> currentAssignment,
+      List<DatastreamGroup> stoppingDatastreamGroups, String instance) {
+    // Flatten the currently assigned (active) tasks and get all task prefixes
+    Set<String> activeTaskPrefixes = currentAssignment.values().stream().
+        flatMap(Collection::stream).map(DatastreamTask::getTaskPrefix).collect(Collectors.toSet());
+
+    // Get all stopping streams with no tasks assigned. For each one, claim assignment tokens (if any)
+    stoppingDatastreamGroups.stream().filter(dg -> !activeTaskPrefixes.contains(dg.getTaskPrefix())).
+        forEach(dg -> deleteAssignmentTokensForDatastreamGroup(dg, instance));
+  }
+
+  private void deleteAssignmentTokensForDatastreamGroup(DatastreamGroup datastreamGroup, String instance) {
+    for (Datastream stream : datastreamGroup.getDatastreams()) {
+      String streamName = stream.getName();
+      String tokenPath = KeyBuilder.datastreamAssignmentTokenForInstance(_cluster, streamName, instance);
+      if (_zkclient.exists(tokenPath)) {
+        if (instance.equals(_instanceName)) {
+          LOG.info("Claiming assignment token for datastream: {}", streamName);
+        } else {
+          LOG.info("Revoking assignment token for datastream: {}, instance: {}", streamName, instance);
+        }
+        _zkclient.delete(tokenPath);
+      }
     }
-    return hostname;
   }
 
   /**
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java
index 4bf203abf..a92f1b883 100644
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java
@@ -6,7 +6,6 @@
 package com.linkedin.datastream.server.zk;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -18,6 +17,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1110,8 +1110,63 @@ public void testUpdateAllAssignmentAndIssueTokens() throws Exception {
         KeyBuilder.datastreamAssignmentTokenForInstance(testCluster, datastreamGroup2.getTaskPrefix(), tokenNodes.get(0)));
     AssignmentToken token = AssignmentToken.fromJson(tokenData);
     Assert.assertEquals(token.getIssuedFor(), "instance2");
-    String hostname = InetAddress.getLocalHost().getHostName();
-    Assert.assertEquals(token.getIssuedBy(), hostname);
+    String localInstance = adapter.getInstanceName();
+    Assert.assertEquals(token.getIssuedBy(), localInstance);
+  }
+
+  @Test
+  public void testInstanceClaimsAssignmentTokensProperly() throws Exception {
+    String testCluster = "testInstanceClaimsAssignmentTokens";
+    String connectorType = "connectorType";
+    ZkClient zkClient = new ZkClient(_zkConnectionString);
+    ZkAdapter adapter = createZkAdapter(testCluster);
+    adapter.connect();
+
+    // Creating datastream groups
+    Datastream[] datastreams = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType,
+        "ketchupStream", "mayoStream");
+    Datastream ketchupStream = datastreams[0];
+    Datastream mayoStream = datastreams[1];
+    DatastreamGroup ketchupDatastreamGroup = new DatastreamGroup(Collections.singletonList(ketchupStream));
+    DatastreamGroup mayoDatastreamGroup = new DatastreamGroup(Collections.singletonList(mayoStream));
+
+    // Simulating a stop request for ketchup stream
+    zkClient.ensurePath(KeyBuilder.datastreamAssignmentTokens(testCluster, ketchupStream.getName()));
+    // Creating two assignment tokens for the stream (one per instance); its group is added to the stopping list below
+    zkClient.create(KeyBuilder.datastreamAssignmentTokenForInstance(testCluster,
+        ketchupStream.getName(), adapter.getInstanceName()), "token", CreateMode.PERSISTENT);
+    zkClient.create(KeyBuilder.datastreamAssignmentTokenForInstance(testCluster,
+        ketchupStream.getName(), "someOtherInstance"), "token", CreateMode.PERSISTENT);
+
+    List<DatastreamTask> tasks = new ArrayList<>();
+    DatastreamTaskImpl task1 = new DatastreamTaskImpl();
+    task1.setTaskPrefix(mayoDatastreamGroup.getTaskPrefix());
+    task1.setConnectorType(connectorType);
+
+    Map<String, List<DatastreamTask>> assignment = new HashMap<>();
+    assignment.put(connectorType, tasks);
+    List<DatastreamGroup> stoppingDatastreamGroups = Collections.singletonList(ketchupDatastreamGroup);
+
+    adapter.claimAssignmentTokensOfInstance(assignment, stoppingDatastreamGroups, adapter.getInstanceName());
+
+    // Asserting that ZkAdapter claimed the token for the given instance, and for that instance only
+    List<String> nodes = zkClient.getChildren(
+        KeyBuilder.datastreamAssignmentTokens(testCluster, ketchupStream.getName()));
+    Assert.assertEquals(nodes.size(), 1);
+    Assert.assertEquals(nodes.get(0), "someOtherInstance"); // adapter didn't touch other instance's token
+
+    // Asserting that tokens will be left intact if the stopping stream has active tasks
+    zkClient.create(KeyBuilder.datastreamAssignmentTokenForInstance(testCluster,
+        ketchupStream.getName(), adapter.getInstanceName()), "token", CreateMode.PERSISTENT);
+    DatastreamTaskImpl task2 = new DatastreamTaskImpl();
+    task2.setTaskPrefix(ketchupDatastreamGroup.getTaskPrefix());
+    task2.setConnectorType(connectorType);
+    tasks.add(task2);
+
+    adapter.claimAssignmentTokensOfInstance(assignment, stoppingDatastreamGroups, adapter.getInstanceName());
+    nodes = zkClient.getChildren(
+        KeyBuilder.datastreamAssignmentTokens(testCluster, ketchupStream.getName()));
+    Assert.assertEquals(nodes.size(), 2);
   }
 
   @Test

From 99d811719881f7146f11402f0d861357b367f327 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Tue, 10 Jan 2023 14:40:27 -0800
Subject: [PATCH 02/14] Added code that infers stopped datastreams from
 assignment

---
 .../datastream/server/Coordinator.java        | 78 ++++++++++++++++---
 .../datastream/server/zk/ZkAdapter.java       | 20 +----
 .../datastream/server/TestCoordinator.java    | 49 ++++++++++++
 .../datastream/server/zk/TestZkAdapter.java   | 21 +----
 4 files changed, 123 insertions(+), 45 deletions(-)
 create mode 100644 datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java

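Note: the inference added here treats a stream as stopping when its task
prefix appears among the removed tasks but not among the newly assigned ones;
several tasks may share one prefix, which is why the prefix-to-datastreams
map is collected with a merge function. A self-contained toy sketch of the
underlying set difference, with plain strings standing in for task prefixes:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class StoppingInference {
      public static void main(String[] args) {
        // "mayo" has removed tasks and none still assigned, so it is the only
        // prefix inferred as stopping; "mustard" keeps an active task.
        Set<String> removedPrefixes = new HashSet<>(Arrays.asList("mayo", "mustard"));
        Set<String> activePrefixes = new HashSet<>(Arrays.asList("ketchup", "mustard"));
        removedPrefixes.removeAll(activePrefixes);
        System.out.println(removedPrefixes); // prints [mayo]
      }
    }
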
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index b89dd1a6d..2b91c1d31 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -697,11 +697,13 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
     }
 
     // now save the current assignment
+    List<DatastreamTask> oldAssignment = new ArrayList<>(_assignedDatastreamTasks.values());
     _assignedDatastreamTasks.clear();
     _assignedDatastreamTasks.putAll(currentAssignment.values()
         .stream()
         .flatMap(Collection::stream)
         .collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity())));
+    List<DatastreamTask> newAssignment = new ArrayList<>(_assignedDatastreamTasks.values());
 
     if ((totalTasks - submittedTasks) > 0) {
       _log.warn("Failed to submit {} tasks from currentAssignment. Queueing onAssignmentChange event again",
@@ -711,9 +713,7 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
     }
 
     if (_config.getEnableAssignmentTokens()) {
-      List<DatastreamGroup> stoppingDatastreamGroups =
-          fetchDatastreamGroupsWithStatus(Collections.singletonList(DatastreamStatus.STOPPING));
-      _adapter.claimAssignmentTokensOfInstance(currentAssignment, stoppingDatastreamGroups, _adapter.getInstanceName());
+      maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment);
     }
 
     _log.info("END: Coordinator::handleAssignmentChange, Duration: {} milliseconds",
@@ -748,19 +748,15 @@ private Future<Boolean> dispatchAssignmentChangeIfNeeded(String connectorType, L
     ConnectorInfo connectorInfo = _connectors.get(connectorType);
     ConnectorWrapper connector = connectorInfo.getConnector();
 
-    List<DatastreamTask> addedTasks = new ArrayList<>(assignment);
-    List<DatastreamTask> removedTasks;
     List<DatastreamTask> oldAssignment = _assignedDatastreamTasks.values()
         .stream()
         .filter(t -> t.getConnectorType().equals(connectorType))
         .collect(Collectors.toList());
+    List<DatastreamTask> addedTasks = getAddedTasks(assignment, oldAssignment);
+    List<DatastreamTask> removedTasks = getRemovedTasks(assignment, oldAssignment);
 
     // if there are any difference in the list of assignment. Note that if there are no difference
     // between the two lists, then the connector onAssignmentChange() is not called.
-    addedTasks.removeAll(oldAssignment);
-    oldAssignment.removeAll(assignment);
-    removedTasks = oldAssignment;
-
     if (isDatastreamUpdate || !addedTasks.isEmpty() || !removedTasks.isEmpty()) {
       // Populate the event producers before calling the connector with the list of tasks.
       addedTasks.stream()
@@ -773,6 +769,70 @@ private Future<Boolean> dispatchAssignmentChangeIfNeeded(String connectorType, L
     return null;
   }
 
+  private List<DatastreamTask> getRemovedTasks(List<DatastreamTask> newAssignment, List<DatastreamTask> oldAssignment) {
+    List<DatastreamTask> removedTasks = new ArrayList<>(oldAssignment);
+    removedTasks.removeAll(newAssignment);
+    return removedTasks;
+  }
+
+  private List<DatastreamTask> getAddedTasks(List<DatastreamTask> newAssignment, List<DatastreamTask> oldAssignment) {
+    List<DatastreamTask> addedTasks = new ArrayList<>(newAssignment);
+    addedTasks.removeAll(oldAssignment);
+    return addedTasks;
+  }
+
+  private void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssignment,
+      List<DatastreamTask> oldAssignment) {
+    Map<String, List<DatastreamTask>> newAssignmentPerConnector = new HashMap<>();
+    for (DatastreamTask task : newAssignment) {
+      String connectorType = task.getConnectorType();
+      newAssignmentPerConnector.computeIfAbsent(connectorType, k -> new ArrayList<>()).add(task);
+    }
+
+    Map<String, List<DatastreamTask>> oldAssignmentPerConnector = new HashMap<>();
+    for (DatastreamTask task : oldAssignment) {
+      String connectorType = task.getConnectorType();
+      oldAssignmentPerConnector.computeIfAbsent(connectorType, k -> new ArrayList<>()).add(task);
+    }
+
+    Set<String> allConnectors = new HashSet<>();
+    allConnectors.addAll(newAssignmentPerConnector.keySet());
+    allConnectors.addAll(oldAssignmentPerConnector.keySet());
+
+    // Follower nodes don't keep an up-to-date view of the datastreams ZK node, and making them listen for changes
+    // to that node risks overwhelming the ZK server with reads. Instead, the follower infers the stopping
+    // streams from the task assignment for each connector type. Even if that inference falsely identifies a
+    // stream as stopping, the attempt to claim its token is a no-op.
+    for (String connector : allConnectors) {
+      List<DatastreamTask> oldTasks = oldAssignmentPerConnector.getOrDefault(connector, Collections.emptyList());
+      List<DatastreamTask> newTasks = newAssignmentPerConnector.getOrDefault(connector, Collections.emptyList());
+      List<DatastreamTask> removedTasks = getRemovedTasks(newTasks, oldTasks);
+
+      List<Datastream> stoppingStreams = inferStoppingDatastreamsFromAssignment(newTasks, removedTasks);
+      List<String> stoppingStreamNames = stoppingStreams.stream().map(Datastream::getName).collect(Collectors.toList());
+
+      if (!stoppingStreamNames.isEmpty()) {
+        _log.info("Trying to claim assignment tokens for streams: {}", stoppingStreamNames);
+        _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static List<Datastream> inferStoppingDatastreamsFromAssignment(List<DatastreamTask> newAssignment,
+      List<DatastreamTask> removedTasks) {
+    Map<String, List<Datastream>> taskPrefixToDatastream = removedTasks.stream().collect(
+        Collectors.toMap(DatastreamTask::getTaskPrefix, DatastreamTask::getDatastreams, (d1, d2) -> d1));
+
+    Set<String> removedPrefixes = removedTasks.stream().map(DatastreamTask::getTaskPrefix).collect(Collectors.toSet());
+    Set<String> activePrefixes = newAssignment.stream().map(DatastreamTask::getTaskPrefix).collect(Collectors.toSet());
+    removedPrefixes.removeAll(activePrefixes);
+
+    List<Datastream> stoppingStreams = removedPrefixes.stream().map(taskPrefixToDatastream::get).
+        flatMap(List::stream).collect(Collectors.toList());
+    return stoppingStreams;
+  }
+
   private Future<Boolean> submitAssignment(String connectorType, List<DatastreamTask> assignment,
       boolean isDatastreamUpdate, ConnectorWrapper connector, List<DatastreamTask> removedTasks, boolean retryAndSaveError) {
     // Dispatch the onAssignmentChange to the connector in a separate thread.
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
index bf804bf56..9eb8f02e0 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
@@ -805,24 +805,12 @@ private void issueAssignmentTokensForStoppingDatastreams(List<DatastreamGroup> s
   }
 
   /**
-   * Claims the assignment tokens for the given instance's successfully stopped tasks
-   * @param currentAssignment Current assignment per connector type
-   * @param stoppingDatastreamGroups List of stopping datastream groups
+   * Claims assignment tokens for the given datastreams and instance
+   * @param datastreams List of datastreams for which tokens are to be claimed
    * @param instance Instance name
    */
-  public void claimAssignmentTokensOfInstance(Map<String, List<DatastreamTask>> currentAssignment,
-      List<DatastreamGroup> stoppingDatastreamGroups, String instance) {
-    // Flatten the currently assigned (active) tasks and get all task prefixes
-    Set<String> activeTaskPrefixes = currentAssignment.values().stream().
-        flatMap(Collection::stream).map(DatastreamTask::getTaskPrefix).collect(Collectors.toSet());
-
-    // Get all stopping streams with no tasks assigned. For each one, claim assignment tokens (if any)
-    stoppingDatastreamGroups.stream().filter(dg -> !activeTaskPrefixes.contains(dg.getTaskPrefix())).
-        forEach(dg -> deleteAssignmentTokensForDatastreamGroup(dg, instance));
-  }
-
-  private void deleteAssignmentTokensForDatastreamGroup(DatastreamGroup datastreamGroup, String instance) {
-    for (Datastream stream : datastreamGroup.getDatastreams()) {
+  public void claimAssignmentTokensForDatastreams(List<Datastream> datastreams, String instance) {
+    for (Datastream stream : datastreams) {
       String streamName = stream.getName();
       String tokenPath = KeyBuilder.datastreamAssignmentTokenForInstance(_cluster, streamName, instance);
       if (_zkclient.exists(tokenPath)) {
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
new file mode 100644
index 000000000..22788960a
--- /dev/null
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
@@ -0,0 +1,49 @@
+package com.linkedin.datastream.server;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.linkedin.datastream.common.Datastream;
+import com.linkedin.datastream.testutil.DatastreamTestUtils;
+
+
+public class TestCoordinator {
+  @Test
+  public void testInferStoppingStreamsFromAssignment() {
+    String connectorType = "testConnector";
+    Datastream ketchupStream = DatastreamTestUtils.createDatastream(connectorType, "ketchupStream", "random");
+    Datastream mayoStream = DatastreamTestUtils.createDatastream(connectorType, "mayoStream", "random");
+    Datastream mustardStream = DatastreamTestUtils.createDatastream(connectorType, "mustardStream", "random");
+
+    DatastreamTaskImpl task1 = new DatastreamTaskImpl();
+    task1.setTaskPrefix(ketchupStream.getName());
+    task1.setDatastreams(Collections.singletonList(ketchupStream));
+    DatastreamTaskImpl task2 = new DatastreamTaskImpl();
+    task2.setTaskPrefix(mustardStream.getName());
+    task2.setDatastreams(Collections.singletonList(mustardStream));
+    List<DatastreamTask> newAssignment = Arrays.asList(task1, task2);
+
+    DatastreamTaskImpl task3 = new DatastreamTaskImpl();
+    task3.setTaskPrefix(mayoStream.getName());
+    task3.setDatastreams(Collections.singletonList(mayoStream));
+    List<DatastreamTask> removedTasks = Collections.singletonList(task3);
+
+    List<Datastream> stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
+    Assert.assertEquals(stoppingStreams.size(), 1);
+    Assert.assertEquals(stoppingStreams.get(0), mayoStream);
+
+    removedTasks = Arrays.asList(task2, task3);
+    stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
+    Assert.assertEquals(stoppingStreams.size(), 1);
+    Assert.assertEquals(stoppingStreams.get(0), mayoStream);
+
+    newAssignment = Arrays.asList(task1, task2, task3);
+    stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
+    Assert.assertEquals(stoppingStreams.size(), 0);
+  }
+
+}
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java
index a92f1b883..c761ea9bd 100644
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java
@@ -1127,7 +1127,6 @@ public void testInstanceClaimsAssignmentTokensProperly() throws Exception {
         "ketchupStream", "mayoStream");
     Datastream ketchupStream = datastreams[0];
     Datastream mayoStream = datastreams[1];
-    DatastreamGroup ketchupDatastreamGroup = new DatastreamGroup(Collections.singletonList(ketchupStream));
     DatastreamGroup mayoDatastreamGroup = new DatastreamGroup(Collections.singletonList(mayoStream));
 
     // Simulating a stop request for ketchup stream
@@ -1138,35 +1137,17 @@ public void testInstanceClaimsAssignmentTokensProperly() throws Exception {
     zkClient.create(KeyBuilder.datastreamAssignmentTokenForInstance(testCluster,
         ketchupStream.getName(), "someOtherInstance"), "token", CreateMode.PERSISTENT);
 
-    List<DatastreamTask> tasks = new ArrayList<>();
     DatastreamTaskImpl task1 = new DatastreamTaskImpl();
     task1.setTaskPrefix(mayoDatastreamGroup.getTaskPrefix());
     task1.setConnectorType(connectorType);
 
-    Map<String, List<DatastreamTask>> assignment = new HashMap<>();
-    assignment.put(connectorType, tasks);
-    List<DatastreamGroup> stoppingDatastreamGroups = Collections.singletonList(ketchupDatastreamGroup);
-
-    adapter.claimAssignmentTokensOfInstance(assignment, stoppingDatastreamGroups, adapter.getInstanceName());
+    adapter.claimAssignmentTokensForDatastreams(Collections.singletonList(ketchupStream), adapter.getInstanceName());
 
     // Asserting that ZkAdapter claimed the token for the given instance, and for that instance only
     List<String> nodes = zkClient.getChildren(
         KeyBuilder.datastreamAssignmentTokens(testCluster, ketchupStream.getName()));
     Assert.assertEquals(nodes.size(), 1);
     Assert.assertEquals(nodes.get(0), "someOtherInstance"); // adapter didn't touch other instance's token
-
-    // Asserting that tokens will be left intact if the stopping stream has active tasks
-    zkClient.create(KeyBuilder.datastreamAssignmentTokenForInstance(testCluster,
-        ketchupStream.getName(), adapter.getInstanceName()), "token", CreateMode.PERSISTENT);
-    DatastreamTaskImpl task2 = new DatastreamTaskImpl();
-    task2.setTaskPrefix(ketchupDatastreamGroup.getTaskPrefix());
-    task2.setConnectorType(connectorType);
-    tasks.add(task2);
-
-    adapter.claimAssignmentTokensOfInstance(assignment, stoppingDatastreamGroups, adapter.getInstanceName());
-    nodes = zkClient.getChildren(
-        KeyBuilder.datastreamAssignmentTokens(testCluster, ketchupStream.getName()));
-    Assert.assertEquals(nodes.size(), 2);
   }
 
   @Test

From bc3d122b29e3f52b6a2a651355e7a7175a9fddf4 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Wed, 11 Jan 2023 11:52:07 -0800
Subject: [PATCH 03/14] Added more tests

---
 .../datastream/server/TestCoordinator.java    | 91 +++++++++++++++++++
 .../datastream/server/Coordinator.java        |  3 +-
 .../datastream/server/TestCoordinator.java    | 49 ----------
 3 files changed, 93 insertions(+), 50 deletions(-)
 delete mode 100644 datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java

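Note: the relocated test verifies interactions against a Mockito spy of the
ZkAdapter via TestCoordinatorWithSpyZkAdapter, which is defined elsewhere in
the test sources. A plausible minimal sketch of that pattern, assuming
Coordinator exposes an overridable createZkAdapter() factory (an assumption
here, not confirmed by this patch):

    class TestCoordinatorWithSpyZkAdapter extends Coordinator {
      TestCoordinatorWithSpyZkAdapter(CachedDatastreamReader reader, Properties props) {
        super(reader, props);
      }

      @Override
      ZkAdapter createZkAdapter() {
        // Wrap the real adapter in a spy so verify(...) can observe calls
        // while the underlying ZooKeeper interactions still happen.
        return org.mockito.Mockito.spy(super.createZkAdapter());
      }
    }
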
diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
index 6e631cf89..231fd5749 100644
--- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
+++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
@@ -90,6 +90,7 @@
 import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX;
 import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS;
 import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.DEFAULT_IMBALANCE_THRESHOLD;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anyObject;
@@ -3114,6 +3115,96 @@ void testOnSessionExpired(boolean handleNewSession) throws DatastreamException,
     instance1.getDatastreamCache().getZkclient().close();
   }
 
+  @Test
+  public void testInferStoppingStreamsFromAssignment() {
+    String connectorType = "testConnector";
+    Datastream ketchupStream = DatastreamTestUtils.createDatastream(connectorType, "ketchupStream", "random");
+    Datastream mayoStream = DatastreamTestUtils.createDatastream(connectorType, "mayoStream", "random");
+    Datastream mustardStream = DatastreamTestUtils.createDatastream(connectorType, "mustardStream", "random");
+
+    DatastreamTaskImpl task1 = new DatastreamTaskImpl();
+    task1.setTaskPrefix(ketchupStream.getName());
+    task1.setDatastreams(Collections.singletonList(ketchupStream));
+    DatastreamTaskImpl task2 = new DatastreamTaskImpl();
+    task2.setTaskPrefix(mustardStream.getName());
+    task2.setDatastreams(Collections.singletonList(mustardStream));
+    List<DatastreamTask> newAssignment = Arrays.asList(task1, task2);
+
+    DatastreamTaskImpl task3 = new DatastreamTaskImpl();
+    task3.setTaskPrefix(mayoStream.getName());
+    task3.setDatastreams(Collections.singletonList(mayoStream));
+    List<DatastreamTask> removedTasks = Collections.singletonList(task3);
+
+    List<Datastream> stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
+    Assert.assertEquals(stoppingStreams.size(), 1);
+    Assert.assertEquals(stoppingStreams.get(0), mayoStream);
+
+    removedTasks = Arrays.asList(task2, task3);
+    stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
+    Assert.assertEquals(stoppingStreams.size(), 1);
+    Assert.assertEquals(stoppingStreams.get(0), mayoStream);
+
+    newAssignment = Arrays.asList(task1, task2, task3);
+    stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
+    Assert.assertEquals(stoppingStreams.size(), 0);
+  }
+
+  @Test
+  public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
+    String testCluster = "testCluster";
+    String pizzaConnector = "pizzaConnector";
+    String thinCrustStream = "thinCrustStream";
+    String deepDishStream = "deepDishStream";
+
+    String bagelConnector = "bagelConnector";
+    String eggBagelStream = "eggBagelStream";
+
+    Properties props = new Properties();
+    props.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
+    props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
+    props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
+    props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
+
+    ZkClient zkClient = new ZkClient(_zkConnectionString);
+    _cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
+    Coordinator coordinator = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props);
+    coordinator.start();
+
+    Datastream[] pizzaStreams = DatastreamTestUtils.createAndStoreDatastreams(
+        zkClient, testCluster, pizzaConnector, thinCrustStream, deepDishStream);
+    Datastream[] bagelStreams = DatastreamTestUtils.createAndStoreDatastreams(zkClient,
+        testCluster, bagelConnector, eggBagelStream);
+
+    DatastreamTaskImpl pizzaTask1 = new DatastreamTaskImpl();
+    pizzaTask1.setConnectorType(pizzaConnector);
+    pizzaTask1.setTaskPrefix(thinCrustStream);
+    pizzaTask1.setDatastreams(Collections.singletonList(pizzaStreams[0]));
+
+    DatastreamTaskImpl pizzaTask2 = new DatastreamTaskImpl();
+    pizzaTask2.setConnectorType(pizzaConnector);
+    pizzaTask2.setTaskPrefix(deepDishStream);
+    pizzaTask2.setDatastreams(Collections.singletonList(pizzaStreams[1]));
+
+    DatastreamTaskImpl bagelTask1 = new DatastreamTaskImpl();
+    bagelTask1.setConnectorType(bagelConnector);
+    bagelTask1.setTaskPrefix(eggBagelStream);
+    bagelTask1.setDatastreams(Collections.singletonList(bagelStreams[0]));
+
+    List<DatastreamTask> oldAssignment = new ArrayList<>(Arrays.asList(pizzaTask1, pizzaTask2, bagelTask1));
+    List<DatastreamTask> newAssignment = new ArrayList<>(Collections.singletonList(pizzaTask1));
+
+    ZkAdapter spyZkAdapter = coordinator.getZkAdapter();
+    coordinator.maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment);
+
+    // Verify that claimAssignmentTokensForDatastreams is called twice:
+    // (1) For deepDishStream, since it no longer has tasks in the new assignment
+    // (2) For eggBagelStream, since the bagel connector has tasks in the new assignment (connector stopped case)
+    verify(spyZkAdapter, times(2)).claimAssignmentTokensForDatastreams(any(), any());
+
+    zkClient.close();
+    coordinator.stop();
+  }
+
   // helper method: assert that within a timeout value, the connector are assigned the specific
   // tasks with the specified names.
   private void assertConnectorAssignment(TestHookConnector connector, long timeoutMs, String... datastreamNames)
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 2b91c1d31..1e1a2756a 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -781,7 +781,8 @@ private List<DatastreamTask> getAddedTasks(List<DatastreamTask> newAssignment, L
     return addedTasks;
   }
 
-  private void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssignment,
+  @VisibleForTesting
+  void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssignment,
       List<DatastreamTask> oldAssignment) {
     Map<String, List<DatastreamTask>> newAssignmentPerConnector = new HashMap<>();
     for (DatastreamTask task : newAssignment) {
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
deleted file mode 100644
index 22788960a..000000000
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.linkedin.datastream.server;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.datastream.common.Datastream;
-import com.linkedin.datastream.testutil.DatastreamTestUtils;
-
-
-public class TestCoordinator {
-  @Test
-  public void testInferStoppingStreamsFromAssignment() {
-    String connectorType = "testConnector";
-    Datastream ketchupStream = DatastreamTestUtils.createDatastream(connectorType, "ketchupStream", "random");
-    Datastream mayoStream = DatastreamTestUtils.createDatastream(connectorType, "mayoStream", "random");
-    Datastream mustardStream = DatastreamTestUtils.createDatastream(connectorType, "mustardStream", "random");
-
-    DatastreamTaskImpl task1 = new DatastreamTaskImpl();
-    task1.setTaskPrefix(ketchupStream.getName());
-    task1.setDatastreams(Collections.singletonList(ketchupStream));
-    DatastreamTaskImpl task2 = new DatastreamTaskImpl();
-    task2.setTaskPrefix(mustardStream.getName());
-    task2.setDatastreams(Collections.singletonList(mustardStream));
-    List<DatastreamTask> newAssignment = Arrays.asList(task1, task2);
-
-    DatastreamTaskImpl task3 = new DatastreamTaskImpl();
-    task3.setTaskPrefix(mayoStream.getName());
-    task3.setDatastreams(Collections.singletonList(mayoStream));
-    List<DatastreamTask> removedTasks = Collections.singletonList(task3);
-
-    List<Datastream> stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
-    Assert.assertEquals(stoppingStreams.size(), 1);
-    Assert.assertEquals(stoppingStreams.get(0), mayoStream);
-
-    removedTasks = Arrays.asList(task2, task3);
-    stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
-    Assert.assertEquals(stoppingStreams.size(), 1);
-    Assert.assertEquals(stoppingStreams.get(0), mayoStream);
-
-    newAssignment = Arrays.asList(task1, task2, task3);
-    stoppingStreams = Coordinator.inferStoppingDatastreamsFromAssignment(newAssignment, removedTasks);
-    Assert.assertEquals(stoppingStreams.size(), 0);
-  }
-
-}

From 10e5f2a9d6b6e258a32455da7b8a0dca0b532762 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Wed, 11 Jan 2023 12:10:28 -0800
Subject: [PATCH 04/14] Added more tests

---
 .../datastream/server/TestCoordinator.java    | 27 ++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

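Note: CollectionContainsMatcher extends ArgumentMatcher as an abstract class
with an Object-typed matches(...), which is the Mockito 1.x API this repo
pins. Under Mockito 2+, ArgumentMatcher<T> is a functional interface, so the
same check shrinks to a lambda. A self-contained sketch of that alternative
(Sink and the literals are made up for illustration):

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.ArgumentMatchers.argThat;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.Arrays;
    import java.util.List;

    public class LambdaMatcherSketch {
      interface Sink { void accept(List<String> streams, String instance); }

      public static void main(String[] args) {
        Sink sink = mock(Sink.class);
        sink.accept(Arrays.asList("deepDishStream"), "instance1");
        // argThat accepts a lambda in Mockito 2+; no named matcher class needed.
        verify(sink).accept(argThat(streams -> streams.contains("deepDishStream")), anyString());
      }
    }
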
diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
index 231fd5749..89e3d17ba 100644
--- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
+++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
@@ -30,6 +30,7 @@
 
 import org.apache.commons.lang3.Validate;
 import org.jetbrains.annotations.NotNull;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
 import org.slf4j.Logger;
@@ -3198,8 +3199,14 @@ public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
 
     // Verify that claimAssignmentTokensForDatastreams is called twice:
     // (1) For deepDishStream, since it no longer has tasks in the new assignment
-    // (2) For eggBagelStream, since the bagel connector has tasks in the new assignment (connector stopped case)
+    // (2) For eggBagelStream, since the bagel connector has no tasks in the new assignment (connector stopped case)
+    CollectionContainsMatcher<Datastream> deepDishStreamMatcher = new CollectionContainsMatcher<>(pizzaStreams[1]);
+    CollectionContainsMatcher<Datastream> eggBagelStreamMatcher = new CollectionContainsMatcher<>(bagelStreams[0]);
     verify(spyZkAdapter, times(2)).claimAssignmentTokensForDatastreams(any(), any());
+    verify(spyZkAdapter, times(1)).
+        claimAssignmentTokensForDatastreams(argThat(deepDishStreamMatcher), any());
+    verify(spyZkAdapter, times(1)).
+        claimAssignmentTokensForDatastreams(argThat(eggBagelStreamMatcher), any());
 
     zkClient.close();
     coordinator.stop();
@@ -3249,6 +3256,24 @@ private void deleteLiveInstanceNode(ZkClient zkClient, String cluster, Coordinat
     zkClient.deleteRecursive(path);
   }
 
+  static class CollectionContainsMatcher<T> extends ArgumentMatcher<List<T>> {
+    private final T _element;
+
+    public CollectionContainsMatcher(T element) {
+      _element = element;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean matches(Object argument) {
+      if (!(argument instanceof List))
+        return false;
+
+      List<T> argumentAsList = (List<T>)argument;
+      return argumentAsList.contains(_element);
+    }
+  }
+
   /**
    * Base class of test Connector implementations
    */

From 628b9e90c4fdc55fad5de5eb6d58d07e5ed71787 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Thu, 12 Jan 2023 10:54:01 -0800
Subject: [PATCH 05/14] Fixed checkstyle issues

---
 .../com/linkedin/datastream/server/TestCoordinator.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
index 89e3d17ba..a5ff23363 100644
--- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
+++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
@@ -91,7 +91,8 @@
 import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX;
 import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS;
 import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.DEFAULT_IMBALANCE_THRESHOLD;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anyObject;
@@ -3266,10 +3267,11 @@ public CollectionContainsMatcher(T element) {
     @Override
     @SuppressWarnings("unchecked")
     public boolean matches(Object argument) {
-      if (!(argument instanceof List))
+      if (!(argument instanceof List)) {
         return false;
+      }
 
-      List<T> argumentAsList = (List<T>)argument;
+      List<T> argumentAsList = (List<T>) argument;
       return argumentAsList.contains(_element);
     }
   }

From bf7a753d2fa391b1796893ded5196d3be6e904ae Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Tue, 17 Jan 2023 13:29:48 -0800
Subject: [PATCH 06/14] Added extra logging

---
 .../main/java/com/linkedin/datastream/server/Coordinator.java   | 2 ++
 .../main/java/com/linkedin/datastream/server/zk/ZkAdapter.java  | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 1e1a2756a..d12e301a5 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -815,6 +815,8 @@ void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssign
       if (!stoppingStreamNames.isEmpty()) {
         _log.info("Trying to claim assignment tokens for streams: {}", stoppingStreamNames);
         _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName());
+      } else {
+        _log.info("No streams have been inferred as stopping and no assignment tokens will be claimed");
       }
     }
   }
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
index 9eb8f02e0..f96356621 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
@@ -820,6 +820,8 @@ public void claimAssignmentTokensForDatastreams(List<Datastream> datastreams, St
           LOG.info("Revoking assignment token for datastream: {}, instance: {}", streamName, instance);
         }
         _zkclient.delete(tokenPath);
+      } else {
+        LOG.debug("Attempt to claim non-existing assignment token {}", tokenPath);
       }
     }
   }

From 455cdd3ee062378f6cb6584dd61fb994b6f3849c Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Wed, 18 Jan 2023 13:38:39 -0800
Subject: [PATCH 07/14] Coordinator waits for tasks to stop before claiming
 tokens

---
 .../kafka/AbstractKafkaConnector.java         |  14 +++
 .../server/api/connector/Connector.java       |   9 ++
 .../datastream/server/TestCoordinator.java    | 107 ++++++++++++++++--
 .../datastream/server/Coordinator.java        |  40 ++++++-
 .../datastream/server/CoordinatorConfig.java  |  17 +++
 5 files changed, 173 insertions(+), 14 deletions(-)

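Note: before claiming tokens the follower now polls
Coordinator::connectorTasksHaveStopped through PollUtils.poll(condition,
retryPeriodMs, timeoutMs), claiming only if the condition turns true within
the timeout. A stand-in sketch of that polling contract (not Brooklin's
PollUtils implementation):

    import java.util.function.BooleanSupplier;

    public final class PollSketch {
      public static boolean poll(BooleanSupplier condition, long periodMs, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (condition.getAsBoolean()) {
            return true;              // e.g. all stopping tasks reported inactive
          }
          try {
            Thread.sleep(periodMs);   // retry period between checks
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
          }
        }
        return condition.getAsBoolean(); // one final check at the deadline
      }
    }
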
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
index 66fd80481..dc58ee427 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
@@ -788,4 +788,18 @@ public Map<String, Map<Integer, Long>> getConsumptionLagForTask(DatastreamTask t
       return taskEntry.getConnectorTask().getKafkaTopicPartitionTracker().getConsumptionLag();
     }
   }
+
+  public List<String> getActiveTasks() {
+    List<DatastreamTask> tasks = new ArrayList<>();
+
+    synchronized (_runningTasks) {
+      synchronized (_tasksPendingStop) {
+        tasks.addAll(_runningTasks.keySet());
+        tasks.addAll(_tasksPendingStop.keySet());
+      }
+    }
+
+    _logger.debug("Found {} active tasks assigned to the connector", tasks.size());
+    return tasks.stream().map(DatastreamTask::getId).collect(Collectors.toList());
+  }
 }
diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
index 7dee429be..c1d1f3a58 100644
--- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
@@ -66,6 +66,15 @@ public interface Connector extends MetricsAware, DatastreamChangeListener {
    */
   void initializeDatastream(Datastream stream, List<Datastream> allDatastreams) throws DatastreamValidationException;
 
+  /**
+   * Returns the IDs of tasks that are active (i.e. running or pending stop)
+   * @return A list of task IDs that are currently active on this connector
+   * @throws UnsupportedOperationException if not implemented by the Connector class
+   */
+  default List<String> getActiveTasks() {
+    throw new UnsupportedOperationException("Active tasks API is not supported unless implemented by the connector");
+  }
+
   /**
    * Validate the update datastreams operation. By default this is not supported. Any connectors that want to support
    * datastream updates should override this method to perform the validation needed.
diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
index a5ff23363..26082c63a 100644
--- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
+++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
@@ -21,6 +21,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -3153,12 +3154,12 @@ public void testInferStoppingStreamsFromAssignment() {
 
   @Test
   public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
-    String testCluster = "testCluster";
-    String pizzaConnector = "pizzaConnector";
+    String testCluster = "testClaimAssignmentTokensForStoppingStreams";
+    String pizzaConnectorName = "pizzaConnector";
     String thinCrustStream = "thinCrustStream";
     String deepDishStream = "deepDishStream";
 
-    String bagelConnector = "bagelConnector";
+    String bagelConnectorName = "bagelConnector";
     String eggBagelStream = "eggBagelStream";
 
     Properties props = new Properties();
@@ -3170,25 +3171,31 @@ public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
     ZkClient zkClient = new ZkClient(_zkConnectionString);
     _cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
     Coordinator coordinator = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props);
+    Connector pizzaConnector = new TestHookConnector(pizzaConnectorName, pizzaConnectorName);
+    Connector bagelConnector = new TestHookConnector(bagelConnectorName, bagelConnectorName);
+    coordinator.addConnector(pizzaConnectorName, pizzaConnector, new BroadcastStrategy(Optional.empty()), false,
+        new SourceBasedDeduper(), null);
+    coordinator.addConnector(bagelConnectorName, bagelConnector, new BroadcastStrategy(Optional.empty()), false,
+        new SourceBasedDeduper(), null);
     coordinator.start();
 
     Datastream[] pizzaStreams = DatastreamTestUtils.createAndStoreDatastreams(
-        zkClient, testCluster, pizzaConnector, thinCrustStream, deepDishStream);
+        zkClient, testCluster, pizzaConnectorName, thinCrustStream, deepDishStream);
     Datastream[] bagelStreams = DatastreamTestUtils.createAndStoreDatastreams(zkClient,
-        testCluster, bagelConnector, eggBagelStream);
+        testCluster, bagelConnectorName, eggBagelStream);
 
     DatastreamTaskImpl pizzaTask1 = new DatastreamTaskImpl();
-    pizzaTask1.setConnectorType(pizzaConnector);
+    pizzaTask1.setConnectorType(pizzaConnectorName);
     pizzaTask1.setTaskPrefix(thinCrustStream);
     pizzaTask1.setDatastreams(Collections.singletonList(pizzaStreams[0]));
 
     DatastreamTaskImpl pizzaTask2 = new DatastreamTaskImpl();
-    pizzaTask2.setConnectorType(pizzaConnector);
+    pizzaTask2.setConnectorType(pizzaConnectorName);
     pizzaTask2.setTaskPrefix(deepDishStream);
     pizzaTask2.setDatastreams(Collections.singletonList(pizzaStreams[1]));
 
     DatastreamTaskImpl bagelTask1 = new DatastreamTaskImpl();
-    bagelTask1.setConnectorType(bagelConnector);
+    bagelTask1.setConnectorType(bagelConnectorName);
     bagelTask1.setTaskPrefix(eggBagelStream);
     bagelTask1.setDatastreams(Collections.singletonList(bagelStreams[0]));
 
@@ -3213,6 +3220,85 @@ public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
     coordinator.stop();
   }
 
+  @Test
+  public void testTokensNotClaimedForConnectorThatFailedToStop() throws Exception {
+    String testCluster = "testTokensNotClaimedForConnectorThatFailedToStop";
+    String connectorName = "mockConnector";
+    String streamName = "someStream";
+
+    Properties props = new Properties();
+    props.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
+    props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
+    props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
+    props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
+    props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_TIMEOUT_MS, "100");
+    props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS, "10");
+
+    ZkClient zkClient = new ZkClient(_zkConnectionString);
+    _cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
+    Coordinator coordinator = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props);
+    Connector mockConnector = Mockito.mock(Connector.class);
+    coordinator.addConnector(connectorName, mockConnector, new BroadcastStrategy(Optional.empty()), false,
+        new SourceBasedDeduper(), null);
+    coordinator.start();
+
+    Datastream testStream = DatastreamTestUtils.
+        createAndStoreDatastreams(zkClient, testCluster, connectorName, streamName)[0];
+    DatastreamTaskImpl task1 = new DatastreamTaskImpl();
+    task1.setConnectorType(connectorName);
+    task1.setTaskPrefix(streamName);
+    task1.setDatastreams(Collections.singletonList(testStream));
+
+    List<DatastreamTask> oldAssignment = new ArrayList<>(Collections.singletonList(task1));
+    List<DatastreamTask> newAssignment = Collections.emptyList();
+
+    // task1 never stops
+    when(mockConnector.getActiveTasks()).thenReturn(Collections.singletonList(task1.getId()));
+    coordinator.maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment);
+    ZkAdapter spyZkAdapter = coordinator.getZkAdapter();
+
+    // Verify that:
+    // (1) Active tasks are queried from the connector at least once
+    // (2) No tokens are claimed for the stream since the task never stops
+    verify(mockConnector, Mockito.atLeast(1)).getActiveTasks();
+    verify(spyZkAdapter, times(0)).claimAssignmentTokensForDatastreams(any(), any());
+  }
+
+  @Test
+  public void testConnectorTasksHaveStopped() throws Exception {
+    String testCluster = "testConnectorTasksHaveStopped";
+    String connectorName = "connector";
+    String streamName = "stream";
+    Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster);
+    Connector connector = Mockito.mock(Connector.class);
+    coordinator.addConnector(connectorName, connector, new BroadcastStrategy(Optional.empty()), false,
+        new SourceBasedDeduper(), null);
+
+    List<DatastreamTaskImpl> allTasks = new ArrayList<>(Arrays.asList(
+        new DatastreamTaskImpl(),
+        new DatastreamTaskImpl(),
+        new DatastreamTaskImpl(),
+        new DatastreamTaskImpl()));
+    allTasks.forEach(t -> t.setTaskPrefix(streamName));
+    allTasks.forEach(t -> t.setConnectorType(connectorName));
+    allTasks.forEach(t -> t.setId(UUID.randomUUID().toString())); // setting unique IDs for tasks
+    Set<String> allTaskIds = allTasks.stream().map(DatastreamTaskImpl::getId).collect(Collectors.toSet());
+
+    when(connector.getActiveTasks()).thenReturn(Collections.emptyList());
+    Assert.assertTrue(coordinator.connectorTasksHaveStopped(connectorName, allTaskIds));
+
+    when(connector.getActiveTasks()).thenReturn(new ArrayList<>(allTaskIds));
+    Assert.assertFalse(coordinator.connectorTasksHaveStopped(connectorName, allTaskIds));
+    Assert.assertFalse(coordinator.connectorTasksHaveStopped(connectorName,
+        new HashSet<>(Collections.singletonList(allTasks.get(0).getId()))));
+    Assert.assertTrue(coordinator.connectorTasksHaveStopped(connectorName, Collections.emptySet()));
+
+    when(connector.getActiveTasks()).thenReturn(Collections.singletonList(allTasks.get(0).getId()));
+    Assert.assertFalse(coordinator.connectorTasksHaveStopped(connectorName, allTaskIds));
+    Assert.assertTrue(coordinator.connectorTasksHaveStopped(connectorName,
+        new HashSet<>(Collections.singletonList(allTasks.get(1).getId()))));
+  }
+
   // helper method: assert that within a timeout value, the connector are assigned the specific
   // tasks with the specified names.
   private void assertConnectorAssignment(TestHookConnector connector, long timeoutMs, String... datastreamNames)
@@ -3374,6 +3460,11 @@ public String toString() {
     public List<BrooklinMetricInfo> getMetricInfos() {
       return null;
     }
+
+    @Override
+    public List<String> getActiveTasks() {
+      return Collections.emptyList();
+    }
   }
 
   /**
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index d12e301a5..853c7b04b 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -56,6 +57,7 @@
 import com.linkedin.datastream.common.DatastreamTransientException;
 import com.linkedin.datastream.common.DatastreamUtils;
 import com.linkedin.datastream.common.ErrorLogger;
+import com.linkedin.datastream.common.PollUtils;
 import com.linkedin.datastream.common.VerifiableProperties;
 import com.linkedin.datastream.metrics.BrooklinCounterInfo;
 import com.linkedin.datastream.metrics.BrooklinGaugeInfo;
@@ -713,7 +715,13 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
     }
 
     if (_config.getEnableAssignmentTokens()) {
-      maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment);
+      try {
+        // Queue assignment token claim task
+        _executor.schedule(() -> maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment),
+            0, TimeUnit.MILLISECONDS);
+      } catch (RejectedExecutionException ex) {
+        _log.warn("Failed to schedule the task for claiming assignment tokens", ex);
+      }
     }
 
     _log.info("END: Coordinator::handleAssignmentChange, Duration: {} milliseconds",
@@ -784,6 +792,8 @@ private List<DatastreamTask> getAddedTasks(List<DatastreamTask> newAssignment, L
   @VisibleForTesting
   void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssignment,
       List<DatastreamTask> oldAssignment) {
+    // TODO Check for performance optimizations
+    // TODO Add metrics for number of streams that are inferred as stopping
     Map<String, List<DatastreamTask>> newAssignmentPerConnector = new HashMap<>();
     for (DatastreamTask task : newAssignment) {
       String connectorType = task.getConnectorType();
@@ -804,21 +814,39 @@ void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssign
     // to datastreams node risks overwhelming the ZK server with reads. Instead, the follower is inferring the stopping
     // streams from the task assignment for each connector type. Even if that inference results in a falsely identified
     // stopping stream, the attempt to claim the token will be a no-op.
-    for (String connector : allConnectors) {
+    allConnectors.parallelStream().forEach(connector -> {
       List<DatastreamTask> oldTasks = oldAssignmentPerConnector.getOrDefault(connector, Collections.emptyList());
       List<DatastreamTask> newTasks = newAssignmentPerConnector.getOrDefault(connector, Collections.emptyList());
       List<DatastreamTask> removedTasks = getRemovedTasks(newTasks, oldTasks);
 
       List<Datastream> stoppingStreams = inferStoppingDatastreamsFromAssignment(newTasks, removedTasks);
-      List<String> stoppingStreamNames = stoppingStreams.stream().map(Datastream::getName).collect(Collectors.toList());
+      Set<String> stoppingStreamNames = stoppingStreams.stream().map(Datastream::getName).collect(Collectors.toSet());
 
       if (!stoppingStreamNames.isEmpty()) {
-        _log.info("Trying to claim assignment tokens for streams: {}", stoppingStreamNames);
-        _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName());
+        _log.info("Trying to claim assignment tokens for connector {}, streams: {}", connector, stoppingStreamNames);
+
+        Set<String> stoppingDatastreamTasks = removedTasks.stream()
+            .filter(t -> stoppingStreamNames.contains(t.getTaskPrefix()))
+            .map(DatastreamTask::getId).collect(Collectors.toSet());
+
+        if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks),
+            _config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) {
+          _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName());
+        } else {
+          _log.warn("Connector {} failed to stop its tasks in {}ms. No assignment tokens will be claimed",
+              connector, _config.getTaskStopCheckTimeoutMs());
+        }
       } else {
         _log.info("No streams have been inferred as stopping and no assignment tokens will be claimed");
       }
-    }
+    });
+  }
+
+  @VisibleForTesting
+  boolean connectorTasksHaveStopped(String connectorName, Set<String> stoppingTasks) {
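+    // Vacuously stopped when either set is empty; otherwise the stopping tasks must all be inactive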
+    Set<String> activeTasks =
+        new HashSet<>(_connectors.get(connectorName).getConnector().getConnectorInstance().getActiveTasks());
+    return activeTasks.isEmpty() || stoppingTasks.isEmpty() || Collections.disjoint(activeTasks, stoppingTasks);
   }
 
   @VisibleForTesting
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java
index c0cc3f3d9..da357402d 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java
@@ -32,8 +32,12 @@ public final class CoordinatorConfig {
   public static final String CONFIG_REINIT_ON_NEW_ZK_SESSION = PREFIX + "reinitOnNewZKSession";
   public static final String CONFIG_MAX_ASSIGNMENT_RETRY_COUNT = PREFIX + "maxAssignmentRetryCount";
   public static final String CONFIG_ENABLE_ASSIGNMENT_TOKENS = PREFIX + "enableAssignmentTokens";
+  public static final String CONFIG_TASK_STOP_CHECK_TIMEOUT_MS = PREFIX + "taskStopCheckTimeoutMs";
+  public static final String CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS = PREFIX + "taskStopCheckRetryPeriodMs";
 
   public static final int DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT = 100;
+  public static final long DEFAULT_TASK_STOP_CHECK_TIMEOUT_MS = 60 * 1000;
+  public static final long DEFAULT_TASK_STOP_CHECK_RETRY_PERIOD_MS = 10 * 1000;
 
   private final String _cluster;
   private final String _zkAddress;
@@ -52,6 +56,8 @@ public final class CoordinatorConfig {
   private final boolean _reinitOnNewZkSession;
   private final int _maxAssignmentRetryCount;
   private final boolean _enableAssignmentTokens;
+  private final long _taskStopCheckTimeoutMs;
+  private final long _taskStopCheckRetryPeriodMs;
 
   /**
    * Construct an instance of CoordinatorConfig
@@ -78,6 +84,9 @@ public CoordinatorConfig(Properties config) {
     _reinitOnNewZkSession = _properties.getBoolean(CONFIG_REINIT_ON_NEW_ZK_SESSION, false);
     _maxAssignmentRetryCount = _properties.getInt(CONFIG_MAX_ASSIGNMENT_RETRY_COUNT, DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT);
     _enableAssignmentTokens = _properties.getBoolean(CONFIG_ENABLE_ASSIGNMENT_TOKENS, false);
+    _taskStopCheckTimeoutMs = _properties.getLong(CONFIG_TASK_STOP_CHECK_TIMEOUT_MS, DEFAULT_TASK_STOP_CHECK_TIMEOUT_MS);
+    _taskStopCheckRetryPeriodMs = _properties.getLong(CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS,
+        DEFAULT_TASK_STOP_CHECK_RETRY_PERIOD_MS);
   }
 
   public Properties getConfigProperties() {
@@ -143,4 +152,12 @@ public int getMaxAssignmentRetryCount() {
   public boolean getEnableAssignmentTokens() {
     return _enableAssignmentTokens;
   }
+
+  public long getTaskStopCheckTimeoutMs() {
+    return _taskStopCheckTimeoutMs;
+  }
+
+  public long getTaskStopCheckRetryPeriodMs() {
+    return _taskStopCheckRetryPeriodMs;
+  }
 }

From 29e5f47b511643cff55d9554995beff4efe970b2 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Wed, 18 Jan 2023 13:48:32 -0800
Subject: [PATCH 08/14] Added missing javadoc

---
 .../datastream/connectors/kafka/AbstractKafkaConnector.java    | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
index dc58ee427..2f64baee9 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
@@ -789,6 +789,9 @@ public Map<String, Map<Integer, Long>> getConsumptionLagForTask(DatastreamTask t
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   public List<String> getActiveTasks() {
     List<DatastreamTask> tasks = new ArrayList<>();
 

From fe0090e62760976b2560e2e81ce1b261c195b027 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Wed, 18 Jan 2023 15:26:42 -0800
Subject: [PATCH 09/14] Minor improvements and more tests

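Replaced List.removeAll in the task-difference helpers with a HashSet
pass: removeAll on an ArrayList rescans the argument list for every
element (O(n*m)), while building a HashSet and removing elements is
O(n+m) overall. A minimal, self-contained sketch of the idea, using
String task names in place of DatastreamTask purely for illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    final class TaskDiffSketch {
      // Tasks present in the old assignment but absent from the new one
      static List<String> removedTasks(List<String> oldAssignment, List<String> newAssignment) {
        Set<String> removed = new HashSet<>(oldAssignment); // O(m) to build
        newAssignment.forEach(removed::remove);             // O(n) hash removals
        return new ArrayList<>(removed);
      }

      public static void main(String[] args) {
        // Prints t1 and t3 (HashSet iteration order is unspecified)
        System.out.println(removedTasks(
            Arrays.asList("t1", "t2", "t3"), Arrays.asList("t2", "t4")));
      }
    }

This assumes DatastreamTask's hashCode/equals are consistent with the
equality semantics that removeAll relied on.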
---
 .../com/linkedin/datastream/server/Coordinator.java | 13 ++++++-------
 .../datastream/server/TestCoordinatorConfig.java    |  8 ++++++++
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 853c7b04b..b7e55cb67 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -778,21 +778,20 @@ private Future<Boolean> dispatchAssignmentChangeIfNeeded(String connectorType, L
   }
 
   private List<DatastreamTask> getRemovedTasks(List<DatastreamTask> newAssignment, List<DatastreamTask> oldAssignment) {
-    List<DatastreamTask> removedTasks = new ArrayList<>(oldAssignment);
-    removedTasks.removeAll(newAssignment);
-    return removedTasks;
+    Set<DatastreamTask> removedTasks = new HashSet<>(oldAssignment);
+    newAssignment.forEach(removedTasks::remove);
+    return new ArrayList<>(removedTasks);
   }
 
   private List<DatastreamTask> getAddedTasks(List<DatastreamTask> newAssignment, List<DatastreamTask> oldAssignment) {
-    List<DatastreamTask> addedTasks = new ArrayList<>(newAssignment);
-    addedTasks.removeAll(oldAssignment);
-    return addedTasks;
+    Set<DatastreamTask> addedTasks = new HashSet<>(newAssignment);
+    oldAssignment.forEach(addedTasks::remove);
+    return new ArrayList<>(addedTasks);
   }
 
   @VisibleForTesting
   void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssignment,
       List<DatastreamTask> oldAssignment) {
-    // TODO Check for performance optimizations
     // TODO Add metrics for number of streams that are inferred as stopping
     Map<String, List<DatastreamTask>> newAssignmentPerConnector = new HashMap<>();
     for (DatastreamTask task : newAssignment) {
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorConfig.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorConfig.java
index 1b8611d48..1e5760b7c 100644
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorConfig.java
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestCoordinatorConfig.java
@@ -49,4 +49,12 @@ public void testCoordinatorMaxAssignmentRetryCountDefault() throws Exception {
     CoordinatorConfig config = createCoordinatorConfig(props);
     Assert.assertEquals(CoordinatorConfig.DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT, config.getMaxAssignmentRetryCount());
   }
+
+  @Test
+  public void testTaskStopTimeoutAndRetryConfigDefault() {
+    Properties props = new Properties();
+    CoordinatorConfig config = createCoordinatorConfig(props);
+    Assert.assertEquals(CoordinatorConfig.DEFAULT_TASK_STOP_CHECK_RETRY_PERIOD_MS, config.getTaskStopCheckRetryPeriodMs());
+    Assert.assertEquals(CoordinatorConfig.DEFAULT_TASK_STOP_CHECK_TIMEOUT_MS, config.getTaskStopCheckTimeoutMs());
+  }
 }

From 146b8f41b6e7aa6715a62f11b7b0c081ffe8daa3 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Fri, 20 Jan 2023 10:20:08 -0800
Subject: [PATCH 10/14] Added exception handling in ZooKeeper delete

---
 .../java/com/linkedin/datastream/server/zk/ZkAdapter.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
index f96356621..3606b9321 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
@@ -819,7 +819,11 @@ public void claimAssignmentTokensForDatastreams(List<Datastream> datastreams, St
         } else {
           LOG.info("Revoking assignment token for datastream: {}, instance: {}", streamName, instance);
         }
-        _zkclient.delete(tokenPath);
+        try {
+          _zkclient.delete(tokenPath);
+        } catch (Exception ex) {
+          LOG.error("Failed to delete token {}", tokenPath, ex);
+        }
       } else {
        LOG.debug("Attempt to claim nonexistent assignment token {}", tokenPath);
       }

From c07b05b4eba903e0585b0b3bba38b671ec971e2d Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Mon, 23 Jan 2023 10:53:26 -0800
Subject: [PATCH 11/14] Minor fixes

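Deleting a token in claimAssignmentTokensForDatastreams is now
best-effort: a failed delete is logged and skipped so the remaining
tokens can still be claimed. A minimal sketch of the pattern, where
KeeperClient is a hypothetical stand-in for the real ZkClient:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class TokenCleanupSketch {
      private static final Logger LOG = LoggerFactory.getLogger(TokenCleanupSketch.class);

      // Hypothetical stand-in for the ZooKeeper client used by ZkAdapter
      interface KeeperClient {
        void delete(String path);
      }

      // Best-effort delete: log and continue rather than propagate,
      // so one bad token does not abort the rest of the claim loop
      static void deleteQuietly(KeeperClient client, String tokenPath) {
        try {
          client.delete(tokenPath);
        } catch (RuntimeException ex) {
          LOG.error("Failed to delete token {}", tokenPath, ex);
        }
      }
    }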
---
 .../datastream/server/api/connector/Connector.java         | 2 +-
 .../java/com/linkedin/datastream/server/Coordinator.java   | 7 ++++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
index c1d1f3a58..63e045f17 100644
--- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
@@ -72,7 +72,7 @@ public interface Connector extends MetricsAware, DatastreamChangeListener {
    * @throws UnsupportedOperationException if not implemented by Connector classes.
    */
   default List<String> getActiveTasks() {
-    throw new UnsupportedOperationException("Running tasks API is not supported unless implemented by coordinators");
+    throw new UnsupportedOperationException("Running tasks API is not supported unless implemented by connectors");
   }
 
   /**
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index b7e55cb67..885ed9f07 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -716,11 +716,11 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
 
     if (_config.getEnableAssignmentTokens()) {
       try {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
         // Queue assignment token claim task
-        _executor.schedule(() -> maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment),
-            0, TimeUnit.MILLISECONDS);
+        executor.submit(() -> maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment));
       } catch (RejectedExecutionException ex) {
-        _log.warn("Failed to schedule the task for claiming assignment tokens", ex);
+        _log.warn("Failed to submit the task for claiming assignment tokens", ex);
       }
     }
 
@@ -828,6 +828,7 @@ void maybeClaimAssignmentTokensForStoppingStreams(List<DatastreamTask> newAssign
            .filter(t -> stoppingStreamNames.contains(t.getTaskPrefix()))
            .map(DatastreamTask::getId).collect(Collectors.toSet());
 
+        // TODO Evaluate whether we need to optimize here and make this call for each datastream
         if (PollUtils.poll(() -> connectorTasksHaveStopped(connector, stoppingDatastreamTasks),
             _config.getTaskStopCheckRetryPeriodMs(), _config.getTaskStopCheckTimeoutMs())) {
           _adapter.claimAssignmentTokensForDatastreams(stoppingStreams, _adapter.getInstanceName());

From 7c74ce6d8f69b95002913d546b752ad276dc7f98 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Mon, 23 Jan 2023 11:37:08 -0800
Subject: [PATCH 12/14] Minor fix

---
 .../com/linkedin/datastream/server/api/connector/Connector.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
index 63e045f17..e06831ad7 100644
--- a/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/api/connector/Connector.java
@@ -72,7 +72,7 @@ public interface Connector extends MetricsAware, DatastreamChangeListener {
    * @throws UnsupportedOperationException if not implemented by Connector classes.
    */
   default List<String> getActiveTasks() {
-    throw new UnsupportedOperationException("Running tasks API is not supported unless implemented by connectors");
+    throw new UnsupportedOperationException("Active tasks API is not supported unless implemented by connectors");
   }
 
   /**

From 5aeb303c6c43806ed8a356e28f596ed09a6af6e1 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Mon, 23 Jan 2023 11:45:01 -0800
Subject: [PATCH 13/14] Improvements to executor service

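The claim-tokens executor now gets a named thread factory so its thread
is easy to identify in thread dumps. A minimal sketch of the pattern,
using Guava's ThreadFactoryBuilder as in the diff below:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    final class NamedExecutorSketch {
      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("CoordinatorClaimTokens-%d").build());
        // The single worker thread is named CoordinatorClaimTokens-0
        executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdown();
      }
    }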
---
 .../java/com/linkedin/datastream/server/Coordinator.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 885ed9f07..5d5d3b355 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -45,6 +45,7 @@
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import com.linkedin.datastream.common.Datastream;
 import com.linkedin.datastream.common.DatastreamAlreadyExistsException;
@@ -716,9 +717,11 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
 
     if (_config.getEnableAssignmentTokens()) {
       try {
-        ExecutorService executor = Executors.newSingleThreadExecutor();
         // Queue assignment token claim task
-        executor.submit(() -> maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment));
+        ExecutorService claimTokensExecutorService = Executors.newSingleThreadExecutor(
+            new ThreadFactoryBuilder().setNameFormat("CoordinatorClaimTokens-%d").build());
+        claimTokensExecutorService.submit(() ->
+            maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment));
       } catch (RejectedExecutionException ex) {
         _log.warn("Failed to submit the task for claiming assignment tokens", ex);
       }

From 525824f468710aa593ef612ee232b17b1df820b9 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <jzakaryan@linkedin.com>
Date: Mon, 23 Jan 2023 13:27:45 -0800
Subject: [PATCH 14/14] Improved lifecycle management for Coordinator's
 executor services

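The executors are now fields that are created in start() and shut down
in stop(), replacing the per-call executor that was never terminated. A
minimal sketch of the lifecycle pattern (names are illustrative, not the
actual Coordinator fields):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class LifecycleSketch {
      private ExecutorService tokenClaimExecutor; // created in start()

      void start() {
        tokenClaimExecutor = Executors.newSingleThreadExecutor();
      }

      void stop() {
        // Null-guarded so stop() is safe even without a prior start()
        if (tokenClaimExecutor != null) {
          tokenClaimExecutor.shutdown();
        }
      }
    }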
---
 .../datastream/server/Coordinator.java        | 34 +++++++++++++------
 1 file changed, 23 insertions(+), 11 deletions(-)

diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 5d5d3b355..7ee6dcf49 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -200,7 +200,6 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware {
   private final Duration _heartbeatPeriod;
 
   private final Logger _log = LoggerFactory.getLogger(Coordinator.class.getName());
-  private final ScheduledExecutorService _executor = Executors.newSingleThreadScheduledExecutor();
 
   // make sure the scheduled retries are not duplicated
   private final AtomicBoolean _leaderDatastreamAddOrDeleteEventScheduled = new AtomicBoolean(false);
@@ -213,6 +212,8 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware {
   private volatile boolean _shutdown = false;
 
   private CoordinatorEventProcessor _eventThread;
+  private ScheduledExecutorService _scheduledExecutor;
+  private ExecutorService _tokenClaimExecutor;
   private Future<?> _leaderDatastreamAddOrDeleteEventScheduledFuture = null;
   private Future<?> _leaderDoAssignmentScheduledFuture = null;
   private volatile boolean _zkSessionExpired = false;
@@ -238,7 +239,6 @@ public Coordinator(CachedDatastreamReader datastreamCache, CoordinatorConfig con
     _heartbeatPeriod = Duration.ofMillis(config.getHeartbeatPeriodMs());
 
     _adapter = createZkAdapter();
-
     _eventQueue = new CoordinatorEventBlockingQueue();
     createEventThread();
 
@@ -263,6 +263,12 @@ public void start() {
     startEventThread();
     _adapter.connect(_config.getReinitOnNewZkSession());
 
+    // Initialize executor services
+    _tokenClaimExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("CoordinatorTokenClaimExecutor-%d").build());
+    _scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setNameFormat("CoordinatorScheduledExecutor-%d").build());
+
     for (String connectorType : _connectors.keySet()) {
       ConnectorInfo connectorInfo = _connectors.get(connectorType);
       ConnectorWrapper connector = connectorInfo.getConnector();
@@ -288,7 +294,7 @@ public void start() {
     queueHandleAssignmentOrDatastreamChangeEvent(CoordinatorEvent.createHandleAssignmentChangeEvent(), true);
 
     // Queue up one heartbeat per period with an initial delay of 3 periods
-    _executor.scheduleAtFixedRate(() -> _eventQueue.put(CoordinatorEvent.HEARTBEAT_EVENT),
+    _scheduledExecutor.scheduleAtFixedRate(() -> _eventQueue.put(CoordinatorEvent.HEARTBEAT_EVENT),
         _heartbeatPeriod.toMillis() * 3, _heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS);
   }
 
@@ -360,6 +366,14 @@ public void stop() {
       }
     }
 
+    // Shutdown executor services
+    if (_scheduledExecutor != null) {
+      _scheduledExecutor.shutdown();
+    }
+    if (_tokenClaimExecutor != null) {
+      _tokenClaimExecutor.shutdown();
+    }
+
     // Shutdown the event producer.
     for (DatastreamTask task : _assignedDatastreamTasks.values()) {
       ((EventProducer) task.getEventProducer()).shutdown(false);
@@ -552,7 +566,7 @@ public void onNewSession() {
     _eventQueue.put(CoordinatorEvent.createHandleAssignmentChangeEvent());
 
     // Queue up one heartbeat per period with an initial delay of 3 periods
-    _executor.scheduleAtFixedRate(() -> _eventQueue.put(CoordinatorEvent.HEARTBEAT_EVENT),
+    _scheduledExecutor.scheduleAtFixedRate(() -> _eventQueue.put(CoordinatorEvent.HEARTBEAT_EVENT),
         _heartbeatPeriod.toMillis() * 3, _heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS);
 
     _zkSessionExpired = false;
@@ -718,9 +732,7 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
     if (_config.getEnableAssignmentTokens()) {
       try {
         // Queue assignment token claim task
-        ExecutorService claimTokensExecutorService = Executors.newSingleThreadExecutor(
-            new ThreadFactoryBuilder().setNameFormat("CoordinatorClaimTokens-%d").build());
-        claimTokensExecutorService.submit(() ->
+        _tokenClaimExecutor.submit(() ->
             maybeClaimAssignmentTokensForStoppingStreams(newAssignment, oldAssignment));
       } catch (RejectedExecutionException ex) {
         _log.warn("Failed to submit the task for claiming assignment tokens", ex);
@@ -1151,7 +1163,7 @@ private void handleDatastreamAddOrDelete() {
       // there is no pending retry scheduled already.
       if (_leaderDatastreamAddOrDeleteEventScheduled.compareAndSet(false, true)) {
         _log.warn("Schedule retry for handling new datastream");
-        _leaderDatastreamAddOrDeleteEventScheduledFuture = _executor.schedule(() -> {
+        _leaderDatastreamAddOrDeleteEventScheduledFuture = _scheduledExecutor.schedule(() -> {
           _eventQueue.put(CoordinatorEvent.createHandleDatastreamAddOrDeleteEvent());
 
           // Allow further retry scheduling
@@ -1359,7 +1371,7 @@ private void handleLeaderDoAssignment(boolean cleanUpOrphanNodes) {
       _log.info("Schedule retry for leader assigning tasks");
       _metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_DO_ASSIGNMENT_NUM_RETRIES, 1);
       _leaderDoAssignmentScheduled.set(true);
-      _leaderDoAssignmentScheduledFuture = _executor.schedule(() -> {
+      _leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> {
         _eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(cleanUpOrphanNodes));
         _leaderDoAssignmentScheduled.set(false);
       }, _config.getRetryIntervalMs(), TimeUnit.MILLISECONDS);
@@ -1459,7 +1471,7 @@ private void performPartitionAssignment(String datastreamGroupName) {
       _metrics.updateMeter(CoordinatorMetrics.Meter.NUM_PARTITION_ASSIGNMENTS, 1);
     } else {
       _metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_PARTITION_ASSIGNMENT_NUM_RETRIES, 1);
-      _executor.schedule(() -> {
+      _scheduledExecutor.schedule(() -> {
         _log.warn("Retry scheduled for leader partition assignment, dg {}", datastreamGroupName);
         // We need to schedule both LEADER_DO_ASSIGNMENT and leader partition assignment in case the tasks are
         // not locked because the assigned instance is dead. As we use a sticky assignment, the leader do assignment
@@ -1582,7 +1594,7 @@ private void performPartitionMovement(Long notifyTimestamp) {
     }  else {
       _log.info("Schedule retry for leader movement tasks");
       _metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_PARTITION_MOVEMENT_NUM_RETRIES, 1);
-      _executor.schedule(() ->
+      _scheduledExecutor.schedule(() ->
           _eventQueue.put(CoordinatorEvent.createPartitionMovementEvent(notifyTimestamp)), _config.getRetryIntervalMs(),
           TimeUnit.MILLISECONDS);
     }