diff --git a/build-tools/block-data-plots/collect_data.py b/build-tools/block-data-plots/collect_data.py index 396d9dc89..d505c1d71 100644 --- a/build-tools/block-data-plots/collect_data.py +++ b/build-tools/block-data-plots/collect_data.py @@ -1,4 +1,5 @@ import argparse +import os import subprocess from pathlib import Path @@ -14,6 +15,7 @@ def collect_data(args): if args.output_file is None: + os.makedirs(DEFAULT_OUTPUT_DIR, exist_ok=True) output_file = DEFAULT_OUTPUT_DIR.joinpath( DEFAULT_OUTPUT_FILE_NAME_FMT.format(chain_type=args.chain_type) ) diff --git a/build-tools/fork-detection/README.md b/build-tools/fork-detection/README.md new file mode 100644 index 000000000..fb3be2565 --- /dev/null +++ b/build-tools/fork-detection/README.md @@ -0,0 +1,53 @@ +## Fork detection script, for the extra peace of mind + +Here we have `detector.py`, which is a relatively crude way of detecting a permanent fork (split) +in the network if it happens. + +The script basically runs the full sync in a loop, checking the node's log output for certain errors +and comparing its mainchain block ids with those obtained from the API server.\ +If anything suspicious is detected during the full sync, the script will save the node's data +directory and log file.\ +In any case, the script will temporarily ban some of the peers that participated in the sync +(so that the next iteration has a chance to have different ones and to reduce the strain on +the network) and start the full sync all over again, reusing the peerdb from the previous iteration. + +The node is always run with checkpoints disabled, so that it has the chance to find older forks too. + +The structure of the script's working directory (specified via the command line): +- `current_attempt` - this corresponds to the current sync attempt (iteration). +- `saved_attempts` - this contains subdirectories corresponding to attempts that + are considered suspicious; each subdirectory's name is the datetime of the moment + when the attempt was finished. +- `saved_peer_dbs` - this is where peer dbs from previous attempts are stored; the script + only needs the one from the latest attempt, but, just in case, the penultimate one is + also stored. +- `log.txt` - this is the log of the script itself. + +Each attempt's directory has the following structure: +- `flags` - this directory contains flag files (which are usually zero-length) indicating + that certain problems were found during the sync. It is what determines whether the attempt's + directory will be saved in the end (i.e. if the directory is non-empty, the attempt will be saved). +- `node_data` - this is the node's data directory of this attempt. +- `node_log.txt` - the node's log. + +Some notes: +* Currently the script requires Python 3.13 to run, though we may lift this requirement later. +* The script can send an email when it detects an issue using the local SMTP server + (if you're on Linux, google for an SMTP Postfix tutorial to set it up). +* Even if the script finds a problem (e.g. a checkpoint mismatch), you're still likely + to end up being on the correct chain. To download the actual fork for further investigation + you can initiate a separate full sync while using the node's option `--custom-checkpoints-csv-file` + to override the correct checkpoints with the wrong ones. +* Once the fork has been downloaded, you'll want to examine the contents of its chainstate db. 
+ Currently we have the `chainstate-db-dumper` tool that can dump certain info about blocks + to a CSV file (the most interesting part of it being the ids of pools that continue producing + blocks on that fork). +* Once the fork has been investigated you can "permanently" ban the peers that have been sending it + to you, to prevent it from being reported again and again. To do so, you can add their ip + addresses to `permabanned_peers.txt` (one address per line) in the script's working directory + (it doesn't exist by default, so you'll have to create it first). Note that the file is checked + on every iteration, so you can update it while the script is already running and it will come + into effect when the next iteration starts. +* The script is likely to fail if a networking error occurs, e.g. if it can't query the API server. + So, run it in a loop in a shell script (with some delay after each run, to prevent it from spamming + you with warning emails). \ No newline at end of file diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py new file mode 100644 index 000000000..bdcfe9d11 --- /dev/null +++ b/build-tools/fork-detection/detector.py @@ -0,0 +1,570 @@ +import argparse +import os +import queue +import re +import shlex +import shutil +import subprocess +import sys +import time +from datetime import datetime +from pathlib import Path +from queue import Queue +from threading import Thread +from typing import Optional +from urllib.parse import urlparse + +from utils import ( + colored, dir_missing_or_empty, exhaustive_stream_line_reader, hide_cursor, show_cursor, + init_logger, pretty_print_banned_peers, + CONSOLE_PRINTER, LOGGER as log, NODE_OUTPUT_PREFIX_COLOR, NODE_RPC_USER, NODE_RPC_PWD, + Error, APIServerClient, BannedPeer, EmailSender, NodeRPCClient, +) + + +DEFAULT_NODE_CMD = "cargo run --release --bin node-daemon --" +DEFAULT_NODE_RPC_BIND_ADDR = "127.0.0.1:12345" +DEFAULT_CHAIN_TYPE = "mainnet" +CHAIN_TYPE_CHOICES = ["mainnet", "testnet"] +CONTINUE_OPTION_NAME = "continue" + +CUR_ATTEMPT_SUBDIR = "current_attempt" +SAVED_ATTEMPTS_SUBDIR = "saved_attempts" + +FLAGS_SUBDIR = "flags" +NODE_DATA_SUBDIR = "node_data" +SAVED_PEER_DBS_SUBDIR = "saved_peer_dbs" + +LATEST_PEER_DB_SUBDIR = "latest" +PREV_PEER_DB_SUBDIR = "previous" + +# Note: this is defined by the node and cannot be changed. +PEER_DB_SUBDIR_IN_NODE_DATA = "peerdb-lmdb" + +# If the height difference between the current tip and a stale block is bigger than or equal to +# this value, a reorg to the stale block is no longer possible. +MAX_REORG_DEPTH = 1000 + +# The mapping from node's output to the name of the flag that must be automatically created +# as a result. 
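+# (Each flag is just a file with that name, created in the attempt's flags directory via
+# touch_flag() below; if any flags exist when the attempt ends, the whole attempt directory
+# is saved under saved_attempts.)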
+NODE_OUTPUT_LINE_REGEX_TO_FLAG_MAPPING = [
+    (re.compile(r"\bCRITICAL\b"), "critical_error"),
+    (re.compile(r"Checkpoint mismatch"), "checkpoint_mismatch"),
+    (re.compile(r"\bERROR\b.+\bprocess_block\b"), "process_block_failure"),
+    (re.compile(r"\bERROR\b.+\bpreliminary_block_check\b"), "preliminary_block_check_failure"),
+    (re.compile(r"\bERROR\b.+\bpreliminary_headers_check\b"), "preliminary_headers_check_failure"),
+]
+
+ENDED_UP_ON_A_FORK_FLAG_NAME = "ended_up_on_a_fork"
+STALE_BLOCK_BELOW_REORG_LIMIT_FLAG_NAME = "stale_block_below_reorg_limit"
+NO_INCOMING_BLOCKS_WHILE_ON_STALE_CHAIN_FLAG_NAME = "no_incoming_blocks_while_on_stale_chain"
+
+NODE_OUTPUT_LINE_NEW_TIP_REGEX = re.compile(
+    r"NEW TIP in chainstate (?P<block_id>[0-9A-Fa-f]+) with height (?P<height>\d+), timestamp: (?P<timestamp>\d+)"
+)
+NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX = re.compile(
+    r"Received stale block (?P<block_id>[0-9A-Fa-f]+) with height (?P<height>\d+), timestamp: (?P<timestamp>\d+)"
+)
+
+# The regex used to decide whether a node's output line should be printed to the console
+# (we want to avoid debug and info lines since they're both too noisy during sync and put extra
+# strain on the console app).
+# Note that this is not 100% reliable, because a log record can technically span multiple lines,
+# only the first of which will contain the severity. But at this moment we don't seem to emit
+# multi-line log records during syncing (except for the initial "Starting with the following config").
+# But even if we did, this approach is "good enough" anyway, since you can always look into the log
+# file for the missing details.
+NODE_OUTPUT_LINE_TO_PRINT_REGEX = re.compile(r"^\S+\s+(WARN|ERROR)\b")
+
+# The regex by which we determine that the node is actually being started; this is mainly needed
+# because by default we invoke cargo, which may have to do a lengthy compilation first.
+# Also note that we use a log line indicating that p2p has already been started (instead of, say,
+# an earlier log line such as "Starting mintlayer-core"). This helps catch the situation
+# when the node starts and immediately exits due to the p2p port being unavailable.
+NODE_STARTUP_OUTPUT_LINE_REGEX = re.compile(r"p2p.*Starting SyncManager") + +DEFAULT_BAN_DURATION_HOURS = 12 + +# We use Queue.shutdown which is only available since Python v3.13 +MIN_PYTHON_VERSION_MAJOR = 3 +MIN_PYTHON_VERSION_MINOR = 13 + +PERMABANNED_PEERS_FILE = "permabanned_peers.txt" +PERMABAN_DURATION_DAYS = 30 +PERMABAN_DURATION_SECS = 3600 * 24 * PERMABAN_DURATION_DAYS + + +class Handler(): + def __init__(self, args, email_sender): + CONSOLE_PRINTER.set_status("Initializing") + + self.email_sender = email_sender + self.working_dir = Path(args.working_dir).resolve() + os.makedirs(self.working_dir, exist_ok=True) + + init_logger(self.working_dir.joinpath("log.txt")) + log.info("Initializing") + + self.node_cmd = shlex.split(args.node_cmd) + + self.node_rpc_client = NodeRPCClient(args.node_rpc_bind_address) + self.api_server_client = APIServerClient(args.api_server_url) + + self.saved_attempts_dir = self.working_dir.joinpath(SAVED_ATTEMPTS_SUBDIR) + + self.saved_peer_dbs_dir = self.working_dir.joinpath(SAVED_PEER_DBS_SUBDIR) + self.latest_peer_db_dir = self.saved_peer_dbs_dir.joinpath(LATEST_PEER_DB_SUBDIR) + self.prev_peer_db_dir = self.saved_peer_dbs_dir.joinpath(PREV_PEER_DB_SUBDIR) + + self.permabanned_peers_file = self.working_dir.joinpath(PERMABANNED_PEERS_FILE) + + self.cur_attempt_dir = self.working_dir.joinpath(CUR_ATTEMPT_SUBDIR) + if os.path.exists(self.cur_attempt_dir) and not args.can_continue: + raise Error( + (f"The directory {self.cur_attempt_dir} already exists. " + f"Either delete it or pass '--{CONTINUE_OPTION_NAME}' to continue.") + ) + + self.cur_attempt_flags_dir = self.cur_attempt_dir.joinpath(FLAGS_SUBDIR) + self.cur_attempt_node_data_dir = self.cur_attempt_dir.joinpath(NODE_DATA_SUBDIR) + self.cur_attempt_logs_file = self.cur_attempt_dir.joinpath("node_log.txt") + + self.unban_all = args.unban_all + self.ban_duration_secs = args.ban_duration_hours * 3600 + + self.node_cmd += [ + "--datadir", self.cur_attempt_node_data_dir, + args.chain_type, + "--allow-checkpoints-mismatch", + "--rpc-bind-address", args.node_rpc_bind_address, + "--rpc-username", NODE_RPC_USER, + "--rpc-password", NODE_RPC_PWD, + ] + log.info(f"Node run command: {self.node_cmd}") + + def run(self): + try: + while True: + self.do_full_sync() + except KeyboardInterrupt: + log.info("Exiting due to Ctrl-C") + + def do_full_sync(self): + actual_tip_height = self.api_server_client.get_tip().height + log.info(f"Starting a new sync iteration, current chain height is {actual_tip_height}") + + os.makedirs(self.cur_attempt_flags_dir, exist_ok=True) + os.makedirs(self.cur_attempt_node_data_dir, exist_ok=True) + + self.restore_peer_db() + + node_proc_env = os.environ.copy() + # Note: "chainstate_verbose_block_ids=debug" allows to catch the "Received stale block" line + # and also forces certain block-processing functions in chainstate to print full block ids. + # We avoid using the "normal" debug log, because it's too noisy, e.g. even + # "info,chainstate=debug" produces hundreds of megabytes of logs during the full sync. 
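+        # (The "Received stale block ..." line is only emitted when this target is enabled at the
+        # debug level; it is produced by the chainstate/src/detail/mod.rs change at the bottom of
+        # this diff and is what NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX above matches.)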
+ node_proc_env["RUST_LOG"] = "info,chainstate_verbose_block_ids=debug" + + node_proc = subprocess.Popen( + self.node_cmd, + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=node_proc_env) + + last_tip_arrival_time = None + last_tip_height = None + + node_proc_stdout_queue = Queue() + Thread( + target=exhaustive_stream_line_reader, + args=(node_proc.stdout, node_proc_stdout_queue, self.cur_attempt_logs_file) + ).start() + + # This is called for each node's output line and on a timeout when reading the line + # from a queue. + # Returns True if the current attempt should continue and False otherwise. + def on_node_output_line_or_timeout(line: Optional[str]): + nonlocal actual_tip_height, last_tip_arrival_time, last_tip_height + + line = line if line is not None else "" + + for line_re, flag in NODE_OUTPUT_LINE_REGEX_TO_FLAG_MAPPING: + if line_re.search(line) is not None: + self.touch_flag(flag) + + cur_seconds_since_epoch = time.time() + + if (new_tip_match := NODE_OUTPUT_LINE_NEW_TIP_REGEX.search(line)) is not None: + block_id = new_tip_match.group("block_id") + height = int(new_tip_match.group("height")) + timestamp = int(new_tip_match.group("timestamp")) + + last_tip_arrival_time = cur_seconds_since_epoch + last_tip_height = height + + if height % 10 == 0: + CONSOLE_PRINTER.set_status(f"Synced to height {height}") + + # Update actual_tip_height if we've reached it. + if height >= actual_tip_height: + actual_tip_height = self.api_server_client.get_tip().height + + fresh_block_reached = timestamp >= cur_seconds_since_epoch - 120 + actual_tip_height_reached = height >= actual_tip_height + + # Note: we can't query the API server on every block, because it's a costly operation + # (unless the API server is being run on the same machine). So we only query it every + # few hundred blocks or if we're near the end of the sync. + # Note: 500 was chosen because it's also the distance between our checkpoints, + # but the precise value is not essential. + if height % 500 == 0 or fresh_block_reached or actual_tip_height_reached: + actual_block_id = self.api_server_client.get_block_id(height) + if block_id.lower() != actual_block_id.lower(): + if actual_tip_height - height >= MAX_REORG_DEPTH: + self.touch_flag(ENDED_UP_ON_A_FORK_FLAG_NAME) + + if fresh_block_reached: + log.info(f"Fresh block on a stale chain reached (height = {height})") + return False + + if actual_tip_height_reached: + log_func = log.info if height <= actual_tip_height + 1 else log.warning + extra = ( + "" if height == actual_tip_height + else f" (the API server is {height-actual_tip_height} block(s) behind)" + ) + log_func(f"Tip reached, height = {height}{extra}") + return False + + elif (stale_block_received_match := NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX.search(line)) is not None: + block_id = stale_block_received_match.group("block_id") + height = int(stale_block_received_match.group("height")) + + log.warn(f"Stale block received at height {height}: {block_id}") + + if actual_tip_height - height >= MAX_REORG_DEPTH: + # Note: this may mean 2 things: + # a) If we're on the proper chain, then we've found a peer who is on + # a fork, in which case we definitely want to create this flag. + # b) If we're on a fork, then the stale block may be from the proper + # chain, in which case the flag creation is redundant, because the code + # above or below should catch this situation; but it's not harmful either. 
+ self.touch_flag( + STALE_BLOCK_BELOW_REORG_LIMIT_FLAG_NAME, + f"height={height}, block id={block_id}" + ) + else: + seconds_since_last_tip = ( + cur_seconds_since_epoch - last_tip_arrival_time + if last_tip_arrival_time is not None else 0 + ) + + # Note: the reason for not receiving any blocks may be that we've already banned + # all or most of the potential peers. But if we're on a stale chain, then we may + # not receive any more blocks, so we have to stop. + # We'll also stop if some flags have already been created. + + if seconds_since_last_tip >= 120: + chainstate_info = self.node_rpc_client.get_chainstate_info() + tip_id = chainstate_info.best_block_id + tip_height = chainstate_info.best_block_height + + if tip_height != 0: + actual_block_id = self.api_server_client.get_block_id(tip_height) + + if tip_id.lower() != actual_block_id.lower(): + self.touch_flag(NO_INCOMING_BLOCKS_WHILE_ON_STALE_CHAIN_FLAG_NAME) + return False + + if self.have_flags(): + log.info("Exiting because we haven't received any blocks in a while, but some flags already exist") + return False + + return True + + # Will be called once the first non-empty line has been received from the node's output. + def on_node_started(): + self.node_rpc_client.ensure_rpc_started() + + perma_banned_peers = self.load_perma_banned_peers() + log.debug(f"Banning the following addresses for {PERMABAN_DURATION_DAYS} days: {perma_banned_peers}") + + for addr in perma_banned_peers: + self.node_rpc_client.ban_peer(addr, PERMABAN_DURATION_SECS) + + def filter_out_perma_banned_peers(peer_list: list[BannedPeer]) -> list[BannedPeer]: + return [peer for peer in peer_list if peer.ip not in perma_banned_peers] + + banned_peers = self.node_rpc_client.get_banned_peers() + banned_peers_str = pretty_print_banned_peers(banned_peers) + + log.debug(f"Currently banned peers: {banned_peers_str}") + + if self.unban_all: + self.unban_all = False + peers_to_unban = filter_out_perma_banned_peers(banned_peers) + if len(peers_to_unban) > 0: + log.info("Unbanning currently (non-permanently) banned peers due to the command line option") + + for peer in peers_to_unban: + self.node_rpc_client.unban_peer(peer.ip) + + banned_peers_after_unban = self.node_rpc_client.get_banned_peers() + unexpected_banned_peers = filter_out_perma_banned_peers(banned_peers_after_unban) + + if len(unexpected_banned_peers) > 0: + unexpected_banned_peers_str = pretty_print_banned_peers(unexpected_banned_peers) + log.warning(f"Some peers are still banned after unban: {unexpected_banned_peers_str}") + + def on_attempt_completion(): + # When a syncing attempt has been finished, but before the node has been stopped, + # we ban some of the currently connected peers for a long-enough duration: + # a) so that the next attempt can use different peers; + # b) to reduce the strain on the network. + + peer_ips_to_ban = self.get_node_peer_ip_addrs_to_ban() + + # Before banning, force the disconnection of all peers by disabling networking, + # to avoid sending them the "scary" disconnection reason "Your address is banned" + # (though nodes may still see this reason if they try connecting to our node + # during the next attempt). + self.node_rpc_client.enable_networking(False) + # Give the node some time to actually disconnect all peers. 
+ time.sleep(2) + + for ip_addr in peer_ips_to_ban: + log.debug(f"Banning {ip_addr}") + self.node_rpc_client.ban_peer(ip_addr, self.ban_duration_secs) + + try: + node_started = False + set_status_and_debug_log("Waiting for the node to start") + + while True: + try: + try: + line = node_proc_stdout_queue.get(timeout=10) + + if NODE_OUTPUT_LINE_TO_PRINT_REGEX.search(line) is not None: + stdout_prefix = colored("node> ", NODE_OUTPUT_PREFIX_COLOR) + CONSOLE_PRINTER.print_to_stderr(f"{stdout_prefix} {line}", end="") + + if not node_started and NODE_STARTUP_OUTPUT_LINE_REGEX.search(line) is not None: + node_started = True + set_status_and_debug_log("Node started") + on_node_started() + except queue.Empty: + line = None + + if not on_node_output_line_or_timeout(line): + break + except queue.ShutDown: + # This means that the node has exited prematurely. But we check for this + # via the "poll" call below, so here it can be ignored. + pass + + exit_code = node_proc.poll() + if exit_code is not None: + raise Error(f"The node exited prematurely with exit code {exit_code}") + + # Shutdown the queue to prevent the reading thread from putting moree lines to it. + node_proc_stdout_queue.shutdown() + + on_attempt_completion() + + finally: + if last_tip_height is not None: + log.debug(f"Last handled tip height: {last_tip_height}") + + set_status_and_debug_log("Terminating the node") + + # Note: for some reason the node doesn't want to terminate sometimes, + # in particular this may happen when hitting Ctrl-C. Though the Ctrl-C case + # is not particularly important (since you can always hit it again), we want + # to protect against this situation during the normal script execution. + # So, we try terminating the node a few times and if it doesn't react, we kill it. + for i in range(3): + node_proc.terminate() + try: + node_proc.wait(timeout=5) + break + except subprocess.TimeoutExpired: + log.warning(f"Node didn't terminate, attempt {i}") + pass + else: + log.warning("Killing the node") + node_proc.kill() + node_proc.wait() + + self.save_peer_db() + + # If the script has created some flags, save the directory + if self.have_flags(): + os.makedirs(self.saved_attempts_dir, exist_ok=True) + + backup_dir_name = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") + backup_dir = self.saved_attempts_dir.joinpath(backup_dir_name) + + warning_msg = ("Sync iteration ended with some issues, " + f"backing up the the attempt's dir to {backup_dir}") + log.warning(warning_msg) + self.email_sender.send("Warning", warning_msg) + + os.rename(self.cur_attempt_dir, backup_dir) + else: + log.info("Sync iteration ended without issues, removing the attempt's dir") + shutil.rmtree(self.cur_attempt_dir) + + def have_flags(self): + return len(os.listdir(self.cur_attempt_flags_dir)) > 0 + + # Return the list of ip addresses we want to ban and the end of a sync attempt, + # to prevent syncing with the same peers again and again. + def get_node_peer_ip_addrs_to_ban(self): + peers = self.node_rpc_client.get_connected_peers() + + # Note: non-null `last_tip_block_time` means that the peer has sent us a block that + # became our tip. Other peers that had the same block but sent it a bit later are not + # counted, which means that it's technically possible to have a gadzillion peers where + # only one of them has a non-null `last_tip_block_time`. In practice though most of the + # currently connected peers should have a non-null `last_tip_block_time` after a full sync. 
+ peers_with_last_tip_block_time = [ + peer for peer in peers if peer["last_tip_block_time"] is not None + ] + + log.debug(f"Obtaining peer ips to ban; total connected peers: {len(peers)}, " + f"peers with 'last_tip_block_time': {len(peers_with_last_tip_block_time)}") + + # Note: the return addresses have the form '{ip_addr}:{port}', which is interpreted + # as path by urlparse; prepending "//" convinces it that it's a full address. + return [urlparse("//" + peer["address"]).hostname for peer in peers_with_last_tip_block_time] + + # After the current attempt has been completed, save the current peer db. + def save_peer_db(self): + os.makedirs(self.saved_peer_dbs_dir, exist_ok=True) + + if os.path.exists(self.prev_peer_db_dir): + shutil.rmtree(self.prev_peer_db_dir) + + if os.path.exists(self.latest_peer_db_dir): + os.rename(self.latest_peer_db_dir, self.prev_peer_db_dir) + + cur_peer_db_dir = self.cur_attempt_node_data_dir.joinpath(PEER_DB_SUBDIR_IN_NODE_DATA) + shutil.copytree(cur_peer_db_dir, self.latest_peer_db_dir) + + # Before starting the next attempt, if the node dir is missing a peer db, copy the saved + # peer db into it. + def restore_peer_db(self): + cur_peer_db_dir = self.cur_attempt_node_data_dir.joinpath(PEER_DB_SUBDIR_IN_NODE_DATA) + + if dir_missing_or_empty(cur_peer_db_dir) and os.path.exists(self.latest_peer_db_dir): + shutil.copytree(self.latest_peer_db_dir, cur_peer_db_dir, dirs_exist_ok=True) + + # Touch a flag optionally appending some contents to it + def touch_flag(self, flag: str, contents=None): + flag_file = self.cur_attempt_flags_dir.joinpath(flag) + with open(flag_file, 'a') as file: + if contents is not None: + file.write(contents) + file.write("\n") + + log.warning(f"Flag created: {flag}") + + def load_perma_banned_peers(self) -> set[str]: + def trim_line(line): + # Allow the file to have comments + return line.split("#", 1)[0].strip() + + log.debug(f"Checking {self.permabanned_peers_file} for the list of permabanned peer addresses") + + try: + with open(self.permabanned_peers_file, "r") as file: + lines = file.readlines() + lines = [ + trimmed_line for line in lines + if len(trimmed_line := trim_line(line)) > 0 + ] + return set(lines) + except FileNotFoundError: + return set() + + +def set_status_and_debug_log(status): + log.debug(status) + CONSOLE_PRINTER.set_status(status) + + +def main(): + if sys.version_info < (MIN_PYTHON_VERSION_MAJOR, MIN_PYTHON_VERSION_MINOR): + print(f"This script requires Python {MIN_PYTHON_VERSION_MAJOR}.{MIN_PYTHON_VERSION_MINOR} or higher") + sys.exit(1) + + hide_cursor() + + try: + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument( + "--node-cmd", + help="Command to run the node", + default=DEFAULT_NODE_CMD) + parser.add_argument( + "--node-rpc-bind-address", + help="Node PRC bind address", + default=DEFAULT_NODE_RPC_BIND_ADDR) + parser.add_argument( + "--api-server-url", + help='API server URL', required=True) + parser.add_argument( + "--chain-type", + help="Chain type", + choices=CHAIN_TYPE_CHOICES, + default=DEFAULT_CHAIN_TYPE) + parser.add_argument( + "--working-dir", + help="Working directory, where all the output will be put", + required=True) + parser.add_argument( + f"--{CONTINUE_OPTION_NAME}", + help=(f"Proceed even if the '{CUR_ATTEMPT_SUBDIR}' subdirectory " + "already exists in the working dir"), + action="store_true", + dest="can_continue") + parser.add_argument( + "--ban-duration", + help="Ban duration, in hours", + dest="ban_duration_hours", + 
default=DEFAULT_BAN_DURATION_HOURS) + parser.add_argument( + "--unban-all", + help="Unban all node's peers on start", + action="store_true") + parser.add_argument( + "--notification-email", + help="Send notifications to this email using the local SMTP server", + default=None) + parser.add_argument( + "--notification-email-from", + help=("The from address for the notification email. " + "If None, the --notification-email value will be used"), + default=None) + args = parser.parse_args() + + email_sender = EmailSender(args.notification_email, args.notification_email_from) + + try: + Handler(args, email_sender).run() + except Exception as e: + email_sender.send("Error", f"Script terminated due to exception: {e}") + raise + except Error as e: + print(f"Error: {e}") + sys.exit(1) + finally: + CONSOLE_PRINTER.set_status("") + show_cursor() + + +if __name__ == "__main__": + main() diff --git a/build-tools/fork-detection/utils.py b/build-tools/fork-detection/utils.py new file mode 100644 index 000000000..2f5f6ae03 --- /dev/null +++ b/build-tools/fork-detection/utils.py @@ -0,0 +1,430 @@ +import json +import logging +import os +import platform +import queue +import requests +import smtplib +import sys +import time +from collections import namedtuple +from email.mime.text import MIMEText +from pathlib import Path +from queue import Queue +from threading import Lock +from typing import TextIO +from urllib.parse import urlparse + +import termcolor # type: ignore + + +class Error(Exception): + pass + + +# 'color' can be either the name of the color, e.g. "red", or a tuple where the first element +# is the name of the color and the second one the attributes, e.g. ("red", ["dark", "bold"]), +# or simply None. +def colored(text, color): + if color is None: + return text + elif isinstance(color, tuple): + return termcolor.colored(text, color[0], attrs=color[1]) + else: + return termcolor.colored(text, color) + + +# Color constants suitable for passing to the "colored" function. +# Note we are using the color names from termcolor v1.x, which doesn't have the "light_" colors. +# The "bold" attribute makes the color brighter, "dark" farker and "dark bold" is between +# "dark" and "bold". +# Note: the colors were chosen when using a Linux terminal with a dark theme, though they +# look ok with a light theme too. +STATUS_COLOR = ("cyan", ["bold"]) +LOG_DEBUG_COLOR = ("white", ["bold", "dark"]) +# "None" means it will be the normal foreground color, i.e. white for a dark theme and +# black for a light one. +LOG_INFO_COLOR = None +LOG_WARN_COLOR = "yellow" +LOG_ERROR_COLOR = "red" +LOG_CRITICAL_COLOR = ("red", ["bold"]) +NODE_OUTPUT_PREFIX_COLOR = "green" + +NODE_RPC_USER = "user" +NODE_RPC_PWD = "pwd" + +API_SERVER_TIMEOUT_SECS = 180 +NODE_RPC_TIMEOUT_SECS = 180 + +LOGGER = logging.getLogger("detector_logger") + + +# This class maintains a "status line" at the bottom of the terminal output, erasing and +# redrawing it when the normal output is performed. +# The status line is written to stdout while the normal output is always printed to stderr. +# Note that all printing in the app has to be done through the same object of this class +# (CONSOLE_PRINTER defined below), otherwise the output will be broken. +class ConsolePrinterWithStatus: + def __init__(self): + self.status = "" + self.mutex = Lock() + + if sys.stdout.isatty(): + # Prepare the line where the status will be shown. 
+ sys.stdout.write("\n") + + def print_to_stderr(self, line, end = "\n"): + with self.mutex: + # If both are the same terminal, need to erase the status, print the line + # and then print the status again. + if stdout_and_stderr_are_same_terminal(): + # Note: technically we could write the line and then the required number + # of extra spaces, but that number is non-trivial to determine if the line + # or the status contain control chars. + self._erase_status() + sys.stdout.write(line) + sys.stdout.write(end) + sys.stdout.write(self.status) + sys.stdout.flush() + else: + print(line, file=sys.stderr) + + def set_status(self, status): + with self.mutex: + if sys.stdout.isatty(): + status = colored(status, STATUS_COLOR) + self._erase_status() + sys.stdout.write(status) + else: + sys.stdout.write(status) + sys.stdout.write("\n") + + sys.stdout.flush() + self.status = status + + def _erase_status(self): + sys.stdout.write("\r") + sys.stdout.write(" " * len(self.status)) + sys.stdout.write("\r") + + +CONSOLE_PRINTER = ConsolePrinterWithStatus() + + +# Log handler that prints the records via CONSOLE_PRINTER. +class LogConsoleHandler(logging.Handler): + def emit(self, record): + try: + msg = self.format(record) + CONSOLE_PRINTER.print_to_stderr(msg) + except Exception: + self.handleError(record) + + +# Log formatter that produces colored output. +class LogColoredFormatter(logging.Formatter): + def __init__(self, fmt: str): + super().__init__() + + self.formatters = { + logging.DEBUG: logging.Formatter(colored(fmt, LOG_DEBUG_COLOR)), + logging.INFO: logging.Formatter(colored(fmt, LOG_INFO_COLOR)), + logging.WARNING: logging.Formatter(colored(fmt, LOG_WARN_COLOR)), + logging.ERROR: logging.Formatter(colored(fmt, LOG_ERROR_COLOR)), + logging.CRITICAL: logging.Formatter(colored(fmt, LOG_CRITICAL_COLOR)), + } + + def format(self, record): + formatter = self.formatters.get(record.levelno) + return formatter.format(record) + + +def stdout_and_stderr_are_same_terminal(): + if not (sys.stdout.isatty() and sys.stderr.isatty()): + # At least one of them is not a terminal + return False + + if sys.platform.startswith("win"): + # On Windows, if both are terminals, then they should be the same terminal. + return True + + # On *nix, the more reliable way is to compare ttyname's. + stdout_name = os.ttyname(sys.stdout.fileno()) + stderr_name = os.ttyname(sys.stderr.fileno()) + return stdout_name == stderr_name + + +def init_logger(log_file: Path): + global LOGGER + + fmt = "%(asctime)s - %(levelname)s - %(message)s" + + console_handler = LogConsoleHandler() + console_handler.setFormatter(LogColoredFormatter(fmt)) + + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(logging.Formatter(fmt)) + + LOGGER.addHandler(console_handler) + LOGGER.addHandler(file_handler) + + LOGGER.setLevel(logging.DEBUG) + + # Without this the records will be propagated to the root logger and printed twice. 
+ LOGGER.propagate = False + + +def dir_missing_or_empty(path: Path): + return not os.path.exists(path) or len(os.listdir(path)) == 0 + + +def prettify_duration(duration_secs: int) -> str: + if duration_secs == 0: + return "0s" + + result = "" + def append(val, symbol): + nonlocal result + if val != 0: + sep = " " if len(result) > 0 else "" + result += f"{sep}{val}{symbol}" + + duration_mins = duration_secs // 60 + duration_hrs = duration_mins // 60 + duration_days = duration_hrs // 24 + + append(duration_days, "d") + append(duration_hrs % 24, "h") + append(duration_mins % 60, "m") + append(duration_secs % 60, "s") + + return result + + +# The function reads lines from the stream and puts them to the queue. +# Even if the queue has been shut down on the receiving end, the function will continue +# to read from the stream until it is closed. +# +# This is intended to be used with subprocess.Popen when its stdout/stderr are in the PIPE mode, +# (because not reading the pipes may result in the child process dead locking when the pipe +# buffer becomes full). +# +# The function will also log (append) the read lines to the specified file, if provided. +def exhaustive_stream_line_reader(stream: TextIO, queue_obj: Queue, log_file: Path | None = None): + def reader(log_stream): + queue_already_shut_down = False + + # Loop until readline returns '', which means that the other end of the stream has been closed. + for line in iter(stream.readline, ''): + if log_stream is not None: + log_stream.write(line) + log_stream.flush() + + if not queue_already_shut_down: + try: + queue_obj.put(line) + except queue.ShutDown: + queue_already_shut_down = True + + queue_obj.shutdown() + + if log_file is not None: + with open(log_file, 'a') as log_stream: + reader(log_stream) + else: + reader(None) + + +BlockInfo = namedtuple("BlockInfo", ["id", "height"]) +BannedPeer = namedtuple("BannedPeer", ["ip", "banned_until_as_secs_since_epoch"]) +ChainstateInfo = namedtuple("ChainstateInfo", ["best_block_height", "best_block_id", "best_block_timestamp"]) + + +class APIServerClient: + def __init__(self, server_url): + if len(urlparse(server_url).scheme) == 0: + raise Error("The provided API server URL must contain a scheme") + + self.server_url = server_url + self.session = requests.Session() + + def _get(self, path: str, request_params): + url = f"{self.server_url}/api/v2/{path}" + try: + response = self.session.get(url, params=request_params, timeout=API_SERVER_TIMEOUT_SECS) + except requests.exceptions.Timeout: + raise Error(f"API server request to '{path}' timed out") + except requests.exceptions.ConnectionError: + raise Error("Cannot connect to the API server") + + if response.status_code == 404: + return None + response.raise_for_status() + return response.json() + + def get_tip(self): + tip_info = self._get("chain/tip", {}) + return BlockInfo(id=tip_info["block_id"], height=tip_info["block_height"]) + + def get_block_id(self, height: int): + return self._get(f"chain/{height}", {}) + + +class NodeRPCClient: + def __init__(self, server_url): + self.server_url = server_url + self.session = requests.Session() + + def _post(self, method: str, method_params, timeout=NODE_RPC_TIMEOUT_SECS, handle_exceptions=True): + headers = {"Content-Type": "application/json"} + payload = { + "jsonrpc": "2.0", + "method": method, + "params": method_params, + "id": 1, + } + url = f"http://{NODE_RPC_USER}:{NODE_RPC_PWD}@{self.server_url}" + try: + response = self.session.post(url, headers=headers, data=json.dumps(payload), timeout=timeout) 
+ except requests.exceptions.Timeout: + if handle_exceptions: + raise Error(f"Node RPC request '{method}' timed out") + else: + raise + except requests.exceptions.ConnectionError: + if handle_exceptions: + raise Error("Cannot connect to the node via RPC") + else: + raise + + response.raise_for_status() + json_data = response.json() + + if "error" in json_data: + err_code = json_data["error"]["code"] + err_msg = json_data["error"]["message"] + raise Error( + f"Node RPC method '{method}' failed with code {err_code} and message '{err_msg}'" + ) + + return json_data["result"] + + def enable_networking(self, enable: bool): + self._post("p2p_enable_networking", [enable]) + + def get_connected_peers(self): + return self._post("p2p_get_connected_peers", []) + + def get_banned_peers(self) -> list[BannedPeer]: + raw_peers = self._post("p2p_list_banned", []) + pretty_peers = [] + for peer in raw_peers: + ip = peer[0] + banned_until_secs = peer[1]["time"]["secs"] + banned_until_nanos = peer[1]["time"]["nanos"] + # Round the seconds up. + banned_until_secs += 1 if banned_until_nanos != 0 else 0 + pretty_peers += [BannedPeer(ip=ip, banned_until_as_secs_since_epoch=banned_until_secs)] + + return pretty_peers + + def ban_peer(self, peer_addr: str, duration_secs: int): + self._post("p2p_ban", [peer_addr, {"secs":duration_secs, "nanos":0}]) + + def unban_peer(self, peer_addr: str): + self._post("p2p_unban", [peer_addr]) + + def get_chainstate_info(self) -> ChainstateInfo: + info = self._post("chainstate_info", []) + bb_height = int(info["best_block_height"]) + bb_timestamp = int(info["best_block_timestamp"]["timestamp"]) + bb_id = info["best_block_id"] + + return ChainstateInfo( + best_block_height=bb_height, best_block_id=bb_id, best_block_timestamp=bb_timestamp + ) + + # Assuming that the node has already been started, wait until it is reachable via rpc. + def ensure_rpc_started(self): + max_attempts = 10 + for i in range(max_attempts): + try: + # Note: since we're repeating this multiple times, the timeout has to be small. + self._post("p2p_get_peer_count", [], timeout=5, handle_exceptions=False) + return + except requests.exceptions.ConnectionError: + time.sleep(1) + except requests.exceptions.Timeout: + # Try again on timeout too, just don't waste extra time on sleeping. + pass + else: + raise Error("The node is expected to have been started already, but RPC requests don't work") + + +def pretty_print_banned_peers(banned_peers: list[BannedPeer], multiline = True) -> str: + cur_secs_since_epoch = int(time.time()) + + # Note: the ban time can be in the past if we're restarting the script after a delay. + # Such peers are not really banned anymore. 
+ banned_peers = [ + peer for peer in banned_peers if peer.banned_until_as_secs_since_epoch > cur_secs_since_epoch + ] + + if len(banned_peers) == 0: + return "[]" + + if multiline: + result = "[\n" + else: + result = "[" + + for idx, peer in enumerate(banned_peers): + duration = prettify_duration(peer.banned_until_as_secs_since_epoch - cur_secs_since_epoch) + line = f"(ip: {peer.ip}, remaining duration: {duration})" + + if multiline: + opt_sep = "," if idx != len(banned_peers) - 1 else "" + result += f" {line}{opt_sep}\n" + else: + opt_sep = ", " if idx != len(banned_peers) - 1 else "" + result += f"{line}{opt_sep}" + + result += "]" + + return result + + +def hide_cursor(): + esc_seq = "\033[?25l" + if sys.stdout.isatty(): + sys.stdout.write(esc_seq) + if sys.stderr.isatty(): + sys.stderr.write(esc_seq) + + +def show_cursor(): + esc_seq = "\033[?25h" + if sys.stdout.isatty(): + sys.stdout.write(esc_seq) + if sys.stderr.isatty(): + sys.stderr.write(esc_seq) + + +# Sends notification emails to the specified address if it's not None, otherwise does nothing. +class EmailSender: + # to_addr - the address to send emails to; if None, nothing will be sent. + # from_addr - the 'from' address for the emails; if None, to_addr will be used. + def __init__(self, to_addr: str | None, from_addr: str | None): + self.to_addr = to_addr + self.from_addr = from_addr or to_addr + + def send(self, msg_subj, msg_body): + if self.to_addr is not None: + msg = MIMEText(msg_body) + msg["Subject"] = msg_subj + msg["From"] = f"Fork detection script at {platform.node()} <{self.from_addr}>" + msg["To"] = self.to_addr + + s = smtplib.SMTP('localhost') + s.sendmail(self.from_addr, [self.to_addr], msg.as_string()) + s.quit() diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs index 33d0178e2..5009a3eeb 100644 --- a/chainstate/src/detail/mod.rs +++ b/chainstate/src/detail/mod.rs @@ -636,10 +636,20 @@ impl Chainstate self.update_initial_block_download_flag() .map_err(BlockError::BestBlockIdQueryError)?; } else { - tracing::debug!( - target: CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS, - "Stale block received: {block_id}" - ); + if tracing::event_enabled!(tracing::Level::DEBUG, CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS) { + // FIXME: return custom enum from attempt_to_process_block + let chainstate_ref = self.make_db_tx_ro().map_err(BlockError::from)?; + let bi = get_existing_block_index(&chainstate_ref, &block_id)?; + + tracing::debug!( + target: CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS, + "Received stale block {:x} with height {}, timestamp: {} ({})", + bi.block_id(), + bi.block_height(), + bi.block_timestamp(), + bi.block_timestamp().into_time(), + ) + } } Ok(result)
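A note on the Python 3.13 requirement mentioned in the README: `detector.py` leans on `Queue.shutdown()` and the `queue.ShutDown` exception (both new in 3.13) so that the thread reading the node's output and the main loop can stop each other cleanly, while the node never blocks on a full pipe. The following minimal sketch shows just that pattern in isolation; the child process, names and the "line 3" condition are illustrative, not part of the tool.

```python
import queue
import subprocess
import sys
from queue import Queue
from threading import Thread


def reader(stream, q: Queue):
    # Forward every line of the child's stdout into the queue. Keep draining the pipe
    # even after the consumer has shut the queue down, so that the child can never
    # block on a full pipe buffer (this mirrors exhaustive_stream_line_reader in utils.py).
    consumer_gone = False
    for line in iter(stream.readline, ""):
        if not consumer_gone:
            try:
                q.put(line)
            except queue.ShutDown:
                consumer_gone = True
    # The stream is closed, i.e. the child has exited; wake up the consumer as well.
    q.shutdown()


# A stand-in for the node process: it just prints a few lines and exits.
child_code = "import time\nfor i in range(5):\n    print('line', i, flush=True)\n    time.sleep(0.2)"
proc = subprocess.Popen(
    [sys.executable, "-c", child_code],
    encoding="utf-8",
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)

q = Queue()
Thread(target=reader, args=(proc.stdout, q)).start()

while True:
    try:
        line = q.get(timeout=10)  # like detector.py, poll with a timeout
    except queue.Empty:
        continue                  # timeout; detector.py runs its periodic checks here
    except queue.ShutDown:
        break                     # the child exited and the reader shut the queue down
    print("child>", line, end="")
    if "line 3" in line:          # some condition decided by the consumer
        q.shutdown()              # stop the reader from queueing more lines
        break

proc.wait()
```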
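The `permabanned_peers.txt` file mentioned in the README is plain text with one address per line, and, as `load_perma_banned_peers` shows, everything after a `#` is ignored, so the entries can be annotated. The addresses below are placeholders from the documentation ranges:

```
# peers that were observed sending the investigated fork
192.0.2.10
192.0.2.25        # seen in several attempts
198.51.100.7
```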
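The last README note recommends wrapping the script in a loop with a delay between runs, since it deliberately fails fast on networking errors. A plain shell `while` loop does the job; the sketch below is a hypothetical Python equivalent where the URL, working directory and delay are placeholders. Passing `--continue` lets a restart proceed even though the previous run left a `current_attempt` directory behind.

```python
import subprocess
import sys
import time

# Placeholders: point these at your own API server and working directory.
DETECTOR_CMD = [
    sys.executable, "detector.py",
    "--chain-type", "mainnet",
    "--api-server-url", "https://api-server.example.com",
    "--working-dir", "/path/to/fork-detection-workdir",
    "--continue",
]
RESTART_DELAY_SECS = 15 * 60  # avoid a tight restart loop (and a flood of warning emails)

while True:
    result = subprocess.run(DETECTOR_CMD)
    print(f"detector.py exited with code {result.returncode}, "
          f"restarting in {RESTART_DELAY_SECS} seconds", file=sys.stderr)
    time.sleep(RESTART_DELAY_SECS)
```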
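Finally, a small aside on the `urlparse` trick used in `get_node_peer_ip_addrs_to_ban`: a bare `ip:port` string is parsed as a path, so `hostname` is only populated once `//` is prepended. The addresses here are placeholders:

```python
from urllib.parse import urlparse

print(urlparse("203.0.113.5:3031").hostname)      # None: the whole string is treated as a path
print(urlparse("//203.0.113.5:3031").hostname)    # '203.0.113.5'
print(urlparse("//[2001:db8::1]:3031").hostname)  # '2001:db8::1' (brackets are stripped)
```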