Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding node suspension test and small raft fixes #503

Merged
merged 10 commits into from
Nov 6, 2019
6 changes: 4 additions & 2 deletions src/consensus/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ namespace raft
RequestVote rv = {raft_request_vote,
local_id,
current_term,
last_idx,
get_term_internal(last_idx)};
commit_idx,
get_term_internal(commit_idx)};

channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv);
}
Expand Down Expand Up @@ -1116,6 +1116,8 @@ namespace raft
void rollback(Index idx)
{
store->rollback(idx);
ledger->truncate(idx);
last_idx = idx;
LOG_DEBUG_FMT("Rolled back at {}", idx);

while (!committable_indices.empty() && (committable_indices.back() > idx))
Expand Down
8 changes: 7 additions & 1 deletion tests/infra/ccf.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def _wait_for_node_to_exist_in_store(
if self._check_node_exists(remote_node, node_id, node_status):
exists = True
break
time.sleep(1)
time.sleep(2)
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
if not exists:
raise TimeoutError(
f"Node {node_id} has not yet been recorded in the store"
Expand Down Expand Up @@ -810,6 +810,12 @@ def start(self, lib_name, enclave_type, workspace, label, members_certs, **kwarg
)
self.network_state = NodeNetworkState.joined

def suspend(self):
return self.remote.suspend()

def resume(self):
self.remote.resume()

def join(
self, lib_name, enclave_type, workspace, label, target_rpc_address, **kwargs
):
Expand Down
44 changes: 44 additions & 0 deletions tests/infra/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import paramiko
import logging
import subprocess
import psutil
import getpass
from contextlib import contextmanager
import infra.path
Expand Down Expand Up @@ -135,12 +136,16 @@ def __init__(
self.files += data_files
self.cmd = cmd
self.client = paramiko.SSHClient()
# this client is used to execute commands on the remote host since the main client uses pty
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
self.proc_client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.proc_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.root = os.path.join(workspace, label + "_" + name)
self.name = name
self.env = env or {}
self.out = os.path.join(self.root, "out")
self.err = os.path.join(self.root, "err")
self.suspension_proc = None

def _rc(self, cmd):
LOG.info("[{}] {}".format(self.hostname, cmd))
Expand All @@ -150,6 +155,7 @@ def _rc(self, cmd):
def _connect(self):
LOG.debug("[{}] connect".format(self.hostname))
self.client.connect(self.hostname)
self.proc_client.connect(self.hostname)

def _setup_files(self):
assert self._rc("rm -rf {}".format(self.root)) == 0
Expand Down Expand Up @@ -231,6 +237,22 @@ def start(self):
cmd = self._cmd()
LOG.info("[{}] {}".format(self.hostname, cmd))
stdin, stdout, stderr = self.client.exec_command(cmd, get_pty=True)
_, stdout_, _ = self.proc_client.exec_command(f'ps -ef | pgrep -f "{cmd}"')
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
self.pid = stdout_.readline()

def suspend(self):
_, stdout, _ = self.proc_client.exec_command(f"kill -STOP {self.pid}")
if stdout.channel.recv_exit_status() == 0:
LOG.info(f"Node {self.name} suspended...")
return True
LOG.info(f"Node {self.name} can not be suspended...")
return False

def resume(self):
_, stdout, _ = self.proc_client.exec_command(f"kill -CONT {self.pid}")
if stdout.channel.recv_exit_status() != 0:
raise RuntimeError(f"Could not resume node {self.name} from suspension!")
LOG.info(f"Node {self.name} resuming from suspension...")
olgavrou marked this conversation as resolved.
Show resolved Hide resolved

def stop(self):
"""
Expand All @@ -243,6 +265,7 @@ def stop(self):
"{}_err_{}".format(self.hostname, self.name),
)
self.client.close()
self.proc_client.close()

def setup(self):
"""
Expand Down Expand Up @@ -336,6 +359,7 @@ def __init__(
self.cmd = cmd
self.root = os.path.join(workspace, label + "_" + name)
self.proc = None
self.suspension_proc = None
self.stdout = None
self.stderr = None
self.env = env
Expand Down Expand Up @@ -391,6 +415,20 @@ def start(self, timeout=10):
stderr=self.stderr,
env=self.env,
)
self.suspension_proc = psutil.Process(pid=self.proc.pid)
olgavrou marked this conversation as resolved.
Show resolved Hide resolved

def suspend(self):
try:
self.suspension_proc.suspend()
LOG.info(f"Node {self.name} suspended...")
return True
except psutil.NoSuchProcess:
LOG.info(f"Node {self.name} can not be suspended...")
return False

def resume(self):
self.suspension_proc.resume()
LOG.info(f"Node {self.name} resuming from suspension...")
olgavrou marked this conversation as resolved.
Show resolved Hide resolved

def stop(self):
"""
Expand Down Expand Up @@ -617,6 +655,12 @@ def setup(self):
def start(self):
self.remote.start()

def suspend(self):
return self.remote.suspend()

def resume(self):
self.remote.resume()

def get_startup_files(self):
self.remote.get(self.pem)
if self.start_type in {StartType.new, StartType.recover}:
Expand Down
157 changes: 157 additions & 0 deletions tests/node_suspension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
# Licensed under the Apache 2.0 License.
import os
import getpass
import time
import logging
import multiprocessing
import shutil
from random import seed
import infra.ccf
import infra.proc
import infra.jsonrpc
import infra.notification
import infra.net
import e2e_args
from threading import Timer
import random

from loguru import logger as LOG

# 256 is the number of messages PBFT keeps in memory before needing to replay the ledger
# by setting 200 pure requests we will surpass 256 messages being passed throught the protocol
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
TOTAL_REQUESTS = 200


def timeout(node, suspend, election_timeout):
if suspend:
# We want to suspend the nodes' process so we need to initiate a new timer to wake it up eventually
if not node.suspend():
LOG.info("Node can not be suspended, probably has stopped running")
return
next_timeout = random.uniform(election_timeout, 3 * election_timeout)
LOG.info(f"New timer set for node {node.node_id} is {next_timeout} seconds")
t = Timer(next_timeout, timeout, args=[node, False, 0])
t.start()
else:
node.resume()


def run(args):
hosts = ["localhost", "localhost", "localhost"]

with infra.notification.notification_server(args.notify_server) as notifications:
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
# Lua apps do not support notifications
# https://github.com/microsoft/CCF/issues/415
notifications_queue = (
notifications.get_queue() if args.package == "libloggingenc" else None
)

with infra.ccf.network(
hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
first_node, (backups) = network.start_and_join(args)

term_info = {}
long_msg = "X" * (2 ** 14)

# first timer determines after how many seconds each node will be suspended
t0 = random.uniform(1, 10)
LOG.info(f"Initial timer for primary is {t0} seconds...")
t1 = random.uniform(1, 10)
LOG.info(f"Initial timer for backup 1 is {t1} seconds...")
t2 = random.uniform(1, 10)
LOG.info(f"Initial timer for backup 2 is {t2} seconds...")
tm0 = Timer(
t0, timeout, args=[first_node, True, args.election_timeout / 1000],
)
tm0.start()
tm1 = Timer(
t1, timeout, args=[backups[0], True, args.election_timeout / 1000]
)
tm1.start()
tm2 = Timer(
t2, timeout, args=[backups[1], True, args.election_timeout / 1000]
)
tm2.start()

with first_node.node_client() as mc:
check_commit = infra.ccf.Checker(mc, notifications_queue)
check = infra.ccf.Checker()

LOG.info("Write messages to nodes using round robin")
with first_node.user_client(format="json") as c0:
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
with backups[0].user_client(format="json") as c1:
with backups[1].user_client(format="json") as c2:
node_id = 0
for id in range(1, TOTAL_REQUESTS):
node_id += 1
node_id %= 3
if node_id == 0:
c = c0
elif node_id == 1:
c = c1
else:
c = c2
try:
resp = c.rpc(
"LOG_record", {"id": id, "msg": long_msg}
)
except Exception:
LOG.info("Trying to access a suspended node")
try:
cur_primary, cur_term = network.find_primary()
term_info[cur_term] = cur_primary.node_id
except Exception:
LOG.info("Trying to access a suspended node")
id += 1

# wait for the last request to commit
final_msg = "Hello world!"
check_commit(
c.rpc("LOG_record", {"id": 1000, "msg": final_msg}),
result=True,
)
check(
c.rpc("LOG_get", {"id": 1000}),
result={"msg": final_msg},
)

# check that a new node can catch up after all the requests
new_node = network.create_and_trust_node(
lib_name=args.package,
host="localhost",
args=args,
should_wait=False,
)
assert new_node

with new_node.user_client(format="json") as c:
while True:
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
rep = c.do("LOG_get", {"id": 1000})
olgavrou marked this conversation as resolved.
Show resolved Hide resolved
if rep.error == None and rep.result is not None:
LOG.success(f"Last node is all caught up!")
break

# assert that view changes actually did occur
assert len(term_info) > 1

LOG.success(
"----------- terms and primaries recorded -----------"
)
for term, primary in term_info.items():
LOG.success(f"term {term} - primary {primary}")


if __name__ == "__main__":

args = e2e_args.cli_args()
args.package = args.app_script and "libluagenericenc" or "libloggingenc"

notify_server_host = "localhost"
args.notify_server = (
notify_server_host
+ ":"
+ str(infra.net.probably_free_local_port(notify_server_host))
)
run(args)