Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

BroadcastProtocol introduction #5109

Merged
merged 16 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from typing import (
Any,
Dict,
Hashable,
Iterable,
List,
Optional,
Expand Down Expand Up @@ -61,6 +60,7 @@
from golem.monitor.model.nodemetadatamodel import NodeMetadataModel
from golem.monitor.monitor import SystemMonitor
from golem.monitorconfig import MONITOR_CONFIG
from golem.network import broadcast
from golem.network import nodeskeeper
from golem.network.concent.client import ConcentClientService
from golem.network.concent.filetransfers import ConcentFiletransferService
Expand Down Expand Up @@ -263,6 +263,7 @@ def get_wamp_rpc_mapping(self):
from golem.environments.minperformancemultiplier import \
MinPerformanceMultiplier
from golem.network.concent import soft_switch as concent_soft_switch
from golem.rpc.api import broadcast_ as api_broadcast
from golem.rpc.api import ethereum_ as api_ethereum
from golem.task import rpc as task_rpc
from golem.apps import rpc as apps_rpc
Expand All @@ -282,6 +283,7 @@ def get_wamp_rpc_mapping(self):
task_rpc_provider,
app_rpc_provider,
api_ethereum.ETSProvider(self.transaction_system),
api_broadcast,
)
mapping = {}
for rpc_provider in providers:
Expand Down Expand Up @@ -1743,6 +1745,7 @@ def _run(self) -> None:
jobs = (
nodeskeeper.sweep,
msg_queue.sweep,
broadcast.sweep,
lambda: logger.info(
"Time marker. time(): %s now(): %s, utcnow(): %s, delta: %s",
time.time(),
Expand Down
3 changes: 3 additions & 0 deletions golem/core/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
DEFAULT_CONNECT_TO = '8.8.8.8'
DEFAULT_CONNECT_TO_PORT = 80

# XXX FIXME Update before relase
BROADCAST_PUBKEY = b'\xb7\xdap\xa8\xbb\xb49\xe8\xf1\xcd\xf7IL\xe1c)J\x88L\xca\xf9\xf1\x17\x02><\xad^]L\xb6\x06U\xae\xc6\x97\xc8Y\xfd\xeb\x98\x80\xef\x94\xe3p^\xe0\xa2\xddD\xeb\xa7\xd6\x8c\xab\xcd\x90\xe7\x97+H\xd0\x0f' # noqa pylint: disable=line-too-long

CONCENT_CERTIFICATES_DIR = pathlib.Path(common.get_golem_path()) \
/ 'golem/network/concent/resources/ssl/certs'
CONCENT_CHOICES: dict = {
Expand Down
2 changes: 1 addition & 1 deletion golem/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def execute_sql(self, sql, params=None, require_commit=True):


class Database:
SCHEMA_VERSION = 47
SCHEMA_VERSION = 48

def __init__(self, # noqa pylint: disable=too-many-arguments
db: peewee.Database,
Expand Down
25 changes: 25 additions & 0 deletions golem/database/schemas/048_create_broadcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# pylint: disable=no-member
# pylint: disable=unused-argument
import datetime

import peewee as pw

SCHEMA_VERSION = 48


def migrate(migrator, database, fake=False, **kwargs):
@migrator.create_model # pylint: disable=unused-variable
class Broadcast(pw.Model):
timestamp = pw.IntegerField()
broadcast_type = pw.IntegerField()
signature = pw.BlobField()
data = pw.BlobField()
created_date = pw.DateTimeField(default=datetime.datetime.now)
modified_date = pw.DateTimeField(default=datetime.datetime.now)

class Meta:
db_table = "broadcast"


def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_model("broadcast")
104 changes: 103 additions & 1 deletion golem/model.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import datetime
import enum
import hashlib
import inspect
import json
import pickle
import struct
import sys
import time
from typing import Any, Dict, Optional

from eth_utils import decode_hex, encode_hex
from ethereum.utils import denoms
import golem_messages
from golem_messages import cryptography
from golem_messages import datastructures as msg_dt
from golem_messages import message
from golem_messages.datastructures import p2p as dt_p2p, masking
Expand Down Expand Up @@ -199,7 +202,7 @@ def python_value(self, value):
class EnumField(EnumFieldBase, IntegerField):
""" Database field that maps enum type to integer."""

def __init__(self, enum_type, *args, **kwargs):
def __init__(self, *args, enum_type=None, **kwargs):
super(EnumField, self).__init__(*args, **kwargs)
self.enum_type = enum_type

Expand Down Expand Up @@ -877,6 +880,105 @@ def __repr__(self):
)


class Broadcast(BaseModel):
class TYPE(enum.IntEnum):
Version = enum.auto()
HEADER_FORMAT = '!HQ'
HEADER_LENGTH = struct.calcsize(HEADER_FORMAT)
SIGNATURE_LENGTH = 65
timestamp = IntegerField()
broadcast_type = EnumField(enum_type=TYPE)
signature = BlobField()
data = BlobField()

def __repr__(self):
return (
f"<{self.__class__.__qualname__}"
f" {self.timestamp} {self.broadcast_type.name}>"
)

@classmethod
def create_and_sign(cls, private_key, broadcast_type, data) -> 'Broadcast':
bc = cls()
bc.timestamp = int(time.time())
bc.broadcast_type = broadcast_type
bc.data = data
bc.sign(private_key=private_key)
bc.save(force_insert=True)
return bc

def process(self) -> bool:
if Broadcast.select().where(
Broadcast.broadcast_type == self.broadcast_type,
Broadcast.timestamp == self.timestamp,
).exists():
return False
if self.broadcast_type is self.TYPE.Version:
from golem.network.p2p.peersession import compare_version
compare_version(self.data.decode('utf-8', 'replace'))
self.save(force_insert=True)
return True

def header_to_bytes(self) -> bytes:
return struct.pack(
self.HEADER_FORMAT,
self.broadcast_type,
self.timestamp,
)

def header_from_bytes(self, b: bytes) -> None:
try:
broadcast_type, self.timestamp = struct.unpack(
self.HEADER_FORMAT,
b,
)
self.broadcast_type = self.TYPE(broadcast_type) # type: ignore
except (ValueError, struct.error):
from golem.network import broadcast
raise broadcast.BroadcastError('Invalid header')

@classmethod
def from_bytes(cls, b: bytes) -> 'Broadcast':
# Remember to verify signature
jiivan marked this conversation as resolved.
Show resolved Hide resolved
if len(b) < cls.HEADER_LENGTH + cls.SIGNATURE_LENGTH:
from golem.network import broadcast
raise broadcast.BroadcastError(
'Invalid broadcast: too short'
f' ({len(b)} < {cls.HEADER_LENGTH + cls.SIGNATURE_LENGTH})',
)
bc = cls()
bc.header_from_bytes(b[:cls.HEADER_LENGTH])
bc.signature = b[
cls.HEADER_LENGTH:cls.HEADER_LENGTH+cls.SIGNATURE_LENGTH
]
bc.data = b[cls.HEADER_LENGTH+cls.SIGNATURE_LENGTH:]
return bc

def to_bytes(self) -> bytes:
return self.header_to_bytes() + self.signature + self.data

def get_hash(self) -> bytes:
sha = hashlib.sha1()
sha.update(self.header_to_bytes())
sha.update(self.data)
return sha.digest()

def sign(self, private_key: bytes) -> None:
assert self.signature is None
self.signature = cryptography.ecdsa_sign(
privkey=private_key,
msghash=self.get_hash(),
)

def verify_signature(self, public_key: bytes) -> None:
# XXX MultiSig solution? Like two of three?
jiivan marked this conversation as resolved.
Show resolved Hide resolved
cryptography.ecdsa_verify(
pubkey=public_key,
signature=self.signature,
message=self.get_hash(),
)


def collect_db_models(module: str = __name__):
return inspect.getmembers(
sys.modules[module],
Expand Down
76 changes: 76 additions & 0 deletions golem/network/broadcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
import typing

import peewee

from golem import decorators
from golem import model
from golem.core import variables
from golem.core.databuffer import DataBuffer


logger = logging.getLogger(__name__)


class BroadcastError(Exception):
pass


class BroadcastList(list):
@classmethod
def from_bytes(cls, b: bytes) -> typing.List[model.Broadcast]:
jiivan marked this conversation as resolved.
Show resolved Hide resolved
db = DataBuffer()
db.append_bytes(b)
result = []
for cnt, broadcast_binary in enumerate(db.get_len_prefixed_bytes()):
if cnt >= 10:
break
try:
b = model.Broadcast.from_bytes(broadcast_binary)
b.verify_signature(public_key=variables.BROADCAST_PUBKEY)
result.append(b)
except BroadcastError as e:
logger.debug(
'Invalid broadcast received: %s. b=%r',
e,
broadcast_binary,
)
except Exception: # pylint: disable=broad-except
logger.debug(
'Invalid broadcast received: %r',
broadcast_binary,
exc_info=True,
)
return result

def to_bytes(self) -> bytes:
db = DataBuffer()
for broadcast in self:
assert isinstance(broadcast, model.Broadcast)
db.append_len_prefixed_bytes(broadcast.to_bytes())
return db.read_all()


def prepare_handshake() -> BroadcastList:
query = model.Broadcast.select().where(
model.Broadcast.broadcast_type == model.Broadcast.TYPE.Version,
)
bl = BroadcastList()
if query.exists():
bl.append(query.order_by('-timestamp')[0])
jiivan marked this conversation as resolved.
Show resolved Hide resolved
logger.debug('Prepared handshake: %s', bl)
return bl


@decorators.run_with_db()
def sweep() -> None:
logger.info('Sweeping broadcasts')
jiivan marked this conversation as resolved.
Show resolved Hide resolved
max_timestamp = model.Broadcast.select(
peewee.fn.MAX(model.Broadcast.timestamp),
).scalar()
count = model.Broadcast.delete().where(
model.Broadcast.broadcast_type == model.Broadcast.TYPE.Version,
model.Broadcast.timestamp < max_timestamp,
).execute()
if count:
logger.info('Sweeped broadcasts. count=%d', count)
2 changes: 1 addition & 1 deletion golem/network/p2p/p2pservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(
"""
network = tcpnetwork.TCPNetwork(
ProtocolFactory(
tcpnetwork.SafeProtocol,
tcpnetwork.BroadcastProtocol,
self,
SessionFactory(PeerSession)
),
Expand Down
5 changes: 0 additions & 5 deletions golem/network/p2p/peersession.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ def _react_to_hello(self, msg):
logger.error("Received unexpected Hello message, ignoring")
return

# Check if sender is a seed/bootstrap node
port = getattr(msg, 'port', None)
if (self.address, port) in self.p2p_service.seeds:
compare_version(getattr(msg, 'client_ver', None))

if not self.conn.opened:
return

Expand Down
Loading