Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
"collect_default": True},
}

def __init__(self, context, num_nodes, kafka, files):
def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
super(ConnectServiceBase, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
self.files = files
self.startup_mode = self.STARTUP_MODE_LISTEN
self.startup_timeout_sec = startup_timeout_sec
self.environment = {}
self.external_config_template_func = None

Expand Down Expand Up @@ -122,13 +123,13 @@ def start_and_return_immediately(self, node, worker_type, remote_connector_confi
def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
with node.account.monitor_log(self.LOG_FILE) as monitor:
self.start_and_return_immediately(node, worker_type, remote_connector_configs)
monitor.wait_until('Kafka version', timeout_sec=60,
monitor.wait_until('Kafka version', timeout_sec=self.startup_timeout_sec,
err_msg="Never saw message indicating Kafka Connect finished startup on node: " +
"%s in condition mode: %s" % (str(node.account), self.startup_mode))

def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
self.start_and_return_immediately(node, worker_type, remote_connector_configs)
wait_until(lambda: self.listening(node), timeout_sec=60,
wait_until(lambda: self.listening(node), timeout_sec=self.startup_timeout_sec,
err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
(str(node.account), self.startup_mode))

Expand All @@ -141,7 +142,8 @@ def stop_node(self, node, clean_shutdown=True):
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
wait_until(lambda: not node.account.alive(pid), timeout_sec=self.startup_timeout_sec, err_msg="Kafka Connect process on " + str(
node.account) + " took too long to exit")

node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)

Expand Down Expand Up @@ -254,8 +256,8 @@ def _base_url(self, node):
class ConnectStandaloneService(ConnectServiceBase):
"""Runs Kafka Connect in standalone mode."""

def __init__(self, context, kafka, files):
super(ConnectStandaloneService, self).__init__(context, 1, kafka, files)
def __init__(self, context, kafka, files, startup_timeout_sec = 60):
super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)

# For convenience since this service only makes sense with a single node
@property
Expand Down Expand Up @@ -303,8 +305,8 @@ class ConnectDistributedService(ConnectServiceBase):
"""Runs Kafka Connect in distributed mode."""

def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
configs_topic="connect-configs", status_topic="connect-status"):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
self.status_topic = status_topic
Expand Down