From f88ab252f590342560c89141b37534330f129189 Mon Sep 17 00:00:00 2001
From: Vaibhav Maheshwari <vmaheshw@vmaheshw-mn2.linkedin.biz>
Date: Thu, 25 May 2023 10:18:48 -0700
Subject: [PATCH] Fix tests and reduce the overall test execution time
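
Reduce test execution time and deflake the suite:

- FileProcessor: log under FileProcessor.class instead of
  FileConnector.class.
- AbstractKafkaBasedConnectorTask: make the commit retry timeout
  overridable through a @VisibleForTesting setter so tests that
  exercise the commit-failure path do not wait out the full default
  timeout:

      task.setCommitRetryTimeoutMillis(1000);

- EmbeddedKafkaCluster / BaseKafkaZkTest: run the offsets topic with a
  single partition and a single replica, and lower
  group.initial.rebalance.delay.ms to 100ms so consumer groups form
  quickly on the single-broker test cluster.
- KafkaTestUtils: call the Duration overload of Consumer#poll instead
  of the deprecated long overload.
- DatastreamResources: report the elapsed time when a stop request
  times out.
- TestCoordinator: give each test a unique cluster name so tests no
  longer share coordinator state, shorten waits and TTLs, and assert
  the results of PollUtils.poll calls instead of ignoring them.
- TestDatastreamResources: construct the resource with small
  stop-transition timeouts so the timeout path is exercised quickly.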

---
 .../connectors/file/FileProcessor.java        |  2 +-
 .../AbstractKafkaBasedConnectorTask.java      | 15 +++-
 .../KafkaMirrorMakerConnectorTestUtils.java   |  5 +-
 .../TestKafkaMirrorMakerConnectorTask.java    |  5 +-
 .../kafka/EmbeddedKafkaCluster.java           |  1 +
 .../datastream/kafka/KafkaTestUtils.java      |  4 +-
 .../server/dms/DatastreamResources.java       | 11 +--
 .../datastream/server/TestCoordinator.java    | 76 +++++++++++--------
 .../server/dms/TestDatastreamResources.java   |  8 ++-
 .../datastream/testutil/BaseKafkaZkTest.java  |  1 +
 10 files changed, 84 insertions(+), 44 deletions(-)

diff --git a/datastream-file-connector/src/main/java/com/linkedin/datastream/connectors/file/FileProcessor.java b/datastream-file-connector/src/main/java/com/linkedin/datastream/connectors/file/FileProcessor.java
index 54f7d3527..fa4bb3e1e 100644
--- a/datastream-file-connector/src/main/java/com/linkedin/datastream/connectors/file/FileProcessor.java
+++ b/datastream-file-connector/src/main/java/com/linkedin/datastream/connectors/file/FileProcessor.java
@@ -33,7 +33,7 @@
 
 
 class FileProcessor implements Runnable {
-  private static final Logger LOG = LoggerFactory.getLogger(FileConnector.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileProcessor.class);
 
   private static final int PARTITION = 0;
   private static final int POLL_WAIT_MS = 100;
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
index 98918c33e..901c0db82 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
@@ -136,6 +136,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
   protected final GroupIdConstructor _groupIdConstructor;
 
   protected final KafkaTopicPartitionTracker _kafkaTopicPartitionTracker;
+  private long _commitRetryTimeoutMillis; // overridable by tests to shorten commit-retry waits
 
   protected AbstractKafkaBasedConnectorTask(KafkaBasedConnectorConfig config, DatastreamTask task, Logger logger,
       String metricsPrefix, GroupIdConstructor groupIdConstructor) {
@@ -200,6 +201,7 @@ protected AbstractKafkaBasedConnectorTask(KafkaBasedConnectorConfig config, Data
     _pollTimeoutMillis = config.getPollTimeoutMillis();
     _retrySleepDuration = config.getRetrySleepDuration();
     _commitTimeout = config.getCommitTimeout();
+    _commitRetryTimeoutMillis = COMMIT_RETRY_TIMEOUT_MILLIS; // default; tests may shorten this
     _consumerMetrics = createKafkaBasedConnectorTaskMetrics(metricsPrefix, _datastreamName, _logger,
         _enableAdditionalMetrics);
 
@@ -686,7 +688,7 @@ protected void commitWithRetries(Consumer<?, ?> consumer, Optional<Map<TopicPart
       }
 
       return true;
-    }, COMMIT_RETRY_INTERVAL_MILLIS, COMMIT_RETRY_TIMEOUT_MILLIS);
+    }, COMMIT_RETRY_INTERVAL_MILLIS, getCommitRetryTimeoutMillis());
 
     if (!result) {
       String msg = "Commit failed after several retries, Giving up.";
@@ -695,6 +697,15 @@ protected void commitWithRetries(Consumer<?, ?> consumer, Optional<Map<TopicPart
     }
   }
 
+  @VisibleForTesting
+  public void setCommitRetryTimeoutMillis(long retryTimeoutMillis) {
+    _commitRetryTimeoutMillis = retryTimeoutMillis;
+  }
+
+  private long getCommitRetryTimeoutMillis() {
+    return _commitRetryTimeoutMillis;
+  }
+
   private void commitSync(Consumer<?, ?> consumer, Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
     if (offsets.isPresent()) {
       consumer.commitSync(offsets.get(), _commitTimeout);
@@ -714,7 +725,7 @@ protected void seekToLastCheckpoint(Set<TopicPartition> topicPartitions) {
     Set<TopicPartition> tpWithNoCommits = new HashSet<>();
     // construct last checkpoint
     topicPartitions.forEach(tp -> getLastCheckpointToSeekTo(lastCheckpoint, tpWithNoCommits, tp));
-    _logger.info("Seeking to previous checkpoints {}", lastCheckpoint);
+    _logger.info("Seeking to previous checkpoints {} ", lastCheckpoint);
     // reset consumer to last checkpoint, by default we will rewind the checkpoint
     lastCheckpoint.forEach((tp, offsetAndMetadata) -> _consumer.seek(tp, offsetAndMetadata.offset()));
     if (!tpWithNoCommits.isEmpty()) {
diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTestUtils.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTestUtils.java
index 37416ee99..eda7b3a33 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTestUtils.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTestUtils.java
@@ -117,8 +117,11 @@ static KafkaMirrorMakerConnectorTask createKafkaMirrorMakerConnectorTask(Datastr
 
   static KafkaMirrorMakerConnectorTask createKafkaMirrorMakerConnectorTask(DatastreamTaskImpl task,
       KafkaBasedConnectorConfig connectorConfig, String connectorName) {
-    return new KafkaMirrorMakerConnectorTask(connectorConfig, task, connectorName, false,
+    KafkaMirrorMakerConnectorTask kafkaMirrorMakerConnectorTask = new KafkaMirrorMakerConnectorTask(connectorConfig, task, connectorName, false,
         new KafkaMirrorMakerGroupIdConstructor(false, "testCluster"));
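+    // shorten the commit retry timeout so commit-failure tests do not wait out the full default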
+    kafkaMirrorMakerConnectorTask.setCommitRetryTimeoutMillis(1000);
+    return kafkaMirrorMakerConnectorTask;
   }
 
   static KafkaMirrorMakerConnectorTask createFlushlessKafkaMirrorMakerConnectorTask(DatastreamTaskImpl task,
diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java
index 9fd2670a0..a93df342c 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java
@@ -91,6 +91,7 @@
 /**
  * Tests for {@link KafkaMirrorMakerConnectorTask}
  */
+@Test(singleThreaded = false) // do not force methods in this class onto a single thread
 public class TestKafkaMirrorMakerConnectorTask extends BaseKafkaZkTest {
 
   private static final long CONNECTOR_AWAIT_STOP_TIMEOUT_MS = 30000;
@@ -553,7 +554,7 @@ public void testPartitionManagedLockReleaseOnInterruptException() throws Interru
 
     KafkaBasedConnectorConfig connectorConfig = new KafkaBasedConnectorConfigBuilder()
         .setConsumerFactory(new LiKafkaConsumerFactory())
-        .setCommitIntervalMillis(10000)
+        .setCommitIntervalMillis(100)
         .setEnablePartitionManaged(true)
         .build();
 
@@ -577,7 +578,7 @@ public void testPartitionManagedLockReleaseOnThreadInterrupt() throws Interrupte
 
     KafkaBasedConnectorConfig connectorConfig = new KafkaBasedConnectorConfigBuilder()
         .setConsumerFactory(new LiKafkaConsumerFactory())
-        .setCommitIntervalMillis(10000)
+        .setCommitIntervalMillis(100)
         .setEnablePartitionManaged(true)
         .build();
 
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/EmbeddedKafkaCluster.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/EmbeddedKafkaCluster.java
index 832d449ca..c37201a1b 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/EmbeddedKafkaCluster.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/EmbeddedKafkaCluster.java
@@ -114,6 +114,7 @@ public void startup() {
       properties.setProperty("log.flush.interval.messages", String.valueOf(1));
       properties.setProperty("log.cleaner.enable", Boolean.FALSE.toString()); //to save memory
       properties.setProperty("offsets.topic.num.partitions", "1");
+      properties.setProperty("offsets.topic.replication.factor", "1"); // single broker in tests
 
       KafkaServerStartable broker = startBroker(properties);
 
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTestUtils.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTestUtils.java
index 17ac1919f..43ef3479b 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTestUtils.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTestUtils.java
@@ -104,7 +104,7 @@ public static void waitForTopicCreation(AdminClient adminClient, String topic, S
       consumer.subscribe(Collections.singleton(topic));
       while (Instant.now().isBefore(expiration)) {
         try {
-          consumer.poll(Duration.ofSeconds(1).toMillis());
+          consumer.poll(Duration.ofSeconds(1));
           return;
         } catch (Exception ignored) {
           // Exception should occur when we are waiting for the broker to be assigned this topic
@@ -189,7 +189,7 @@ public static void readTopic(String topic, Integer partition, String brokerList,
     boolean keepGoing = true;
     long now = System.currentTimeMillis();
     do {
-      ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
+      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
       for (ConsumerRecord<byte[], byte[]> record : records.records(topic)) {
         if (!callback.onMessage(record.key(), record.value())) {
           keepGoing = false;
diff --git a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java
index a56b1880f..530e690aa 100644
--- a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java
+++ b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/dms/DatastreamResources.java
@@ -516,12 +516,15 @@ public ActionResult<Void> stop(@PathKeysParam PathKeys pathKeys,
       }
     }
 
+    long stopRequestStartMs = System.currentTimeMillis();
     // polls until the leader transitions the state of the datastream to STOPPED state
-    PollUtils.poll(() -> datastreamsToStop.stream()
-            .allMatch(ds -> _store.getDatastream(ds.getName()).getStatus().equals(DatastreamStatus.STOPPED)),
-        allStopped -> allStopped, _stopTransitionRetryPeriodMs.toMillis(), _stopTransitionTimeoutMs.toMillis())
+    PollUtils.poll(
+        () -> datastreamsToStop.stream()
+            .allMatch(ds ->
+                _store.getDatastream(ds.getName()).getStatus().equals(DatastreamStatus.STOPPED)),
+        allStopped -> allStopped, _stopTransitionRetryPeriodMs.toMillis(), _stopTransitionTimeoutMs.toMillis())
         .orElseThrow(() -> new RestLiServiceException(HttpStatus.S_408_REQUEST_TIMEOUT,
-            String.format("Stop request timed out for datastream: %s", datastreamName)));
+            String.format("Stop request timed out for datastream: %s after %d ms", datastreamName, System.currentTimeMillis() - stopRequestStartMs)));
 
     LOG.info("Completed request for stopping datastream {}", _store.getDatastream(datastream.getName()));
 
diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
index 73f9ccee5..07b845c34 100644
--- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
+++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java
@@ -121,7 +121,7 @@
 public class TestCoordinator {
   private static final Logger LOG = LoggerFactory.getLogger(TestCoordinator.class);
   private static final long WAIT_DURATION_FOR_ZK = Duration.ofMinutes(1).toMillis();
-  private static final int WAIT_TIMEOUT_MS = 60000;
+  private static final int WAIT_TIMEOUT_MS = 2000;
 
   EmbeddedZookeeper _embeddedZookeeper;
   String _zkConnectionString;
@@ -523,7 +523,7 @@ public void testCoordinationWithBroadcastStrategy() throws Exception {
 
   @Test
   public void testHandleAssignmentChangeTransientFailure() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeTransientFailure";
     String testConnectorType = "testConnectorType";
     String datastreamName1 = "datastream1";
 
@@ -550,7 +550,7 @@ public void testHandleAssignmentChangeTransientFailure() throws Exception {
 
   @Test
   public void testHandleAssignmentChangeFailure() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeChangeFailure";
     String testConnectorType = "testConnectorType";
     String datastreamName1 = "datastream1";
 
@@ -575,7 +575,7 @@ public void testHandleAssignmentChangeFailure() throws Exception {
 
   @Test
   public void testStopAndResumeDatastream() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeResumeDatastream";
     String testConnectorType = "testConnectorType";
     String datastreamName1 = "datastream1";
 
@@ -627,7 +627,7 @@ public void testStopAndResumeDatastream() throws Exception {
      */
   @Test
   public void testCoordinationWithStickyMulticastStrategy() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeWithSticky";
     String testConnectorType = "testConnectorType";
     Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);
 
@@ -708,8 +708,8 @@ public void testCoordinationWithStickyMulticastStrategy() throws Exception {
     instance4.start();
 
     connectors.add(connector4);
-    // verify connector4 get at least 5 task assignment
-    waitTillAssignmentIsComplete(5, WAIT_TIMEOUT_MS, connector4);
+    // verify connector4 gets at least 4 task assignments
+    waitTillAssignmentIsComplete(4, WAIT_TIMEOUT_MS, connector4);
 
     instance2.stop();
     instance3.stop();
@@ -845,7 +845,7 @@ public void testCoordinationWithStickyMulticastStrategyAndMaxTaskLimit() throws
 
   @Test
   public void testCoordinationWithPartitionAssignment() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeWithPartitionAssignment";
     String testConnectorType = "testConnectorType";
     Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);
     ZkClient zkClient = new ZkClient(_zkConnectionString);
@@ -1051,7 +1051,7 @@ public void testCoordinationWithElasticTaskAssignmentPartitionAssignment() throw
           // Verify all the partitions are assigned
           Map<String, List<String>> assignment2 = collectDatastreamPartitions(connectors);
           return assignment2.get("datastream42").size() == partitions1.size() && assignment2.get("datastream43").size() == partitions2.size();
-        }, interval, WAIT_TIMEOUT_MS));
+        }, interval, WAIT_TIMEOUT_MS * 5));
 
     instance1.stop();
     instance2.stop();
@@ -1077,7 +1077,7 @@ private StickyPartitionAssignmentStrategy createStrategy(String testCluster, ZkC
    */
   @Test
   public void testBYOTDatastreamWithUsedDestination() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeWithUsedDestination";
     String testConnectorType = "testConnectorType";
 
     Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster);
@@ -1229,7 +1229,7 @@ public void testInvokePostDataStreamStateChangeAction() throws Exception {
    */
   @Test
   public void testDatastreamWithConnectorManagedDestination() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeWithConnectorManagedDestination";
     String testConnectorType = "testConnectorType";
 
     DummyTransportProviderAdminFactory transportProviderAdminFactory = new DummyTransportProviderAdminFactory();
@@ -1272,7 +1272,7 @@ public void testDatastreamWithConnectorManagedDestination() throws Exception {
    */
   @Test
   public void testDatastreamWithoutConnectorManagedDestination() throws Exception {
-    String testCluster = "testCoordinationSmoke";
+    String testCluster = "testCoordinationSmokeWithoutConnectorManagedDestination";
     String testConnectorType = "testConnectorType";
 
     DummyTransportProviderAdminFactory transportProviderAdminFactory = new DummyTransportProviderAdminFactory();
@@ -1297,6 +1297,11 @@ public void testDatastreamWithoutConnectorManagedDestination() throws Exception
     Assert.assertEquals(transportProviderAdminFactory._createDestinationCount, 1,
         "Create destination count should have been 1, since Datastream does not have connector-managed destination");
 
+    // wait for datastream to be READY
+    Assert.assertTrue(PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, datastreamName)
+        .getStatus()
+        .equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS));
+
     resource.delete(datastreamName);
     String path = KeyBuilder.datastream(testCluster, datastreamName);
     Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS));
@@ -1444,9 +1449,9 @@ public void testCoordinatorHandleUpdateDatastream() throws Exception {
     LOG.info("Created datastream: {}", datastream);
 
     // wait for datastream to be READY
-    PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, "datastream1")
+    Assert.assertTrue(PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, "datastream1")
         .getStatus()
-        .equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS);
+        .equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS));
     datastream = DatastreamTestUtils.getDatastream(zkClient, testCluster, datastream.getName());
     assertConnectorAssignment(connector1, WAIT_TIMEOUT_MS, datastream.getName());
     assertConnectorAssignment(connector2, WAIT_TIMEOUT_MS, datastream.getName());
@@ -2559,12 +2564,6 @@ public void testDatastreamDeleteUponTTLExpire() throws Exception {
     Datastream[] streams = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, streamNames);
     streams[0].getSource().setConnectionString(DummyConnector.VALID_DUMMY_SOURCE);
     streams[1].getSource().setConnectionString(DummyConnector.VALID_DUMMY_SOURCE);
-    streams[0].getDestination()
-        .setConnectionString(new KafkaDestination(setup._datastreamKafkaCluster.getKafkaCluster().getZkConnection(),
-            "TestDatastreamTopic1", false).getDestinationURI());
-    streams[1].getDestination()
-        .setConnectionString(new KafkaDestination(setup._datastreamKafkaCluster.getKafkaCluster().getZkConnection(),
-            "TestDatastreamTopic2", false).getDestinationURI());
 
     // stream1 expires after 500ms and should get deleted when stream2 is created
     long threeDaysAgo = Instant.now().minus(Duration.ofDays(3)).toEpochMilli();
@@ -2610,13 +2609,18 @@ public void testMultipleDatastreamDeleteUponTTLExpire() throws Exception {
 
-    // stream1 and stream2 expire in 1 minute from now and should get deleted when stream3 is created
+    // stream1 and stream2 expire 5 seconds from now and should get deleted when stream3 is created
     long createTime = Instant.now().toEpochMilli();
-    long expireTTL = Duration.ofMinutes(1).toMillis();
+    long expireTTL = Duration.ofSeconds(5).toMillis();
 
     streams[0].getMetadata().put(CREATION_MS, String.valueOf(createTime));
     streams[0].getMetadata().put(TTL_MS, String.valueOf(expireTTL));
     streams[1].getMetadata().put(CREATION_MS, String.valueOf(createTime));
     streams[1].getMetadata().put(TTL_MS, String.valueOf(expireTTL));
 
+    // connector-managed destinations: the coordinator will not provision destination topics for these streams
+    streams[0].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+    streams[1].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+    streams[2].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+
     // Creation should go through as TTL is not considered for freshly created streams (INITIALIZING)
     CreateResponse createResponse = setup._resource.create(streams[0]);
     Assert.assertNull(createResponse.getError());
@@ -2625,35 +2628,41 @@ public void testMultipleDatastreamDeleteUponTTLExpire() throws Exception {
     Assert.assertNull(createResponse.getError());
     Assert.assertEquals(createResponse.getStatus(), HttpStatus.S_201_CREATED);
 
-    // Sleep for 1 minute to wait for stream1 and stream2 to expire.
-    Thread.sleep(Duration.ofMinutes(1).toMillis());
+    // Sleep for 5 seconds to let stream1 and stream2 expire.
+    Thread.sleep(Duration.ofSeconds(5).toMillis());
 
     // Creating a stream3 which should trigger stream1 to be deleted
     createResponse = setup._resource.create(streams[2]);
     Assert.assertNull(createResponse.getError());
     Assert.assertEquals(createResponse.getStatus(), HttpStatus.S_201_CREATED);
 
-    // Poll up to 30s for stream1 to get deleted
-    PollUtils.poll(() -> {
+    // Poll up to 10s for stream1 to get deleted
+    boolean status = PollUtils.poll(() -> {
       try {
         setup._resource.get(streams[0].getName());
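+        // touch the datastreams znode so the leader coordinator re-checks for expired streams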
+        setup._coordinator.getDatastreamCache().getZkclient().writeData(KeyBuilder.datastreams(setup._coordinator.getClusterName()), "1234");
         return false;
       } catch (RestLiServiceException e) {
         Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND);
         return true;
       }
-    }, 200, Duration.ofSeconds(30).toMillis());
+    }, 200, Duration.ofSeconds(10).toMillis());
+    Assert.assertTrue(status);
 
-    // Poll up to 30s for stream2 to get deleted
-    PollUtils.poll(() -> {
+    // Poll up to 10s for stream2 to get deleted
+    status = PollUtils.poll(() -> {
       try {
         setup._resource.get(streams[1].getName());
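+        // touch the datastreams znode again while waiting for stream2 to be deleted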
+        setup._coordinator.getDatastreamCache().getZkclient().writeData(KeyBuilder.datastreams(setup._coordinator.getClusterName()), "5678");
         return false;
       } catch (RestLiServiceException e) {
         Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND);
         return true;
       }
-    }, 200, Duration.ofSeconds(30).toMillis());
+    }, 200, Duration.ofSeconds(10).toMillis());
+    Assert.assertTrue(status);
   }
 
   @Test
@@ -2671,6 +2678,10 @@ public void testDoNotAssignExpiredStreams() throws Exception {
         .setConnectionString(new KafkaDestination(setup._datastreamKafkaCluster.getKafkaCluster().getZkConnection(),
             "TestDatastreamTopic2", false).getDestinationURI());
 
+    // connector-managed destinations, so no destination topics get provisioned for these streams
+    streams[0].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+    streams[1].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+
     // stream2 expires after 500ms and should not get assigned
     long threeDaysAgo = Instant.now().minus(Duration.ofDays(3)).toEpochMilli();
     streams[1].getMetadata().put(CREATION_MS, String.valueOf(threeDaysAgo));
@@ -3156,7 +3166,7 @@ public void testOnSessionExpiredHandleNewSession() throws Exception {
   }
 
   void testOnSessionExpired(boolean handleNewSession) throws DatastreamException, InterruptedException {
-    String testCluster = "testCoordinationSmoke3";
+    String testCluster = "testOnSessionExpired";
     String testConnectorType = "testConnectorType";
     String datastreamName = "datastreamNameSessionExpired";
 
diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java
index 25b348eb0..3603547b8 100644
--- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java
+++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java
@@ -518,7 +518,13 @@ public void testStopRequestTimeoutWithBusyLeader() throws DatastreamException {
     Mockito.doReturn(mockDatastreamStore).when(mockDatastreamServer).getDatastreamStore();
     Mockito.doReturn(mockDatastreamServer).when(mockDatastreamCluster).getPrimaryDatastreamServer();
 
-    DatastreamResources resource1 = new DatastreamResources(mockDatastreamCluster.getPrimaryDatastreamServer());
+    // Configure small timeouts so the stop request times out quickly
+    Properties testProperties = new Properties();
+    testProperties.setProperty(CONFIG_STOP_TRANSITION_TIMEOUT_MS, "1000");
+    testProperties.setProperty(CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS, "5");
+
+    DatastreamResources resource1 = new DatastreamResources(mockDatastreamCluster.getPrimaryDatastreamServer().getDatastreamStore(),
+        mockDatastreamCluster.getPrimaryDatastreamServer().getCoordinator(), testProperties);
 
     // Create a Datastream.
     Datastream datastreamToCreate = generateDatastream(0);
diff --git a/datastream-testcommon/src/main/java/com/linkedin/datastream/testutil/BaseKafkaZkTest.java b/datastream-testcommon/src/main/java/com/linkedin/datastream/testutil/BaseKafkaZkTest.java
index 4e9211daa..b3c5854ae 100644
--- a/datastream-testcommon/src/main/java/com/linkedin/datastream/testutil/BaseKafkaZkTest.java
+++ b/datastream-testcommon/src/main/java/com/linkedin/datastream/testutil/BaseKafkaZkTest.java
@@ -48,6 +48,7 @@ public void beforeMethodSetup() throws Exception {
     kafkaConfig.setProperty("auto.create.topics.enable", Boolean.FALSE.toString());
     kafkaConfig.setProperty("delete.topic.enable", Boolean.TRUE.toString());
     kafkaConfig.setProperty("offsets.topic.replication.factor", "1");
+    kafkaConfig.setProperty("group.initial.rebalance.delay.ms", "100"); // speed up consumer group formation
     _kafkaCluster = new DatastreamEmbeddedZookeeperKafkaCluster(kafkaConfig);
     _kafkaCluster.startup();
     _broker = _kafkaCluster.getBrokers().split("\\s*,\\s*")[0];
