From e90667962edd683ebcd461e57696c4640edb731b Mon Sep 17 00:00:00 2001 From: Archit Sharma Date: Wed, 13 Jun 2018 00:32:43 +0530 Subject: [PATCH 1/5] server.py: segregate thread condition logic from __main__ --- trueconsensus/fastchain/local_config.py | 2 + trueconsensus/fastchain/pbft-py/node.py | 2 + trueconsensus/fastchain/pbft-py/server.py | 151 ++++++++++++---------- 3 files changed, 89 insertions(+), 66 deletions(-) diff --git a/trueconsensus/fastchain/local_config.py b/trueconsensus/fastchain/local_config.py index 0cdfa9b..ff8ba07 100644 --- a/trueconsensus/fastchain/local_config.py +++ b/trueconsensus/fastchain/local_config.py @@ -21,3 +21,5 @@ # KEY DIRECTORY KD = os.getcwd() + "/keys" print KD + +# threading_enabled = False diff --git a/trueconsensus/fastchain/pbft-py/node.py b/trueconsensus/fastchain/pbft-py/node.py index 5011f60..7b96775 100644 --- a/trueconsensus/fastchain/pbft-py/node.py +++ b/trueconsensus/fastchain/pbft-py/node.py @@ -998,6 +998,8 @@ def server_loop(self): break self.buffmap[fd] = self.buffmap[fd][size+4:] else: + # TODO: check if remaining buffmap of slice + # less than size+4 as leftover crumbs break if self.kill_flag: sys.exit() diff --git a/trueconsensus/fastchain/pbft-py/server.py b/trueconsensus/fastchain/pbft-py/server.py index a221c4f..d961d98 100755 --- a/trueconsensus/fastchain/pbft-py/server.py +++ b/trueconsensus/fastchain/pbft-py/server.py @@ -44,6 +44,88 @@ def signal_handler(event, frame): sys.exit(130) # Ctrl-C for bash +def init_server(id): + global RL + try: + ip, port = RL[id] + except IndexError as E: + quit("%s Ran out of replica list. No more server config to try" % E) + s = socket.socket() + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # host = socket.gethostname() + host = "0.0.0.0" + s.bind((host, port)) # on EC2 we cannot directly bind on public addr + s.listen(50) + s.setblocking(0) + _logger.debug("Server [%s] -- listening on port %s" % (id, port)) + _logger.debug("IP: %s" % ip) + return s + + +class ThreadedExecution(object): + + def __init__(self): + pass + + def run(self, ID): + sys.stdout.write("run started\n") + sys.stdout.flush() + socket_obj = init_server(ID) + n = node.node(ID, 0, N) + # n.init_keys(N) + n.init_replica_map(socket_obj) + n.server_loop() + sys.stdout.write("run exited\n") + sys.stdout.flush() + + def launch(self): + threads = [] + for i in range(N): + thread = Thread(target=self.run, args=[i]) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + sys.stdout.write("join completed\n") + sys.stdout.flush() + + +class NonThreadedExecution(object): + ''' + Finds sockets that aren't busy and attempts to establish and launch testbed + ''' + def __init__(self): + pass + + def init_server_socket(self, _id=None): + """ + triggers setup using testbed_config. Increments given server id + if that (ip, socket) from Replica List RL is already in use. + """ + global N + c = _id + while c < N: + s = None + try: + s = init_server(c) + except OSError as E: + _logger.error("%s -- Server ID: [%s]" % (E, c)) + c -= 1 + if s: + return s, c + + def launch(self): + socket_obj, _id = self.init_server_socket( + _id=config_yaml["testbed_config"]["server_id_init"] - 1 + ) + n = node.node(_id, 0, N) + # n.init_keys(N) + n.init_replica_map(socket_obj) + n.server_loop() + + # def pbft_usage(): # parser.add_argument("-n", "--nodes", dest="node_count", action='store', # help="# of PBFT nodes to be launched") @@ -62,71 +144,8 @@ def signal_handler(event, frame): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - def init_server(id): - global RL - try: - ip, port = RL[id] - except: - quit("Ran out of replica list address range. No more server config to try") - s = socket.socket() - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # host = socket.gethostname() - host = "0.0.0.0" - s.bind((host, port)) # on EC2 we cannot directly bind on public addr - s.listen(50) - s.setblocking(0) - _logger.debug("Server [%s] -- listening on port %s" % (id, port)) - _logger.debug("IP: %s" % ip) - return s - + # import pdb; pdb.set_trace() if threading_enabled: - - threads = [] - def run(ID): - sys.stdout.write("run started\n") - sys.stdout.flush() - socket_obj = init_server(ID) - n = node.node(ID, 0, N) - # n.init_keys(N) - n.init_replica_map(socket_obj) - n.server_loop() - sys.stdout.write("run exited\n") - sys.stdout.flush() - - for i in range(N): - thread = Thread(target=run, args=[i]) - thread.start() - threads.append(thread) - - for thread in threads: - thread.join() - - sys.stdout.write("join completed\n") - sys.stdout.flush() - + ThreadedExecution().launch() else: - # import pdb; pdb.set_trace() - def init_server_socket(_id=None): - """ - triggers setup using testbed_config. Increments given server id - if that (ip, socket) from Replica List RL is already in use. - """ - global N - c = _id - while c < N: - s = None - try: - s = init_server(c) - except OSError as E: - _logger.error("%s -- Server ID: [%s]" % (E, c)) - c -= 1 - if s: - return s, c - - socket_obj, _id = init_server_socket( - _id=config_yaml["testbed_config"]["server_id_init"]-1 - ) - n = node.node(_id, 0, N) - # n.init_keys(N) - n.init_replica_map(socket_obj) - n.server_loop() + NonThreadedExecution().launch() From 6a315862d625c472c1b2c667958c7d073a0b0afb Mon Sep 17 00:00:00 2001 From: Archit Sharma Date: Wed, 13 Jun 2018 17:10:12 +0530 Subject: [PATCH 2/5] merge pbft-py into fastchain --- requirements.txt | 1 + .../fastchain/{pbft-py => }/.gitignore | 0 .../fastchain/{pbft-py => }/README.md | 22 +- .../fastchain/{pbft-py => }/auto_reader.py | 0 .../{pbft-py => }/backup_utils/sig.py | 0 trueconsensus/fastchain/{pbft-py => }/bank.py | 0 .../fastchain/{pbft-py => }/client.py | 0 trueconsensus/fastchain/config.py | 144 ++++- .../fastchain/{pbft-py => }/ecdsa_sig.py | 0 .../{pbft-py => }/generate_requests_dat.py | 1 + trueconsensus/fastchain/local_config.py | 29 +- .../{logging.py => log_maintainer.py} | 0 .../fastchain/{pbft-py => }/make_keys.py | 0 trueconsensus/fastchain/{pbft-py => }/node.py | 0 .../{pbft-py => }/parse_client_log.py | 0 .../fastchain/{pbft-py => }/pauto_reader.py | 0 .../inspectionProfiles/profiles_settings.xml | 7 - .../fastchain/pbft-py/.idea/misc.xml | 4 - .../fastchain/pbft-py/.idea/modules.xml | 8 - .../fastchain/pbft-py/.idea/pbft.iml | 12 - .../fastchain/pbft-py/.idea/workspace.xml | 546 ------------------ trueconsensus/fastchain/pbft-py/config.py | 124 ---- .../fastchain/pbft-py/local_config.py | 12 - .../fastchain/pbft-py/requirements.txt | 8 - .../{pbft-py => }/pbft_logistics.cfg | 0 .../{pbft-py => }/pbft_tunables.yaml | 0 .../fastchain/{pbft-py => }/proto.sh | 0 .../fastchain/{pbft-py => }/proto_message.py | 0 .../fastchain/{pbft-py => }/request.proto | 0 .../fastchain/{pbft-py => }/request_pb2.py | 0 trueconsensus/fastchain/{pbft-py => }/run.sh | 0 .../fastchain/{pbft-py => }/server.py | 0 trueconsensus/fastchain/{pbft-py => }/test.py | 0 .../fastchain/{pbft-py => }/timer.py | 0 trueconsensus/multithread_signal_handling.py | 44 -- 35 files changed, 134 insertions(+), 828 deletions(-) rename trueconsensus/fastchain/{pbft-py => }/.gitignore (100%) rename trueconsensus/fastchain/{pbft-py => }/README.md (95%) rename trueconsensus/fastchain/{pbft-py => }/auto_reader.py (100%) rename trueconsensus/fastchain/{pbft-py => }/backup_utils/sig.py (100%) rename trueconsensus/fastchain/{pbft-py => }/bank.py (100%) rename trueconsensus/fastchain/{pbft-py => }/client.py (100%) rename trueconsensus/fastchain/{pbft-py => }/ecdsa_sig.py (100%) rename trueconsensus/fastchain/{pbft-py => }/generate_requests_dat.py (99%) rename trueconsensus/fastchain/{logging.py => log_maintainer.py} (100%) rename trueconsensus/fastchain/{pbft-py => }/make_keys.py (100%) rename trueconsensus/fastchain/{pbft-py => }/node.py (100%) rename trueconsensus/fastchain/{pbft-py => }/parse_client_log.py (100%) rename trueconsensus/fastchain/{pbft-py => }/pauto_reader.py (100%) delete mode 100644 trueconsensus/fastchain/pbft-py/.idea/inspectionProfiles/profiles_settings.xml delete mode 100644 trueconsensus/fastchain/pbft-py/.idea/misc.xml delete mode 100644 trueconsensus/fastchain/pbft-py/.idea/modules.xml delete mode 100644 trueconsensus/fastchain/pbft-py/.idea/pbft.iml delete mode 100644 trueconsensus/fastchain/pbft-py/.idea/workspace.xml delete mode 100644 trueconsensus/fastchain/pbft-py/config.py delete mode 100644 trueconsensus/fastchain/pbft-py/local_config.py delete mode 100644 trueconsensus/fastchain/pbft-py/requirements.txt rename trueconsensus/fastchain/{pbft-py => }/pbft_logistics.cfg (100%) rename trueconsensus/fastchain/{pbft-py => }/pbft_tunables.yaml (100%) rename trueconsensus/fastchain/{pbft-py => }/proto.sh (100%) rename trueconsensus/fastchain/{pbft-py => }/proto_message.py (100%) rename trueconsensus/fastchain/{pbft-py => }/request.proto (100%) rename trueconsensus/fastchain/{pbft-py => }/request_pb2.py (100%) rename trueconsensus/fastchain/{pbft-py => }/run.sh (100%) rename trueconsensus/fastchain/{pbft-py => }/server.py (100%) rename trueconsensus/fastchain/{pbft-py => }/test.py (100%) rename trueconsensus/fastchain/{pbft-py => }/timer.py (100%) delete mode 100644 trueconsensus/multithread_signal_handling.py diff --git a/requirements.txt b/requirements.txt index ef0c38c..2c068be 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ bitcoin==1.1.42 docopt==0.6.2 ecdsa==0.13 +protobuf==3.5.2.post1 pycrypto==2.6.1 pykwalify==1.6.1 PySocks==1.6.8 diff --git a/trueconsensus/fastchain/pbft-py/.gitignore b/trueconsensus/fastchain/.gitignore similarity index 100% rename from trueconsensus/fastchain/pbft-py/.gitignore rename to trueconsensus/fastchain/.gitignore diff --git a/trueconsensus/fastchain/pbft-py/README.md b/trueconsensus/fastchain/README.md similarity index 95% rename from trueconsensus/fastchain/pbft-py/README.md rename to trueconsensus/fastchain/README.md index d5c98aa..eb17f91 100644 --- a/trueconsensus/fastchain/pbft-py/README.md +++ b/trueconsensus/fastchain/README.md @@ -4,32 +4,30 @@ This is a `Practical Byzantine Fault Tolerance` implementation of Miguel Castro ### Setup -#### Install Google's Protobuf - -Download and install [google protobuf](https://github.com/google/protobuf/tree/master/python/google) - -``` -# brew install protobuf -OR -# pip install protobuf -``` - #### Configure paths and tunables Fill up the config files `pbft_logistics.cfg` and `pbft_tunables.yaml` or use defaults. - #### Install dependencies __Recommended__: Use a python3 virtual environment ``` virtualenv -p python3 venv -virtualenv -p python3 venv source venv/bin/activate pip install -r requirements.txt ``` +##### Install Google's Protobuf + +Download and install [google protobuf](https://github.com/google/protobuf/tree/master/python/google) + +``` +# brew install protobuf +OR +# pip install protobuf +``` + #### Generate required content as a precusory Then proceed as follows: diff --git a/trueconsensus/fastchain/pbft-py/auto_reader.py b/trueconsensus/fastchain/auto_reader.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/auto_reader.py rename to trueconsensus/fastchain/auto_reader.py diff --git a/trueconsensus/fastchain/pbft-py/backup_utils/sig.py b/trueconsensus/fastchain/backup_utils/sig.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/backup_utils/sig.py rename to trueconsensus/fastchain/backup_utils/sig.py diff --git a/trueconsensus/fastchain/pbft-py/bank.py b/trueconsensus/fastchain/bank.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/bank.py rename to trueconsensus/fastchain/bank.py diff --git a/trueconsensus/fastchain/pbft-py/client.py b/trueconsensus/fastchain/client.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/client.py rename to trueconsensus/fastchain/client.py diff --git a/trueconsensus/fastchain/config.py b/trueconsensus/fastchain/config.py index 535b932..03beee2 100644 --- a/trueconsensus/fastchain/config.py +++ b/trueconsensus/fastchain/config.py @@ -1,40 +1,124 @@ -#!/usr/bin/env python -# -# Copyright (c) 2018 TrueChain Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +#!/bin/env python import os +import yaml +import logging +import configparser # from subprocess import check_output +# from pykwalify import core as pykwalify_core +# from pykwalify import errors as pykwalify_errors +from logging.handlers import RotatingFileHandler -MAX_FAIL = 1 -N = 2 # the number of parties excluding the client -basePort = 49505 +from local_config import CFG_YAML_PATH, \ + CFG_GENERAL_PATH, \ + PEER_NETWORK_FILE -host_file_content = open(os.path.expanduser('~')+'/hosts', 'r').read() -IP_LIST = [l.strip() for l in host_file_content.split('\n') if l] -total = len(IP_LIST) -# We reserve the last IP as the client -RL = [(l, basePort+i) for i, l in enumerate(IP_LIST[:N])] +def load_config(path, no_val=False): + """ + general logistics such as log file paths in logistics CFG + """ + if no_val: + config = configparser.ConfigParser(allow_no_value=True) + else: + config = configparser.ConfigParser() + config.read(path) + + return config + + +def load_yaml_config(path, no_val=False): + """ + general tunables specified in YAML + """ + with open(path, "r") as config_file: + pbft_config = yaml.safe_load(config_file) + + _logger.debug("PBFT config {} yaml loaded".format(path)) + + # # Validate base config for Browbeat format + # _validate_yaml("pbft", pbft_config) + # _logger.info("Config {} validated".format(path)) + return pbft_config + +# +# def _validate_yaml(schema, config): +# """Raises exception if config is invalid. +# :param schema: The schema to validate with (pbft, pow, hybrid...) +# :param config: Loaded yaml to validate +# """ +# check = pykwalify_core.Core( +# source_data=config, +# schema_files=["{}/{}.yml".format(conf_schema_path, schema)]) +# try: +# check.validate(raise_exception=True) +# except pykwalify_errors.SchemaError as e: +# _logger.error("Schema validation failed") +# raise Exception("File does not conform to {} schema: {}".format(schema, e)) + + +config_general = load_config(CFG_GENERAL_PATH) + +LOG_ROOT = config_general.get("log", "root_folder") -TOR_SOCKSPORT = range(9050, 9150) +try: + if not os.path.exists(LOG_ROOT): + os.makedirs(LOG_ROOT) +except PermissionError: + quit("[Permission Denied] during creation of log file dir: %s" % LOG_ROOT) +except Exception as E: + quit("Error: [%s] - Couldn't create log file dir: %s" % (E, LOG_ROOT)) -ID = 0 # Now ID is deprecated -N = len(RL) +FMT = "[%(asctime)s] [%(levelname)s ] " + \ + "[%(filename)s:%(lineno)d:%(funcName)s()] - %(message)s" +FSIZE = int(config_general.get("log", "max_log_size")) -client = ((IP_LIST[N], basePort+N)) -# KEY DIRECTORY -KD = os.getcwd() + "/keys" -print(KD) +def setup_logger(fname): + _logger = logging.getLogger(fname) + formatter = logging.Formatter(FMT) + log_path = os.path.join(LOG_ROOT, fname) + handler = RotatingFileHandler( + log_path, + maxBytes=FSIZE, + backupCount=1 + ) + handler.setFormatter(formatter) + _logger.root.level = logging.DEBUG + _logger.addHandler(handler) + + # _logger = logging.getLogger("pbftx.config") + print("Storing logs to file: %s" % log_path) + return _logger + + +# main pbft logger +_logger = setup_logger(config_general.get("log", "server_logfile")) + +# client logger +client_logger = setup_logger(config_general.get("log", "client_logfile")) + +config_yaml = load_yaml_config(CFG_YAML_PATH) + +# import pdb; pdb.set_trace() + +network_file_content = open(PEER_NETWORK_FILE, 'r').read().split('\n') +IP_LIST = [l.strip() for l in network_file_content if l] +# total = len(IP_LIST) + +KD = config_general.get("general", "pem_keystore_path") + +basePort = config_yaml["general"]["base_port"] +N = config_yaml['testbed_config']['total'] - 1 + +client_id = config_yaml["testbed_config"]["client_id"] + +# import pdb; pdb.set_trace() + +# replica list +RL = [(l, basePort+i) for i, l in enumerate(IP_LIST[:N])] +# We reserve the last IP as the client +client_address = ((IP_LIST[client_id-1], basePort+client_id-1)) + +threading_enabled = config_yaml["testbed_config"]["threading_enabled"] +# threading_enabled = False diff --git a/trueconsensus/fastchain/pbft-py/ecdsa_sig.py b/trueconsensus/fastchain/ecdsa_sig.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/ecdsa_sig.py rename to trueconsensus/fastchain/ecdsa_sig.py diff --git a/trueconsensus/fastchain/pbft-py/generate_requests_dat.py b/trueconsensus/fastchain/generate_requests_dat.py similarity index 99% rename from trueconsensus/fastchain/pbft-py/generate_requests_dat.py rename to trueconsensus/fastchain/generate_requests_dat.py index 5150706..d2dfef8 100755 --- a/trueconsensus/fastchain/pbft-py/generate_requests_dat.py +++ b/trueconsensus/fastchain/generate_requests_dat.py @@ -64,6 +64,7 @@ def gen_requests(max_requests, batch_size, f): backspace(len(s)) # time.sleep(0.02) + if __name__ == '__main__': max_requests = config_yaml["testbed_config"]["requests"]["max"] batch_size = config_yaml["testbed_config"]["requests"]["batch_size"] diff --git a/trueconsensus/fastchain/local_config.py b/trueconsensus/fastchain/local_config.py index ff8ba07..3aa332e 100644 --- a/trueconsensus/fastchain/local_config.py +++ b/trueconsensus/fastchain/local_config.py @@ -1,25 +1,12 @@ import os -# This Replica ID -ID = 0 -# Number of replicas -N = 4 +# CFG_YAML_PATH = "/etc/pbft_tunables.yaml" +# CFG_YAML_PATH = os.path.join(os.path.expanduser('~'), "pbft_tunables.yaml") +CFG_YAML_PATH = "pbft_tunables.yaml" -# Number of failures we can tolerate -MAX_FAIL = 1 +# CFG_GENERAL_PATH = "/etc/pbft_logistics.cfg" +# os.path.join(os.path.expanduser('~'), "pbft_logistics.cfg") +CFG_GENERAL_PATH = "pbft_logistics.cfg" -# REPLICA LIST -# IP, port -RL = [] -RL.append(("127.0.0.1", 8001)) -RL.append(("127.0.0.1", 8002)) -RL.append(("127.0.0.1", 8003)) -RL.append(("127.0.0.1", 8004)) - -client = (("127.0.0.1", 8101)) - -# KEY DIRECTORY -KD = os.getcwd() + "/keys" -print KD - -# threading_enabled = False +# list of boot nodes +PEER_NETWORK_FILE = os.path.join(os.path.expanduser('~'), "hosts") diff --git a/trueconsensus/fastchain/logging.py b/trueconsensus/fastchain/log_maintainer.py similarity index 100% rename from trueconsensus/fastchain/logging.py rename to trueconsensus/fastchain/log_maintainer.py diff --git a/trueconsensus/fastchain/pbft-py/make_keys.py b/trueconsensus/fastchain/make_keys.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/make_keys.py rename to trueconsensus/fastchain/make_keys.py diff --git a/trueconsensus/fastchain/pbft-py/node.py b/trueconsensus/fastchain/node.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/node.py rename to trueconsensus/fastchain/node.py diff --git a/trueconsensus/fastchain/pbft-py/parse_client_log.py b/trueconsensus/fastchain/parse_client_log.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/parse_client_log.py rename to trueconsensus/fastchain/parse_client_log.py diff --git a/trueconsensus/fastchain/pbft-py/pauto_reader.py b/trueconsensus/fastchain/pauto_reader.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/pauto_reader.py rename to trueconsensus/fastchain/pauto_reader.py diff --git a/trueconsensus/fastchain/pbft-py/.idea/inspectionProfiles/profiles_settings.xml b/trueconsensus/fastchain/pbft-py/.idea/inspectionProfiles/profiles_settings.xml deleted file mode 100644 index c23ecac..0000000 --- a/trueconsensus/fastchain/pbft-py/.idea/inspectionProfiles/profiles_settings.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - \ No newline at end of file diff --git a/trueconsensus/fastchain/pbft-py/.idea/misc.xml b/trueconsensus/fastchain/pbft-py/.idea/misc.xml deleted file mode 100644 index 6db6f5d..0000000 --- a/trueconsensus/fastchain/pbft-py/.idea/misc.xml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/trueconsensus/fastchain/pbft-py/.idea/modules.xml b/trueconsensus/fastchain/pbft-py/.idea/modules.xml deleted file mode 100644 index 66a28fa..0000000 --- a/trueconsensus/fastchain/pbft-py/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/trueconsensus/fastchain/pbft-py/.idea/pbft.iml b/trueconsensus/fastchain/pbft-py/.idea/pbft.iml deleted file mode 100644 index 6f63a63..0000000 --- a/trueconsensus/fastchain/pbft-py/.idea/pbft.iml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - - - - - - \ No newline at end of file diff --git a/trueconsensus/fastchain/pbft-py/.idea/workspace.xml b/trueconsensus/fastchain/pbft-py/.idea/workspace.xml deleted file mode 100644 index 8094c01..0000000 --- a/trueconsensus/fastchain/pbft-py/.idea/workspace.xml +++ /dev/null @@ -1,546 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Request - k - kyle - cornell - cor - c - crom - croman - - - $PROJECT_DIR$ - - - - - - true - DEFINITION_ORDER - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - project - - - - - - - - - - - - - - - - - - - - - - - - - 1520733756236 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/trueconsensus/fastchain/pbft-py/config.py b/trueconsensus/fastchain/pbft-py/config.py deleted file mode 100644 index 03beee2..0000000 --- a/trueconsensus/fastchain/pbft-py/config.py +++ /dev/null @@ -1,124 +0,0 @@ -#!/bin/env python - -import os -import yaml -import logging -import configparser -# from subprocess import check_output -# from pykwalify import core as pykwalify_core -# from pykwalify import errors as pykwalify_errors -from logging.handlers import RotatingFileHandler - -from local_config import CFG_YAML_PATH, \ - CFG_GENERAL_PATH, \ - PEER_NETWORK_FILE - - -def load_config(path, no_val=False): - """ - general logistics such as log file paths in logistics CFG - """ - if no_val: - config = configparser.ConfigParser(allow_no_value=True) - else: - config = configparser.ConfigParser() - config.read(path) - - return config - - -def load_yaml_config(path, no_val=False): - """ - general tunables specified in YAML - """ - with open(path, "r") as config_file: - pbft_config = yaml.safe_load(config_file) - - _logger.debug("PBFT config {} yaml loaded".format(path)) - - # # Validate base config for Browbeat format - # _validate_yaml("pbft", pbft_config) - # _logger.info("Config {} validated".format(path)) - return pbft_config - -# -# def _validate_yaml(schema, config): -# """Raises exception if config is invalid. -# :param schema: The schema to validate with (pbft, pow, hybrid...) -# :param config: Loaded yaml to validate -# """ -# check = pykwalify_core.Core( -# source_data=config, -# schema_files=["{}/{}.yml".format(conf_schema_path, schema)]) -# try: -# check.validate(raise_exception=True) -# except pykwalify_errors.SchemaError as e: -# _logger.error("Schema validation failed") -# raise Exception("File does not conform to {} schema: {}".format(schema, e)) - - -config_general = load_config(CFG_GENERAL_PATH) - -LOG_ROOT = config_general.get("log", "root_folder") - -try: - if not os.path.exists(LOG_ROOT): - os.makedirs(LOG_ROOT) -except PermissionError: - quit("[Permission Denied] during creation of log file dir: %s" % LOG_ROOT) -except Exception as E: - quit("Error: [%s] - Couldn't create log file dir: %s" % (E, LOG_ROOT)) - -FMT = "[%(asctime)s] [%(levelname)s ] " + \ - "[%(filename)s:%(lineno)d:%(funcName)s()] - %(message)s" -FSIZE = int(config_general.get("log", "max_log_size")) - - -def setup_logger(fname): - _logger = logging.getLogger(fname) - formatter = logging.Formatter(FMT) - log_path = os.path.join(LOG_ROOT, fname) - handler = RotatingFileHandler( - log_path, - maxBytes=FSIZE, - backupCount=1 - ) - handler.setFormatter(formatter) - _logger.root.level = logging.DEBUG - _logger.addHandler(handler) - - # _logger = logging.getLogger("pbftx.config") - print("Storing logs to file: %s" % log_path) - return _logger - - -# main pbft logger -_logger = setup_logger(config_general.get("log", "server_logfile")) - -# client logger -client_logger = setup_logger(config_general.get("log", "client_logfile")) - -config_yaml = load_yaml_config(CFG_YAML_PATH) - -# import pdb; pdb.set_trace() - -network_file_content = open(PEER_NETWORK_FILE, 'r').read().split('\n') -IP_LIST = [l.strip() for l in network_file_content if l] -# total = len(IP_LIST) - -KD = config_general.get("general", "pem_keystore_path") - -basePort = config_yaml["general"]["base_port"] -N = config_yaml['testbed_config']['total'] - 1 - -client_id = config_yaml["testbed_config"]["client_id"] - -# import pdb; pdb.set_trace() - -# replica list -RL = [(l, basePort+i) for i, l in enumerate(IP_LIST[:N])] -# We reserve the last IP as the client -client_address = ((IP_LIST[client_id-1], basePort+client_id-1)) - -threading_enabled = config_yaml["testbed_config"]["threading_enabled"] -# threading_enabled = False diff --git a/trueconsensus/fastchain/pbft-py/local_config.py b/trueconsensus/fastchain/pbft-py/local_config.py deleted file mode 100644 index 3aa332e..0000000 --- a/trueconsensus/fastchain/pbft-py/local_config.py +++ /dev/null @@ -1,12 +0,0 @@ -import os - -# CFG_YAML_PATH = "/etc/pbft_tunables.yaml" -# CFG_YAML_PATH = os.path.join(os.path.expanduser('~'), "pbft_tunables.yaml") -CFG_YAML_PATH = "pbft_tunables.yaml" - -# CFG_GENERAL_PATH = "/etc/pbft_logistics.cfg" -# os.path.join(os.path.expanduser('~'), "pbft_logistics.cfg") -CFG_GENERAL_PATH = "pbft_logistics.cfg" - -# list of boot nodes -PEER_NETWORK_FILE = os.path.join(os.path.expanduser('~'), "hosts") diff --git a/trueconsensus/fastchain/pbft-py/requirements.txt b/trueconsensus/fastchain/pbft-py/requirements.txt deleted file mode 100644 index 3ab7a37..0000000 --- a/trueconsensus/fastchain/pbft-py/requirements.txt +++ /dev/null @@ -1,8 +0,0 @@ -docopt==0.6.2 -ecdsa==0.13 -pycrypto==2.6.1 -pykwalify==1.6.1 -PySocks==1.6.8 -python-dateutil==2.7.3 -PyYAML==3.12 -six==1.11.0 diff --git a/trueconsensus/fastchain/pbft-py/pbft_logistics.cfg b/trueconsensus/fastchain/pbft_logistics.cfg similarity index 100% rename from trueconsensus/fastchain/pbft-py/pbft_logistics.cfg rename to trueconsensus/fastchain/pbft_logistics.cfg diff --git a/trueconsensus/fastchain/pbft-py/pbft_tunables.yaml b/trueconsensus/fastchain/pbft_tunables.yaml similarity index 100% rename from trueconsensus/fastchain/pbft-py/pbft_tunables.yaml rename to trueconsensus/fastchain/pbft_tunables.yaml diff --git a/trueconsensus/fastchain/pbft-py/proto.sh b/trueconsensus/fastchain/proto.sh similarity index 100% rename from trueconsensus/fastchain/pbft-py/proto.sh rename to trueconsensus/fastchain/proto.sh diff --git a/trueconsensus/fastchain/pbft-py/proto_message.py b/trueconsensus/fastchain/proto_message.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/proto_message.py rename to trueconsensus/fastchain/proto_message.py diff --git a/trueconsensus/fastchain/pbft-py/request.proto b/trueconsensus/fastchain/request.proto similarity index 100% rename from trueconsensus/fastchain/pbft-py/request.proto rename to trueconsensus/fastchain/request.proto diff --git a/trueconsensus/fastchain/pbft-py/request_pb2.py b/trueconsensus/fastchain/request_pb2.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/request_pb2.py rename to trueconsensus/fastchain/request_pb2.py diff --git a/trueconsensus/fastchain/pbft-py/run.sh b/trueconsensus/fastchain/run.sh similarity index 100% rename from trueconsensus/fastchain/pbft-py/run.sh rename to trueconsensus/fastchain/run.sh diff --git a/trueconsensus/fastchain/pbft-py/server.py b/trueconsensus/fastchain/server.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/server.py rename to trueconsensus/fastchain/server.py diff --git a/trueconsensus/fastchain/pbft-py/test.py b/trueconsensus/fastchain/test.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/test.py rename to trueconsensus/fastchain/test.py diff --git a/trueconsensus/fastchain/pbft-py/timer.py b/trueconsensus/fastchain/timer.py similarity index 100% rename from trueconsensus/fastchain/pbft-py/timer.py rename to trueconsensus/fastchain/timer.py diff --git a/trueconsensus/multithread_signal_handling.py b/trueconsensus/multithread_signal_handling.py deleted file mode 100644 index 8036706..0000000 --- a/trueconsensus/multithread_signal_handling.py +++ /dev/null @@ -1,44 +0,0 @@ -from threading import Thread -from random import random -import signal -import time -import sys - -stop_requested = False - -def sig_handler(signum, frame): - sys.stdout.write("handling signal: %s\n" % signum) - sys.stdout.flush() - - global stop_requested - stop_requested = True - -def simple_target(a): - print("hello - %s - %s" %(a, random())) - -def run(a): - sys.stdout.write("run started\n") - sys.stdout.flush() - - while not stop_requested: - simple_target(a) - time.sleep(1) - - sys.stdout.write("run exited\n") - sys.stdout.flush() - -signal.signal(signal.SIGTERM, sig_handler) -signal.signal(signal.SIGINT, sig_handler) - -threads = [] - -for i in range(3): - thread = Thread(target=run, args=[i]) - thread.start() - threads.append(thread) - -for thread in threads: - thread.join() - -sys.stdout.write("join completed\n") -sys.stdout.flush() From 5b20780af7c63790c85a45e57e86e35332904a37 Mon Sep 17 00:00:00 2001 From: Archit Sharma Date: Fri, 15 Jun 2018 05:28:44 +0530 Subject: [PATCH 3/5] node.py: enhance exception handling and logging --- trueconsensus/fastchain/bft.py | 77 +++++++++--- trueconsensus/fastchain/ecdsa_sig.py | 30 +++-- trueconsensus/fastchain/log_maintainer.py | 45 ++++--- trueconsensus/fastchain/node.py | 129 +++++++++++++-------- trueconsensus/fastchain/pbft_tunables.yaml | 14 +++ trueconsensus/fastchain/proto_message.py | 11 +- trueconsensus/minerva/main.py | 2 +- trueconsensus/minerva/vrf.py | 16 +++ 8 files changed, 230 insertions(+), 94 deletions(-) create mode 100644 trueconsensus/minerva/vrf.py diff --git a/trueconsensus/fastchain/bft.py b/trueconsensus/fastchain/bft.py index 9c60ebf..ae18e04 100644 --- a/trueconsensus/fastchain/bft.py +++ b/trueconsensus/fastchain/bft.py @@ -19,8 +19,10 @@ import os import uuid import random +import ecdsa_sig as sig from db.backends.level import LevelDB +from fastchain.config import LAMBDA, config_yaml # from logging import ledger from fastchain.node import Node @@ -29,10 +31,6 @@ namedtuple -global LAMBDA -LAMBDA = 0 - - def generate_block(genesis=True): pass @@ -64,6 +62,7 @@ class NodeBFT(Node): LOGs = defaultdict(list) LOG = defaultdict() csize = LAMBDA + state_map = [] # maps states of all nodes def __init__(self, id=None, type=None): self.NodeId = id @@ -80,31 +79,48 @@ class ViewChangeInit(object): ''' def __init__(self): - pass + self.timeout = 300 # seconds? load from config + def check_for_timeout(self, start): + current = time.time() + if current - start >= self.timeout: + return True + return False -class LedgerLog(object): - def __init__(self): - pass class BFTcommittee(object): ''' ''' - def __init__(self): - self.nodes = [] - self.view = 0 + def __init__(self, R, view, node_addresses): + self.committee_id = R + self.view = view + self.nodes = node_addresses + self.log = [] + # TODO: calculate csize and sec_param + sefl.chain_size = R * csize + LAMBDA def call_to_viewchange(self): """ - complains to snailchain + complains to snailchain, init viewchange """ - self.ViewChangeInit(self.nodes) + VC = ViewChangeInit(self.nodes) + response = None + start = time.time() + while true: + response = VC.wait_for_reply() + if + if response is not None: + break + return - def sign_transaction(self): + def sign_transaction(self, txn_tuple): """ """ - pass + key = get_asymm_key(self.id, ktype="sign") + signed_txn = sig.sign(key, txn_tuple) + return signed_txn + def log_to_slowchain(self): """ @@ -115,3 +131,34 @@ def fetch_LOG(self): """ """ pass + + def commit_txn(self): + """ + """ + pass + + def genkey(self): + """ + return (sk, vk) -> keypair generated by ECDSA + """ + return sig.generate_keys() + + def sign_log(self): + key = get_asymm_key(self.id, ktype="sign") + # TODO: handle conversation of log to bytes (use struct?) + signed_log = sig.sign(key, self.log) + return signed_log + + def append_to_log(self, txn_tuple): + self.log.append(txn_tuple) + + def gossip_to_snailchain(self): + """ + use UDP protocol / P2P to gossip to chain + """ + pass + + def fork_vbft(self): + pass + + def update_mempool_subprotocol(self): diff --git a/trueconsensus/fastchain/ecdsa_sig.py b/trueconsensus/fastchain/ecdsa_sig.py index 11847b3..0d10473 100644 --- a/trueconsensus/fastchain/ecdsa_sig.py +++ b/trueconsensus/fastchain/ecdsa_sig.py @@ -63,10 +63,12 @@ def verify_proto_key(key, dig1, message): def get_key_path(i, ktype): try: KEY_NAME = ktype + str(i) + ASYMM_FILE_FORMATS[ktype] - _logger.info("KPATH - FETCH - %s -- %s" % (ktype, KEY_NAME)) + # _logger.info("KPATH - FETCH - %s -- %s" % (ktype, KEY_NAME)) return os.path.join(KD, KEY_NAME) + # generic catch except Exception as E: - quit(E) + _logger.error(E) + return # raise @@ -85,13 +87,23 @@ def write_new_keys(n): def get_asymm_key(i, ktype=None): kpath = get_key_path(i, ktype) - if not os.path.isfile(kpath): - msg = "can't find key file: %s" % kpath - _logger.error(msg) - sys.exit(msg) - key_pem = open(kpath, 'rb').read() - return ASYMM_FUNC_MAP[ktype](key_pem) - + found_error = False + try: + if not os.path.isfile(kpath): + result = "File Not Found: %s" % kpath + _logger.error(result) + found_error = True + else: + key_pem = open(kpath, 'rb').read() + result = ASYMM_FUNC_MAP[ktype](key_pem) + except Exception as result: + found_error = True + + if found_error: + _logger.error("%s" % result) + return + + return result # # def get_verifying_key(i): # kpath = get_key_path(i, "verify") diff --git a/trueconsensus/fastchain/log_maintainer.py b/trueconsensus/fastchain/log_maintainer.py index cf9aaf7..4b2cf87 100644 --- a/trueconsensus/fastchain/log_maintainer.py +++ b/trueconsensus/fastchain/log_maintainer.py @@ -1,25 +1,32 @@ -import logging +from config import _logger -# create logger -logger = logging.getLogger('simple_example') -logger.setLevel(logging.DEBUG) +# import logging -# create console handler and set level to debug -ch = logging.StreamHandler() -ch.setLevel(logging.DEBUG) +# # create logger +# logger = logging.getLogger('simple_example') +# logger.setLevel(logging.DEBUG) -# create formatter -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +# # create console handler and set level to debug +# ch = logging.StreamHandler() +# ch.setLevel(logging.DEBUG) -# add formatter to ch -ch.setFormatter(formatter) +# # create formatter +# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -# add ch to logger -logger.addHandler(ch) +# # add formatter to ch +# ch.setFormatter(formatter) -# # 'application' code -# logger.debug('debug message') -# logger.info('info message') -# logger.warn('warn message') -# logger.error('error message') -# logger.critical('critical message') +# # add ch to logger +# logger.addHandler(ch) + +# # # 'application' code +# # logger.debug('debug message') +# # logger.info('info message') +# # logger.warn('warn message') +# # logger.error('error message') +# # logger.critical('critical message') + + +class LedgerLog(object): + def __init__(self): + pass diff --git a/trueconsensus/fastchain/node.py b/trueconsensus/fastchain/node.py index 7b96775..5cfbd60 100644 --- a/trueconsensus/fastchain/node.py +++ b/trueconsensus/fastchain/node.py @@ -83,7 +83,7 @@ def debug_print_bank(self, signum, flag): # print("-------------") sys.exit() - def __init__(self, id, view, N, max_requests=None): + def __init__(self, id, view, N, committee_addresses=[], max_requests=None): self.max_requests = max_requests self.kill_flag = False # self.ecdsa_key = ecdsa_sig.get_asymm_key(0, "verify") @@ -144,6 +144,7 @@ def __init__(self, id, view, N, max_requests=None): # self.key_dict = {} self.replica_map = {} self.bank = bank.bank(id, 1000) + self.committee_ids = committee_addresses self.request_types = { "REQU": self.process_client_request, @@ -267,6 +268,8 @@ def init_replica_map(self, socket_obj): # type, seq, message, (optional) tag request def create_request(self, req_type, seq, msg, outer_req=None): key = get_asymm_key(self.id, ktype="sign") + if not bool(key): + return # import pdb; pdb.set_trace() msg = bytes(msg, encoding='utf-8') m = message.add_sig(key, self.id, seq, self.view, req_type, msg) @@ -327,6 +330,8 @@ def execute(self, req): t.cancel() key = get_asymm_key(self.id, ktype="sign") + if not bool(key): + return #time.sleep(1) #rc = ecdsa_sig.verify(self.ecdsa_key, self.hello_sig, "Hello world!") #cond.acquire() @@ -385,8 +390,8 @@ def clean(self, fd): def process_init(self, req, fd): if req.inner.id < 0: return None - #TODO: fix this so we can do crash recovery - if not req.inner.id in self.replica_map: + # TODO: fix this so we can do crash recovery + if req.inner.id not in self.replica_map: self.replica_map[req.inner.id] = self.fdmap[fd] else: if req.inner.id > self.id: @@ -401,28 +406,28 @@ def process_checkpoint(self, req, fd): # TODO: requests that reuse timestamps are ignored def in_client_history(self, req): - if not req.inner.id in self.client_message_log: + if req.inner.id not in self.client_message_log: return False - if not req.inner.timestamp in self.client_message_log[req.inner.id]: + if req.inner.timestamp not in self.client_message_log[req.inner.id]: return False return True def add_client_history(self, req): - if not req.inner.id in self.client_message_log: - self.client_message_log[req.inner.id] = {req.inner.timestamp : req} + if req.inner.id not in self.client_message_log: + self.client_message_log[req.inner.id] = {req.inner.timestamp: req} else: self.client_message_log[req.inner.id][req.inner.timestamp] = req - #TODO + # TODO def handle_timeout(self, dig, view): self.lock.acquire() if self.view > view: return print("TIMEOUT TRIGGERED") # Cancel all other timers - client_req,t,fd = self.active[dig] + client_req, t, fd = self.active[dig] for key, value in self.active.iteritems(): - client_req,t,fd = value + client_req, t, fd = value if key != dig: t.cancel() self.view += 1 @@ -434,7 +439,7 @@ def handle_timeout(self, dig, view): for i in self.checkpoint_proof: msg += serialize(i) - #[type][seq][id] -> req + # [type][seq][id] -> req # for each prepared request for sequence, digest in self.prepared.iteritems(): @@ -456,8 +461,8 @@ def handle_timeout(self, dig, view): m = self.create_request("VCHA", self.last_stable_checkpoint, msg) self.broadcast_to_nodes(m) self.process_view_change(m, 0) - #TODO start a time for the view change operation - #TODO set flag to stop processing requests + # TODO start a time for the view change operation + # TODO set flag to stop processing requests def process_client_request(self, req, fd): _logger.info("Phase - PROCESS CLIENT REQ - fd [%s]" % fd) @@ -493,7 +498,7 @@ def process_client_request(self, req, fd): self.add_node_history(m) record_pbft(self.debuglog, m) self.broadcast_to_nodes(m) - #TODO: if client sends to a backup, retransmit to primary + # TODO: if client sends to a backup, retransmit to primary # or not..... maybe better to save bandwidth @@ -506,8 +511,8 @@ def inc_prep_dict(self, digest): def inc_comm_dict(self, digest): if digest in self.comm_dict: self.comm_dict[digest].number += 1 - #self.comm_dict[digest].seq = seq - #self.comm_dict[digest].id_list.append(id) + # self.comm_dict[digest].seq = seq + # self.comm_dict[digest].id_list.append(id) else: self.comm_dict[digest] = req_counter() @@ -550,6 +555,8 @@ def process_preprepare(self, req, fd): client_req.ParseFromString(req.outer) record_pbft(self.debuglog, client_req) client_key = get_asymm_key(client_req.inner.id, ktype="sign") + if not bool(client_key): + return client_req = message.check(client_key, client_req) if client_req == None or req.inner.msg != client_req.dig: _logger.warn("FAILED PRPR OUTER SIGCHECK") @@ -559,8 +566,8 @@ def process_preprepare(self, req, fd): raise else: client_req = None - if not req.inner.msg in self.active: - #self.active[req.inner.msg] = (client_req, Timer(self.timeout, self.handle_timeout), fd) + if req.inner.msg not in self.active: + # self.active[req.inner.msg] = (client_req, Timer(self.timeout, self.handle_timeout), fd) request_timer = Timer(self.timeout, self.handle_timeout, [req.inner.msg, req.inner.view]) request_timer.daemon = True request_timer.start() @@ -614,8 +621,8 @@ def process_commit(self, req, fd): self.execute_in_order(req) - #rc = vprocess_checkpoints(vcheck_list, r.inner.seq) def vprocess_checkpoints(self, vcheck_list, last_checkpoint): + #rc = vprocess_checkpoints(vcheck_list, r.inner.seq) if last_checkpoint == 0: return True if len(vcheck_list) <= 2*self.f: @@ -637,6 +644,8 @@ def vprocess_prepare(self, vprep_dict, vpre_dict, last_checkpoint): return False,0 dig = vpre_dict[k1].inner.msg key = get_asymm_key(vpre_dict[k1].inner.id, ktype="sign") + if not bool(key): + return r = message.check(key, vpre_dict[k1]) if r == None: return False,0 @@ -644,6 +653,8 @@ def vprocess_prepare(self, vprep_dict, vpre_dict, last_checkpoint): for k2,v2 in v1.iteritems(): # check sigs key = get_asymm_key(v2.inner.id, ktype="sign") + if not bool(key): + return r = message.check(key,v2) if r == None: return False,0 @@ -669,7 +680,7 @@ def vprocess_prepare(self, vprep_dict, vpre_dict, last_checkpoint): return True,max def in_view_dict(self,req): - if not req.inner.view in self.view_dict: + if req.inner.view not in self.view_dict: return False for m in self.view_dict[req.inner.view][0]: if m.inner.id == req.inner.id: @@ -701,29 +712,31 @@ def process_view_change(self, req, fd): # for each chkp, prpr, prep message it contains while len(m) > 0: b = m[:4] - size = struct.unpack("!I",b)[0] + size = struct.unpack("!I", b)[0] try: #if True: r2 = request_pb2.Request() r2.ParseFromString(m[4:size+4]) record_pbft(self.debuglog, r2) key = get_asymm_key(r2.inner.id, ktype="sign") - r2 = message.check(key,r2) - if r2 == None: + if not bool(key): + return + r2 = message.check(key, r2) + if r2 is None: print("FAILED SIG CHECK IN VIEW CHANGE") return - except: + except Exception as E: #else: r2 = None - print("FAILED PROTOBUF EXTRACT IN VIEW CHANGE") + print("FAILED PROTOBUF EXTRACT IN VIEW CHANGE: %s" % E) raise return if r2.inner.type == "CHKP": vcheck_list.append(r2) if r2.inner.type == "PREP": - if not r2.inner.seq in vprep_dict: - vprep_dict[r2.inner.seq] = {r2.inner.id : r2} + if r2.inner.seq not in vprep_dict: + vprep_dict[r2.inner.seq] = {r2.inner.id: r2} else: vprep_dict[r2.inner.seq][r2.inner.id] = r2 if r2.inner.type == "PRPR": @@ -777,6 +790,8 @@ def process_view_change(self, req, fd): def nvprocess_prpr(self, prpr_list): for r in prpr_list: key = get_asymm_key(r.inner.id, ktype="sign") + if not bool(key): + return m = message.check(key, r) if m == None: return False @@ -792,6 +807,8 @@ def nvprocess_prpr(self, prpr_list): def nvprocess_view(self, vchange_list): for r in vchange_list: key = get_asymm_key(r.inner.id, ktype="sign") + if not bool(key): + return m = message.check(key, r) if m == None: return False @@ -815,6 +832,8 @@ def process_new_view(self, req, fd): r2.ParseFromString(m[4:size+4]) record_pbft(self.debuglog, r2) key = get_asymm_key(r2.inner.id, ktype="sign") + if not bool(key): + return r2 = message.check(key, r2) if r2 == None: _logger.warn("FAILED SIG CHECK IN NEW VIEW") @@ -850,18 +869,18 @@ def process_new_view(self, req, fd): # [type][seq][id] -> request def add_node_history(self, req): - if not req.inner.type in self.node_message_log: + if req.inner.type not in self.node_message_log: self.node_message_log[req.inner.type] = {req.inner.seq : {req.inner.id : req}} else: - if not req.inner.seq in self.node_message_log[req.inner.type]: + if req.inner.seq not in self.node_message_log[req.inner.type]: self.node_message_log[req.inner.type][req.inner.seq] = {req.inner.id : req} else: self.node_message_log[req.inner.type][req.inner.seq][req.inner.id] = req def in_node_history(self, req): - if not req.inner.type in self.node_message_log: + if req.inner.type not in self.node_message_log: return False - if not req.inner.seq in self.node_message_log[req.inner.type]: + if req.inner.seq not in self.node_message_log[req.inner.type]: return False if req.inner.id in self.node_message_log[req.inner.type][req.inner.seq]: return True @@ -876,17 +895,22 @@ def parse_request(self, request_bytes, fd): req.ParseFromString(request_bytes) record_pbft(self.debuglog, req) key = get_asymm_key(req.inner.id, ktype="sign") + if not bool(key): + return + # if not isinstance(key, ecdsa.SigningKey): + if not bool(key): + return req = message.check(key, req) - if req == None: - _logger.error("FAILED SIG CHECK SOMEWHERE") + if req is None: + _logger.error("Failed message sig check. 'req' is empty..") return - except: + except Exception as E: req = None - _logger.error("ERROR IN PROTOBUF TYPES") - raise # for debug + _logger.error("ERROR IN PROTOBUF TYPES: %s" % E) + # raise # for debug self.clean(fd) return - #print(req.inner.type, len(request_bytes)) + # print(req.inner.type, len(request_bytes)) # TODO: Check for view number and view change, h/H if req.inner.view != self.view or not self.view_active: if req.inner.type != "VCHA" and req.inner.type != "NEVW" and \ @@ -901,9 +925,10 @@ def parse_request(self, request_bytes, fd): return if self.in_node_history(req): _logger.warn("Duplicate node message") - #return + # return pass if req.inner.type in self.request_types and not self.in_client_history(req): + # call to actual success self.request_types[req.inner.type](req, fd) else: self.clean(fd) @@ -925,8 +950,8 @@ def server_loop(self): """ counter = 0 - #self.fdmap[s.fileno] = s - #self.p.register(s, recv_mask) + # self.fdmap[s.fileno] = s + # self.p.register(s, recv_mask) s = self.replica_map[self.id] _logger.debug("------------ INIT SERVER LOOP [ID: %s]-------------" % self.id) t = Timer(5, self.try_client, args=[self.id]) @@ -936,6 +961,8 @@ def server_loop(self): events = self.p.poll() _logger.debug("Polling events queue -> %s" % events) #cstart = time.time() + + # import pdb; pdb.set_trace() for fd, event in events: counter += 1 # need the flag for "Service temporarilly unavailable" exception @@ -970,7 +997,8 @@ def server_loop(self): try: data = self.fdmap[fd].recv(BUF_SIZE) recv_flag = True - except: + except Exception as E: + _logger.debug(E) self.clean(fd) continue #except socket.error, serr: @@ -979,22 +1007,25 @@ def server_loop(self): #self.clean(fd) if not data and recv_flag: try: - _logger.debug("Closing connection from %s " % self.fdmap[fd].getpeername()) - except: - _logger.debug("Closing connection %s " % fd) + peer_address = ":".join(str(i) for i in self.fdmap[fd].getpeername()) + _logger.debug("Closing connection from: event fd [%s] - ID [%s] - Address %s" % (fd, self.id, peer_address)) + except Exception as E: + _logger.debug("Closing connection %s error - %s" % (fd, E)) self.clean(fd) elif recv_flag: - _logger.debug("Chunk Length: %s" % len(data))# .decode('latin-1'))) - self.buffmap[fd] += data#.decode('latin-1') + _logger.debug("Chunk Length: %s" % len(data)) # .decode('latin-1'))) + self.buffmap[fd] += data # .decode('latin-1') while(len(self.buffmap[fd]) > 3): try: size = struct.unpack("!I", self.buffmap[fd][:4])[0] - except: - import pdb; pdb.set_trace() + except Exception as E: + _logger.debug(E) + break + # import pdb; pdb.set_trace() if len(self.buffmap[fd]) >= size+4: self.parse_request(self.buffmap[fd][4:size+4], fd) - if not fd in self.buffmap: + if fd not in self.buffmap: break self.buffmap[fd] = self.buffmap[fd][size+4:] else: diff --git a/trueconsensus/fastchain/pbft_tunables.yaml b/trueconsensus/fastchain/pbft_tunables.yaml index 32e4526..d622510 100644 --- a/trueconsensus/fastchain/pbft_tunables.yaml +++ b/trueconsensus/fastchain/pbft_tunables.yaml @@ -12,3 +12,17 @@ general: max_fail: 1 base_port: 49500 tor_socksport_range: 9050,9150 +# refer to README.md#parameterized-by-following section for these configurables +slowchain: + csize: 0 +bft_committee: + lambda: 1 + timeout: 300 # seconds + tbft: 1 + csize: 0 + th: 10 # adjusted to ⌈csize/3⌉ later + actual_delta: 0 + delta: 1 # ms to be adjusted from network ping test + # https://github.com/matthieu-lapeyre/network-benchmark/blob/master/network_test.py + chain: [] + alpha: 0 # initial adversary hash power diff --git a/trueconsensus/fastchain/proto_message.py b/trueconsensus/fastchain/proto_message.py index ea8a320..3c88972 100644 --- a/trueconsensus/fastchain/proto_message.py +++ b/trueconsensus/fastchain/proto_message.py @@ -7,7 +7,16 @@ from config import _logger -def add_sig(key,id,seq,view,type,message,timestamp = None): +def add_sig(key, id, seq, view, type, message, timestamp=None): + """ + @key + @id + @seq + @view + @type + @message + @timestamp + """ #key = sig.get_signing_key(id) req = request_pb2.Request() inner = req.inner diff --git a/trueconsensus/minerva/main.py b/trueconsensus/minerva/main.py index 8862120..c601b35 100644 --- a/trueconsensus/minerva/main.py +++ b/trueconsensus/minerva/main.py @@ -1,5 +1,5 @@ # WIP -from bft +from trueconsensus.fastchain import bft if __name__ == "__main__": diff --git a/trueconsensus/minerva/vrf.py b/trueconsensus/minerva/vrf.py new file mode 100644 index 0000000..3739622 --- /dev/null +++ b/trueconsensus/minerva/vrf.py @@ -0,0 +1,16 @@ +from fastchain.config import IP_LIST, config_yaml + + +class VariableDayLengthFrequency(object): + ''' + VRF function that chooses a set of nodes from the given IP_LIST pool of addresses + ''' + # TODO: IP_LIST should actually follow the hash address convention + # of ethereum n/w config pool + def __init__(self): + self.snailpool = [] + self.bftpool = [] + self.recentp = [] + self.slow_csize = config_yaml['slowchain']['csize'] + self.fast_csize = config_yaml['bft_committee']['csize'] + From 24b0b55dbc0c9232d912fe496bd070f4de997679 Mon Sep 17 00:00:00 2001 From: Archit Sharma Date: Fri, 15 Jun 2018 07:46:37 +0530 Subject: [PATCH 4/5] reorg and init integration with hybrid consensus --- README.md | 3 + trueconsensus/{fastchain => }/README.md | 15 +- trueconsensus/{fastchain => }/client.py | 4 +- .../{fastchain => conf}/pbft_logistics.cfg | 0 .../{fastchain => conf}/pbft_tunables.yaml | 0 trueconsensus/dapps/__init__.py | 0 trueconsensus/{fastchain => dapps}/bank.py | 4 +- trueconsensus/engine.py | 159 +++++++++++++++++- trueconsensus/fastchain/__init__.py | 3 +- .../fastchain/{bft.py => bft_committee.py} | 27 ++- trueconsensus/fastchain/config.py | 2 +- trueconsensus/fastchain/ecdsa_sig.py | 6 +- trueconsensus/fastchain/local_config.py | 4 +- trueconsensus/fastchain/log_maintainer.py | 2 +- trueconsensus/fastchain/make_keys.py | 10 -- trueconsensus/fastchain/node.py | 55 +++--- trueconsensus/fastchain/proto.sh | 3 - trueconsensus/fastchain/server.py | 151 ----------------- trueconsensus/fastchain/subprotocol.py | 14 +- trueconsensus/proto/__init__.py | 0 .../{fastchain => proto}/proto_message.py | 6 +- .../{fastchain => proto}/request.proto | 0 .../{fastchain => proto}/request_pb2.py | 24 +-- trueconsensus/snailchain/log_maintainer.py | 32 ++++ trueconsensus/snailchain/logging.py | 25 --- trueconsensus/utils/__init__.py | 0 trueconsensus/utils/generate_proto.sh | 17 ++ .../generate_requests_dat.py | 6 +- trueconsensus/utils/make_keys.py | 10 ++ 29 files changed, 314 insertions(+), 268 deletions(-) rename trueconsensus/{fastchain => }/README.md (74%) rename trueconsensus/{fastchain => }/client.py (98%) rename trueconsensus/{fastchain => conf}/pbft_logistics.cfg (100%) rename trueconsensus/{fastchain => conf}/pbft_tunables.yaml (100%) create mode 100644 trueconsensus/dapps/__init__.py rename trueconsensus/{fastchain => dapps}/bank.py (96%) mode change 100644 => 100755 trueconsensus/engine.py rename trueconsensus/fastchain/{bft.py => bft_committee.py} (86%) delete mode 100755 trueconsensus/fastchain/make_keys.py delete mode 100755 trueconsensus/fastchain/proto.sh delete mode 100755 trueconsensus/fastchain/server.py create mode 100644 trueconsensus/proto/__init__.py rename trueconsensus/{fastchain => proto}/proto_message.py (89%) rename trueconsensus/{fastchain => proto}/request.proto (100%) rename trueconsensus/{fastchain => proto}/request_pb2.py (87%) create mode 100644 trueconsensus/snailchain/log_maintainer.py delete mode 100644 trueconsensus/snailchain/logging.py create mode 100644 trueconsensus/utils/__init__.py create mode 100755 trueconsensus/utils/generate_proto.sh rename trueconsensus/{fastchain => utils}/generate_requests_dat.py (95%) create mode 100755 trueconsensus/utils/make_keys.py diff --git a/README.md b/README.md index a599400..c08138b 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@ Refer to: https://arxiv.org/abs/1805.01457 https://eprint.iacr.org/2016/917.pdf +## Run + +Refer to `trueconsensus/README.md` for further instructions ### Parameterized by following.. diff --git a/trueconsensus/fastchain/README.md b/trueconsensus/README.md similarity index 74% rename from trueconsensus/fastchain/README.md rename to trueconsensus/README.md index eb17f91..2fc5dff 100644 --- a/trueconsensus/fastchain/README.md +++ b/trueconsensus/README.md @@ -6,7 +6,7 @@ This is a `Practical Byzantine Fault Tolerance` implementation of Miguel Castro #### Configure paths and tunables -Fill up the config files `pbft_logistics.cfg` and `pbft_tunables.yaml` or use defaults. +Fill up the config files `conf/pbft_logistics.cfg` and `conf/pbft_tunables.yaml` or use defaults. #### Install dependencies @@ -33,19 +33,22 @@ OR Then proceed as follows: ``` -# generate requests_pb2.py from requests.proto file -./proto.sh +# change to folder trueconsensus +cd trueconsensus/ + +# generate proto/requests_pb2.py from proto/requests.proto file +./utils/generate_proto.sh # generate asymm keypairs -python make_keys.py +python -m utils.make_keys # generate requests -./generate_requests_dat.py +python -m utils.generate_requests_dat ``` ### Run -Server: `./server.py` +Server: `./engine.py` Client: `./client.py` diff --git a/trueconsensus/fastchain/client.py b/trueconsensus/client.py similarity index 98% rename from trueconsensus/fastchain/client.py rename to trueconsensus/client.py index 73464c5..dabe184 100755 --- a/trueconsensus/fastchain/client.py +++ b/trueconsensus/client.py @@ -11,8 +11,8 @@ import socks # import time -import request_pb2 -from config import client_address, \ +from proto import request_pb2 +from fastchain.config import client_address, \ client_id, \ RL, \ client_logger, \ diff --git a/trueconsensus/fastchain/pbft_logistics.cfg b/trueconsensus/conf/pbft_logistics.cfg similarity index 100% rename from trueconsensus/fastchain/pbft_logistics.cfg rename to trueconsensus/conf/pbft_logistics.cfg diff --git a/trueconsensus/fastchain/pbft_tunables.yaml b/trueconsensus/conf/pbft_tunables.yaml similarity index 100% rename from trueconsensus/fastchain/pbft_tunables.yaml rename to trueconsensus/conf/pbft_tunables.yaml diff --git a/trueconsensus/dapps/__init__.py b/trueconsensus/dapps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/trueconsensus/fastchain/bank.py b/trueconsensus/dapps/bank.py similarity index 96% rename from trueconsensus/fastchain/bank.py rename to trueconsensus/dapps/bank.py index bbe5069..b3eeedb 100644 --- a/trueconsensus/fastchain/bank.py +++ b/trueconsensus/dapps/bank.py @@ -1,5 +1,5 @@ -import proto_message as message -from config import config_general +from proto import proto_message as message +from fastchain.config import config_general class bank: diff --git a/trueconsensus/engine.py b/trueconsensus/engine.py old mode 100644 new mode 100755 index bd9e805..59c7967 --- a/trueconsensus/engine.py +++ b/trueconsensus/engine.py @@ -14,19 +14,164 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import sys +# import yaml +import signal +from datetime import datetime +import socket +# from random import random +from argparse import RawTextHelpFormatter, \ + ArgumentParser + +from threading import Timer, Thread + +from fastchain import node +from fastchain.config import config_yaml, \ + threading_enabled, \ + _logger, \ + N, \ + RL + from snailchain import SnailChain -from fastchain.bft import NodeBFT, \ +from fastchain.bft_committee import NodeBFT, \ ViewChangeInit, \ LedgerLog, \ - BFTcommittee, \ - SubProtoDailyBFT, \ + BFTcommittee + +from fastchain.subprotocol import SubProtoDailyBFT, \ Mempools +parser = ArgumentParser(formatter_class=RawTextHelpFormatter, + description="""PBFT standalone server demo""") + + +def suicide(): + # import pdb; pdb.set_trace() + # sys.exit() + # quit() + os.kill(os.getpid(), signal.SIGINT) + + +def signal_handler(event, frame): + sys.stdout.write("handling signal: %s\n" % event) + sys.stdout.flush() + _logger.error("Kill signal (%s) detected. Stopping pbft.." % event) + countdown = 3 # seconds + if event == signal.SIGINT: + print("Committing deliberate suicide in %s seconds" % countdown) + t = Timer(countdown, suicide) + t.start() + sys.exit(130) # Ctrl-C for bash + + +def init_server(id): + global RL + try: + ip, port = RL[id] + except IndexError as E: + quit("%s Ran out of replica list. No more server config to try" % E) + s = socket.socket() + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # host = socket.gethostname() + host = "0.0.0.0" + s.bind((host, port)) # on EC2 we cannot directly bind on public addr + s.listen(50) + s.setblocking(0) + _logger.debug("Server [%s] -- listening on port %s" % (id, port)) + _logger.debug("IP: %s" % ip) + return s + + +class ThreadedExecution(object): -class DailyOffChainConsensus(object): def __init__(self): - self.chain = [] - self._lambda = None + pass + + def run(self, ID): + sys.stdout.write("run started\n") + sys.stdout.flush() + socket_obj = init_server(ID) + n = node.Node(ID, 0, N) + # n.init_keys(N) + n.init_replica_map(socket_obj) + n.server_loop() + sys.stdout.write("run exited\n") + sys.stdout.flush() + + def launch(self): + threads = [] + for i in range(N): + thread = Thread(target=self.run, args=[i]) + thread.start() + threads.append(thread) - def preproess(self): + for thread in threads: + thread.join() + + sys.stdout.write("join completed\n") + sys.stdout.flush() + + +class NonThreadedExecution(object): + ''' + Finds sockets that aren't busy and attempts to establish and launch testbed + ''' + def __init__(self): pass + + def init_server_socket(self, _id=None): + """ + triggers setup using testbed_config. Increments given server id + if that (ip, socket) from Replica List RL is already in use. + """ + global N + c = _id + while c < N: + s = None + try: + s = init_server(c) + except OSError as E: + _logger.error("%s -- Server ID: [%s]" % (E, c)) + c -= 1 + if s: + return s, c + + def launch(self): + socket_obj, _id = self.init_server_socket( + _id=config_yaml["testbed_config"]["server_id_init"] - 1 + ) + n = node.Node(_id, 0, N) + # n.init_keys(N) + n.init_replica_map(socket_obj) + n.server_loop() + + +# def pbft_usage(): +# parser.add_argument("-n", "--nodes", dest="node_count", action='store', +# help="# of PBFT nodes to be launched") +# parser.add_argument("-id", "--node-id", dest="node_id", +# action='store_true', +# help="") +# parser.add_argument("-ts", "--tune-settings", dest="tune", +# action='store_true', +# help="") + + +def main(): + print("Start time: ", datetime.now()) + print("Threading enabled: ", threading_enabled) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # import pdb; pdb.set_trace() + if threading_enabled: + ThreadedExecution().launch() + else: + NonThreadedExecution().launch() + + +if __name__ == "__main__": + # import pdb; pdb.set_trace() + main() diff --git a/trueconsensus/fastchain/__init__.py b/trueconsensus/fastchain/__init__.py index 413fc0f..46c4295 100644 --- a/trueconsensus/fastchain/__init__.py +++ b/trueconsensus/fastchain/__init__.py @@ -2,6 +2,5 @@ Package import structure ''' -from fastchain.bft import * +from fastchain.bft_committee import * from fastchain.node import * - diff --git a/trueconsensus/fastchain/bft.py b/trueconsensus/fastchain/bft_committee.py similarity index 86% rename from trueconsensus/fastchain/bft.py rename to trueconsensus/fastchain/bft_committee.py index ae18e04..ccb12c5 100644 --- a/trueconsensus/fastchain/bft.py +++ b/trueconsensus/fastchain/bft_committee.py @@ -19,16 +19,20 @@ import os import uuid import random -import ecdsa_sig as sig from db.backends.level import LevelDB -from fastchain.config import LAMBDA, config_yaml +from fastchain import ecdsa_sig as sig +from fastchain.log_maintainer import LedgerLog +from fastchain.config import config_yaml # from logging import ledger from fastchain.node import Node + from collections import OrderedDict, \ defaultdict, \ - namedtuple + namedtuple # for transaction tuple, use struct? + +LAMBDA = config_yaml['bft_committee']['lambda'] def generate_block(genesis=True): @@ -48,6 +52,15 @@ def generate_txns(R, l): # return uuid.uuid4().hex +class DailyOffChainConsensus(object): + def __init__(self): + self.chain = [] + self._lambda = None + + def preproess(self): + pass + + class NodeBFT(Node): ''' @types: @@ -67,9 +80,13 @@ class NodeBFT(Node): def __init__(self, id=None, type=None): self.NodeId = id self._type = 'BFTmember' - self.new_row = namedtuple('row', ['R','l','txn']) + self.new_row = namedtuple('row', ['R', 'l', 'txn']) + # TODO: maybe use ctypes.Structure or struct.Struct ? self.nonce = 0 + def launch_boot_nodes(self): + return + def log_to_snailchain(self): return @@ -109,7 +126,6 @@ def call_to_viewchange(self): start = time.time() while true: response = VC.wait_for_reply() - if if response is not None: break return @@ -162,3 +178,4 @@ def fork_vbft(self): pass def update_mempool_subprotocol(self): + pass diff --git a/trueconsensus/fastchain/config.py b/trueconsensus/fastchain/config.py index 03beee2..ca06060 100644 --- a/trueconsensus/fastchain/config.py +++ b/trueconsensus/fastchain/config.py @@ -9,7 +9,7 @@ # from pykwalify import errors as pykwalify_errors from logging.handlers import RotatingFileHandler -from local_config import CFG_YAML_PATH, \ +from fastchain.local_config import CFG_YAML_PATH, \ CFG_GENERAL_PATH, \ PEER_NETWORK_FILE diff --git a/trueconsensus/fastchain/ecdsa_sig.py b/trueconsensus/fastchain/ecdsa_sig.py index 0d10473..120c8da 100644 --- a/trueconsensus/fastchain/ecdsa_sig.py +++ b/trueconsensus/fastchain/ecdsa_sig.py @@ -7,7 +7,7 @@ # import logging # from trueconsensus.fastchain.config import KD -from config import KD, \ +from fastchain.config import KD, \ _logger C = ecdsa.NIST256p @@ -83,7 +83,9 @@ def write_new_keys(n): v_file.write(vk.to_pem()) s_file.close() v_file.close() - + msg = "written new keys to %s" % KD + _logger.info(msg) + return msg def get_asymm_key(i, ktype=None): kpath = get_key_path(i, ktype) diff --git a/trueconsensus/fastchain/local_config.py b/trueconsensus/fastchain/local_config.py index 3aa332e..23db86e 100644 --- a/trueconsensus/fastchain/local_config.py +++ b/trueconsensus/fastchain/local_config.py @@ -2,11 +2,11 @@ # CFG_YAML_PATH = "/etc/pbft_tunables.yaml" # CFG_YAML_PATH = os.path.join(os.path.expanduser('~'), "pbft_tunables.yaml") -CFG_YAML_PATH = "pbft_tunables.yaml" +CFG_YAML_PATH = "conf/pbft_tunables.yaml" # CFG_GENERAL_PATH = "/etc/pbft_logistics.cfg" # os.path.join(os.path.expanduser('~'), "pbft_logistics.cfg") -CFG_GENERAL_PATH = "pbft_logistics.cfg" +CFG_GENERAL_PATH = "conf/pbft_logistics.cfg" # list of boot nodes PEER_NETWORK_FILE = os.path.join(os.path.expanduser('~'), "hosts") diff --git a/trueconsensus/fastchain/log_maintainer.py b/trueconsensus/fastchain/log_maintainer.py index 4b2cf87..f635574 100644 --- a/trueconsensus/fastchain/log_maintainer.py +++ b/trueconsensus/fastchain/log_maintainer.py @@ -1,4 +1,4 @@ -from config import _logger +from fastchain.config import _logger # import logging diff --git a/trueconsensus/fastchain/make_keys.py b/trueconsensus/fastchain/make_keys.py deleted file mode 100755 index bd230a3..0000000 --- a/trueconsensus/fastchain/make_keys.py +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/env python - -# import sys - -import ecdsa_sig as sig -from config import N - - -if __name__ == "__main__": - sig.write_new_keys(N+1) diff --git a/trueconsensus/fastchain/node.py b/trueconsensus/fastchain/node.py index 5cfbd60..5ff188e 100644 --- a/trueconsensus/fastchain/node.py +++ b/trueconsensus/fastchain/node.py @@ -14,11 +14,11 @@ from threading import Timer, Thread, Lock, Condition # import sig -import bank -from ecdsa_sig import get_asymm_key -import request_pb2 -import proto_message as message -from config import config_general, \ +from dapps import bank +from fastchain.ecdsa_sig import get_asymm_key +from proto import request_pb2, \ + proto_message as message +from fastchain.config import config_general, \ RL, \ client_address, \ _logger @@ -65,23 +65,15 @@ def record_pbft(file, request): record(file, msg) -class node: - def debug_print_bank(self, signum, flag): - _logger.info("wait queue length: %s" % len(self.waiting)) - _logger.info("last executed: %s" % self.last_executed) - self.bank.print_balances() - # m = self.create_request("REQU", 0, "replica " + str(self.id) + " going down", 0) - # self.broadcast_to_nodes(m) - # for i,s in self.replica_map.iteritems(): - # try: - # s.send(serialize(m)) - # print("sent to ", i) - # print("-------------") - # except: - # print("could not send to ", i) - # #print(traceback.format_exc()) - # print("-------------") - sys.exit() +class Node(object): + ''' + Main Node responsible for handling following PBFT phases: + - request + - pre-prepare + - prepare + - commit + - reply + ''' def __init__(self, id, view, N, committee_addresses=[], max_requests=None): self.max_requests = max_requests @@ -169,6 +161,23 @@ def __init__(self, id, view, N, committee_addresses=[], max_requests=None): ) self.debuglog = open(replcia_log_loc, 'w', 1) + def debug_print_bank(self, signum, flag): + _logger.info("wait queue length: %s" % len(self.waiting)) + _logger.info("last executed: %s" % self.last_executed) + self.bank.print_balances() + # m = self.create_request("REQU", 0, "replica " + str(self.id) + " going down", 0) + # self.broadcast_to_nodes(m) + # for i,s in self.replica_map.iteritems(): + # try: + # s.send(serialize(m)) + # print("sent to ", i) + # print("-------------") + # except: + # print("could not send to ", i) + # #print(traceback.format_exc()) + # print("-------------") + sys.exit() + def reset_message_log(self): # message log for node communication related to the PBFT protocol: # [type][seq][id] -> request @@ -953,7 +962,7 @@ def server_loop(self): # self.fdmap[s.fileno] = s # self.p.register(s, recv_mask) s = self.replica_map[self.id] - _logger.debug("------------ INIT SERVER LOOP [ID: %s]-------------" % self.id) + _logger.debug("\n%s INIT SERVER LOOP [ID: %s] %s" % ('-'*20, self.id, '-'*20)) t = Timer(5, self.try_client, args=[self.id]) t.start() while True: diff --git a/trueconsensus/fastchain/proto.sh b/trueconsensus/fastchain/proto.sh deleted file mode 100755 index b68ba85..0000000 --- a/trueconsensus/fastchain/proto.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -protoc --python_out=./ request.proto diff --git a/trueconsensus/fastchain/server.py b/trueconsensus/fastchain/server.py deleted file mode 100755 index d961d98..0000000 --- a/trueconsensus/fastchain/server.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/bin/env python - -import os -import sys -import yaml -import signal -import time -import signal -import socket -from random import random -from argparse import RawTextHelpFormatter, \ - ArgumentParser - -from threading import Timer, Thread - -import node -from config import config_yaml, \ - threading_enabled, \ - _logger, \ - N, \ - RL - - -parser = ArgumentParser(formatter_class=RawTextHelpFormatter, - description="""PBFT standalone server demo""") - - -def suicide(): - # import pdb; pdb.set_trace() - # sys.exit() - # quit() - os.kill(os.getpid(), signal.SIGINT) - - -def signal_handler(event, frame): - sys.stdout.write("handling signal: %s\n" % event) - sys.stdout.flush() - _logger.error("Kill signal (%s) detected. Stopping pbft.." % event) - countdown = 3 # seconds - if event == signal.SIGINT: - print("Committing deliberate suicide in %s seconds" % countdown) - t = Timer(countdown, suicide) - t.start() - sys.exit(130) # Ctrl-C for bash - - -def init_server(id): - global RL - try: - ip, port = RL[id] - except IndexError as E: - quit("%s Ran out of replica list. No more server config to try" % E) - s = socket.socket() - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # host = socket.gethostname() - host = "0.0.0.0" - s.bind((host, port)) # on EC2 we cannot directly bind on public addr - s.listen(50) - s.setblocking(0) - _logger.debug("Server [%s] -- listening on port %s" % (id, port)) - _logger.debug("IP: %s" % ip) - return s - - -class ThreadedExecution(object): - - def __init__(self): - pass - - def run(self, ID): - sys.stdout.write("run started\n") - sys.stdout.flush() - socket_obj = init_server(ID) - n = node.node(ID, 0, N) - # n.init_keys(N) - n.init_replica_map(socket_obj) - n.server_loop() - sys.stdout.write("run exited\n") - sys.stdout.flush() - - def launch(self): - threads = [] - for i in range(N): - thread = Thread(target=self.run, args=[i]) - thread.start() - threads.append(thread) - - for thread in threads: - thread.join() - - sys.stdout.write("join completed\n") - sys.stdout.flush() - - -class NonThreadedExecution(object): - ''' - Finds sockets that aren't busy and attempts to establish and launch testbed - ''' - def __init__(self): - pass - - def init_server_socket(self, _id=None): - """ - triggers setup using testbed_config. Increments given server id - if that (ip, socket) from Replica List RL is already in use. - """ - global N - c = _id - while c < N: - s = None - try: - s = init_server(c) - except OSError as E: - _logger.error("%s -- Server ID: [%s]" % (E, c)) - c -= 1 - if s: - return s, c - - def launch(self): - socket_obj, _id = self.init_server_socket( - _id=config_yaml["testbed_config"]["server_id_init"] - 1 - ) - n = node.node(_id, 0, N) - # n.init_keys(N) - n.init_replica_map(socket_obj) - n.server_loop() - - -# def pbft_usage(): -# parser.add_argument("-n", "--nodes", dest="node_count", action='store', -# help="# of PBFT nodes to be launched") -# parser.add_argument("-id", "--node-id", dest="node_id", -# action='store_true', -# help="") -# parser.add_argument("-ts", "--tune-settings", dest="tune", -# action='store_true', -# help="") - - -if __name__ == "__main__": - # import pdb; pdb.set_trace() - print("Threading enabled: ", threading_enabled) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # import pdb; pdb.set_trace() - if threading_enabled: - ThreadedExecution().launch() - else: - NonThreadedExecution().launch() diff --git a/trueconsensus/fastchain/subprotocol.py b/trueconsensus/fastchain/subprotocol.py index 8017610..22c9941 100644 --- a/trueconsensus/fastchain/subprotocol.py +++ b/trueconsensus/fastchain/subprotocol.py @@ -17,16 +17,14 @@ # import os # import sys import ecdsa - -# from logging import Log -from ecdsa_sig import generate_keys # from collections import defaultdict -from bft import Node +from fastchain.ecdsa_sig import generate_keys +from fastchain.node import Node -C = ecdsa.NIST256p -SIG_SIZE = 64 -HASH_SIZE = 32 +# C = ecdsa.NIST256p +# SIG_SIZE = 64 +# HASH_SIZE = 32 class Mempools(object): @@ -67,7 +65,7 @@ def stop(self): pass def start(self, comm): - + pass def forkVirtualNode(self): BFTpk = Node() diff --git a/trueconsensus/proto/__init__.py b/trueconsensus/proto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/trueconsensus/fastchain/proto_message.py b/trueconsensus/proto/proto_message.py similarity index 89% rename from trueconsensus/fastchain/proto_message.py rename to trueconsensus/proto/proto_message.py index 3c88972..f7dce5d 100644 --- a/trueconsensus/fastchain/proto_message.py +++ b/trueconsensus/proto/proto_message.py @@ -2,9 +2,9 @@ import struct # import sig -import request_pb2 -import ecdsa_sig as sig -from config import _logger +from proto import request_pb2 +from fastchain import ecdsa_sig as sig +from fastchain.config import _logger def add_sig(key, id, seq, view, type, message, timestamp=None): diff --git a/trueconsensus/fastchain/request.proto b/trueconsensus/proto/request.proto similarity index 100% rename from trueconsensus/fastchain/request.proto rename to trueconsensus/proto/request.proto diff --git a/trueconsensus/fastchain/request_pb2.py b/trueconsensus/proto/request_pb2.py similarity index 87% rename from trueconsensus/fastchain/request_pb2.py rename to trueconsensus/proto/request_pb2.py index aa582b1..ecb7435 100644 --- a/trueconsensus/fastchain/request_pb2.py +++ b/trueconsensus/proto/request_pb2.py @@ -1,5 +1,5 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: request.proto +# source: proto/request.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -16,10 +16,10 @@ DESCRIPTOR = _descriptor.FileDescriptor( - name='request.proto', + name='proto/request.proto', package='request', syntax='proto2', - serialized_pb=_b('\n\rrequest.proto\x12\x07request\"\xb7\x01\n\x07Request\x12%\n\x05inner\x18\x01 \x02(\x0b\x32\x16.request.Request.Inner\x12\x0b\n\x03\x64ig\x18\x02 \x02(\x0c\x12\x0b\n\x03sig\x18\x03 \x02(\x0c\x12\r\n\x05outer\x18\x04 \x01(\x0c\x1a\\\n\x05Inner\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0b\n\x03seq\x18\x02 \x02(\x05\x12\x0c\n\x04view\x18\x03 \x02(\x05\x12\x0c\n\x04type\x18\x04 \x02(\t\x12\x0b\n\x03msg\x18\x05 \x02(\x0c\x12\x11\n\ttimestamp\x18\x06 \x01(\x05\"(\n\x07History\x12\x1d\n\x03req\x18\x01 \x03(\x0b\x32\x10.request.Request') + serialized_pb=_b('\n\x13proto/request.proto\x12\x07request\"\xb7\x01\n\x07Request\x12%\n\x05inner\x18\x01 \x02(\x0b\x32\x16.request.Request.Inner\x12\x0b\n\x03\x64ig\x18\x02 \x02(\x0c\x12\x0b\n\x03sig\x18\x03 \x02(\x0c\x12\r\n\x05outer\x18\x04 \x01(\x0c\x1a\\\n\x05Inner\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0b\n\x03seq\x18\x02 \x02(\x05\x12\x0c\n\x04view\x18\x03 \x02(\x05\x12\x0c\n\x04type\x18\x04 \x02(\t\x12\x0b\n\x03msg\x18\x05 \x02(\x0c\x12\x11\n\ttimestamp\x18\x06 \x01(\x05\"(\n\x07History\x12\x1d\n\x03req\x18\x01 \x03(\x0b\x32\x10.request.Request') ) @@ -86,8 +86,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=118, - serialized_end=210, + serialized_start=124, + serialized_end=216, ) _REQUEST = _descriptor.Descriptor( @@ -137,8 +137,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=27, - serialized_end=210, + serialized_start=33, + serialized_end=216, ) @@ -168,8 +168,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=212, - serialized_end=252, + serialized_start=218, + serialized_end=258, ) _REQUEST_INNER.containing_type = _REQUEST @@ -183,12 +183,12 @@ Inner = _reflection.GeneratedProtocolMessageType('Inner', (_message.Message,), dict( DESCRIPTOR = _REQUEST_INNER, - __module__ = 'request_pb2' + __module__ = 'proto.request_pb2' # @@protoc_insertion_point(class_scope:request.Request.Inner) )) , DESCRIPTOR = _REQUEST, - __module__ = 'request_pb2' + __module__ = 'proto.request_pb2' # @@protoc_insertion_point(class_scope:request.Request) )) _sym_db.RegisterMessage(Request) @@ -196,7 +196,7 @@ History = _reflection.GeneratedProtocolMessageType('History', (_message.Message,), dict( DESCRIPTOR = _HISTORY, - __module__ = 'request_pb2' + __module__ = 'proto.request_pb2' # @@protoc_insertion_point(class_scope:request.History) )) _sym_db.RegisterMessage(History) diff --git a/trueconsensus/snailchain/log_maintainer.py b/trueconsensus/snailchain/log_maintainer.py new file mode 100644 index 0000000..4b2cf87 --- /dev/null +++ b/trueconsensus/snailchain/log_maintainer.py @@ -0,0 +1,32 @@ +from config import _logger + +# import logging + +# # create logger +# logger = logging.getLogger('simple_example') +# logger.setLevel(logging.DEBUG) + +# # create console handler and set level to debug +# ch = logging.StreamHandler() +# ch.setLevel(logging.DEBUG) + +# # create formatter +# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + +# # add formatter to ch +# ch.setFormatter(formatter) + +# # add ch to logger +# logger.addHandler(ch) + +# # # 'application' code +# # logger.debug('debug message') +# # logger.info('info message') +# # logger.warn('warn message') +# # logger.error('error message') +# # logger.critical('critical message') + + +class LedgerLog(object): + def __init__(self): + pass diff --git a/trueconsensus/snailchain/logging.py b/trueconsensus/snailchain/logging.py deleted file mode 100644 index cf9aaf7..0000000 --- a/trueconsensus/snailchain/logging.py +++ /dev/null @@ -1,25 +0,0 @@ -import logging - -# create logger -logger = logging.getLogger('simple_example') -logger.setLevel(logging.DEBUG) - -# create console handler and set level to debug -ch = logging.StreamHandler() -ch.setLevel(logging.DEBUG) - -# create formatter -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - -# add formatter to ch -ch.setFormatter(formatter) - -# add ch to logger -logger.addHandler(ch) - -# # 'application' code -# logger.debug('debug message') -# logger.info('info message') -# logger.warn('warn message') -# logger.error('error message') -# logger.critical('critical message') diff --git a/trueconsensus/utils/__init__.py b/trueconsensus/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/trueconsensus/utils/generate_proto.sh b/trueconsensus/utils/generate_proto.sh new file mode 100755 index 0000000..e8072e6 --- /dev/null +++ b/trueconsensus/utils/generate_proto.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +set -e + +PROTO_ROOT=proto/ + +if [[ -d $PROTO_ROOT ]]; then + if [[ -f ${PROTO_ROOT%/}/request.proto ]]; then + protoc --python_out=$PWD proto/request.proto + else + echo "File [${PROTO_ROOT%/}/request.proto] not found!" + + echo "Generated ${PROTO_ROOT%/}/request_pb2.py from request.proto" + fi +else + echo "Directory $PWD/proto not found!" +fi diff --git a/trueconsensus/fastchain/generate_requests_dat.py b/trueconsensus/utils/generate_requests_dat.py similarity index 95% rename from trueconsensus/fastchain/generate_requests_dat.py rename to trueconsensus/utils/generate_requests_dat.py index d2dfef8..19f3e1a 100755 --- a/trueconsensus/fastchain/generate_requests_dat.py +++ b/trueconsensus/utils/generate_requests_dat.py @@ -7,13 +7,13 @@ import struct # import sig -import ecdsa_sig -from config import client_id, \ +from fastchain import ecdsa_sig +from fastchain.config import client_id, \ config_yaml, \ config_general, \ N # import request_pb2 -import proto_message as message +import proto.proto_message as message def backspace(n): diff --git a/trueconsensus/utils/make_keys.py b/trueconsensus/utils/make_keys.py new file mode 100755 index 0000000..9e315c3 --- /dev/null +++ b/trueconsensus/utils/make_keys.py @@ -0,0 +1,10 @@ +#!/bin/env python + +# import sys + +from fastchain import ecdsa_sig as sig +from fastchain.config import N + + +if __name__ == "__main__": + print(sig.write_new_keys(N+1)) From 8200dcbeb06988a08d5ed0279acb62b544984341 Mon Sep 17 00:00:00 2001 From: Archit Sharma Date: Fri, 15 Jun 2018 07:54:43 +0530 Subject: [PATCH 5/5] add docs/TODO.md --- {doc => docs}/DEV.md | 0 docs/TODO.md | 20 ++++++++++++++++++++ 2 files changed, 20 insertions(+) rename {doc => docs}/DEV.md (100%) create mode 100644 docs/TODO.md diff --git a/doc/DEV.md b/docs/DEV.md similarity index 100% rename from doc/DEV.md rename to docs/DEV.md diff --git a/docs/TODO.md b/docs/TODO.md new file mode 100644 index 0000000..c0536e7 --- /dev/null +++ b/docs/TODO.md @@ -0,0 +1,20 @@ +## Generic Infra + +- [ ] Separate Dapp architecture: `dapps/bank.py` is currently tightly coupled with node.py. +- [ ] no gRPC communiction between nodes yet +- [ ] add network_latency check + +## Fastchain + +- [ ] nodes call quits on any error +- [ ] xyz.pem not found error on running client.py. Batch size not playing well in proto_message.message check +- [ ] complete subprotocol.py +- [ ] complete bft_committee.py +- [ ] return blocks of transactions + +## Snailchain / Committee election + +- [ ] complete minerva/vrf.py function +- [ ] complete snailchain/fpow.py dummy +- [ ] integrate level db wrappers +- [ ] integrate py-evm functionalities