Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] tests/availability: fix resuming of suspended nodes #22694

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions tests/rptest/services/failure_injector.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,11 @@ def _heal_all(self):
if spec.type != FailureSpec.FAILURE_ISOLATE
}

def _contunue_all(self):
def _continue_all(self):
self.redpanda.logger.info(f"continuing execution on all nodes")
for n in self.redpanda.nodes:
self._continue(n)
if self.redpanda.check_node(n):
self._continue(n)
self._in_flight = {
spec
for spec in self._in_flight
Expand Down
32 changes: 14 additions & 18 deletions tests/rptest/tests/availability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ def test_availability_when_one_node_failed(self):
self.start_producer(1, throughput=10000)
self.start_consumer(1)
self.await_startup()
# start failure injector with default parameters
self.start_finjector()

self.validate_records()
# run failure injector loop with default parameters
with self.finj_thread():
self.validate_records()

@cluster(num_nodes=5, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_recovery_after_catastrophic_failure(self):
Expand Down Expand Up @@ -90,17 +89,14 @@ def test_recovery_after_catastrophic_failure(self):
self.start_consumer(1)
self.await_startup()

# inject permanent random failure
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
random.choice(self.redpanda.nodes[0:1]))

self.inject_failure(f_spec)

# inject transient failure on other node
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
self.redpanda.nodes[2],
length=2.0 if self.scale.local else 15.0)

self.inject_failure(f_spec)

self.validate_records()
with self.finj_manual() as finj:
# inject permanent random failure
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
random.choice(self.redpanda.nodes[0:1]))
finj(f_spec)
# inject transient failure on other node
f_spec = FailureSpec(random.choice(FailureSpec.FAILURE_TYPES),
self.redpanda.nodes[2],
length=2.0 if self.scale.local else 15.0)
finj(f_spec)
self.validate_records()
61 changes: 49 additions & 12 deletions tests/rptest/tests/e2e_finjector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from contextlib import contextmanager
import random
import time
import threading
Expand All @@ -33,7 +34,8 @@ def const_delay(delay_seconds=10):
class EndToEndFinjectorTest(EndToEndTest):
def __init__(self, test_context):
super(EndToEndFinjectorTest, self).__init__(test_context=test_context)
self.enable_failures = True
self.enable_manual = False
self.enable_loop = False
self.scale = Scale(test_context)
self.finjector_thread = None
self.failure_length_provider = scale_dependent_length(self.scale)
Expand All @@ -56,11 +58,49 @@ def configure_finjector(self,
if delay_provider:
self.failure_delay_provier = delay_provider

def start_finjector(self):
self.finjector_thread = threading.Thread(
target=self._failure_injector_loop, args=())
self.finjector_thread.daemon = True
self.finjector_thread.start()
@contextmanager
def finj_thread(self):
"""
Get a context manager that holds the test in manual failure injection
mode. Recoverable failures such as suspended process or network issues
will be repaired on exit.

:return: void
"""
try:
assert not self.enable_manual and not self.enable_loop
self.enable_loop = True
self.finjector_thread = threading.Thread(
target=self._failure_injector_loop, args=())
self.finjector_thread.start()
yield
finally:
self.enable_loop = False
if self.finjector_thread:
self.finjector_thread.join()
self._cleanup()

@contextmanager
def finj_manual(self):
"""
Get a context manager that holds the test in manual failure injection
mode. Recoverable failures such as suspended process or network issues
will be repaired on exit. Caller is supposed to make inject_failure()
calls inside the `with` statement.

:return: a callable with a single failure spec argument
"""
try:
assert not self.enable_manual and not self.enable_loop
self.enable_manual = True

def callable(spec):
return self.inject_failure(spec)

yield callable
finally:
self.enable_manual = False
self._cleanup()

def random_failure_spec(self):
f_type = random.choice(self.allowed_failures)
Expand All @@ -70,6 +110,7 @@ def random_failure_spec(self):
return FailureSpec(node=node, type=f_type, length=length)

def inject_failure(self, spec):
assert self.enable_manual or self.enable_loop
f_injector = make_failure_injector(self.redpanda)
f_injector.inject_failure(spec)

Expand All @@ -80,8 +121,7 @@ def _next_failure(self):
return self.random_failure_spec()

def _failure_injector_loop(self):

while self.enable_failures:
while self.enable_loop:
f_injector = make_failure_injector(self.redpanda)
f_injector.inject_failure(self._next_failure())

Expand All @@ -90,9 +130,6 @@ def _failure_injector_loop(self):
f"waiting {delay} seconds before next failure")
time.sleep(delay)

def teardown(self):
self.enable_failures = False
if self.finjector_thread:
self.finjector_thread.join()
def _cleanup(self):
make_failure_injector(self.redpanda)._heal_all()
make_failure_injector(self.redpanda)._continue_all()
Loading