Skip to content

Commit

Permalink
Customizing the heartbeat period for tests to 1 seconds if not explic…
Browse files Browse the repository at this point in the history
…ity provided
  • Loading branch information
Shrinand Thakkar committed Oct 27, 2023
1 parent d344473 commit a76ee6c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.linkedin.datastream.connectors.kafka;

import java.time.Duration;
import java.util.Properties;

import com.linkedin.datastream.common.zk.ZkClient;
Expand Down Expand Up @@ -41,6 +42,9 @@ public static Coordinator createCoordinator(String zkAddr, String cluster, Prope
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
CachedDatastreamReader cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand All @@ -178,6 +181,9 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand Down Expand Up @@ -2986,6 +2992,7 @@ public void testCoordinatorLeaderCleanupTasksPostElection() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3098,6 +3105,7 @@ public void testNewlyElectedLeaderRevokesAssignmentTokens() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(1000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3185,6 +3193,7 @@ void testOnSessionExpired(boolean handleNewSession) throws DatastreamException,
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_REINIT_ON_NEW_ZK_SESSION, String.valueOf(handleNewSession));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3275,6 +3284,7 @@ public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3341,6 +3351,7 @@ public void testTokensNotClaimedForConnectorThatFailedToStop() throws Exception
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_TIMEOUT_MS, "100");
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS, "10");
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3419,6 +3430,7 @@ public void testLeaderDoesNotPollForTokensIfFeatureIsDisabled() throws Exception
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(false));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3452,6 +3464,7 @@ public void testLeaderPollsForTokensAndRevokesThemIfTheyAreUnclaimed() throws Ex
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(6000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3503,6 +3516,7 @@ public void testLeaderPollsForTokensAndMarksTheDatastreamStoppedIfTheyAreClaimed
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(6000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down

0 comments on commit a76ee6c

Please sign in to comment.