Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

BroadcastProtocol introduction #5109

Merged
merged 16 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from typing import (
Any,
Dict,
Hashable,
Iterable,
List,
Optional,
Expand Down Expand Up @@ -61,6 +60,7 @@
from golem.monitor.model.nodemetadatamodel import NodeMetadataModel
from golem.monitor.monitor import SystemMonitor
from golem.monitorconfig import MONITOR_CONFIG
from golem.network import broadcast
from golem.network import nodeskeeper
from golem.network.concent.client import ConcentClientService
from golem.network.concent.filetransfers import ConcentFiletransferService
Expand Down Expand Up @@ -263,6 +263,7 @@ def get_wamp_rpc_mapping(self):
from golem.environments.minperformancemultiplier import \
MinPerformanceMultiplier
from golem.network.concent import soft_switch as concent_soft_switch
from golem.rpc.api import broadcast_ as api_broadcast
from golem.rpc.api import ethereum_ as api_ethereum
from golem.task import rpc as task_rpc
from golem.apps import rpc as apps_rpc
Expand All @@ -282,6 +283,7 @@ def get_wamp_rpc_mapping(self):
task_rpc_provider,
app_rpc_provider,
api_ethereum.ETSProvider(self.transaction_system),
api_broadcast,
)
mapping = {}
for rpc_provider in providers:
Expand Down Expand Up @@ -1743,6 +1745,7 @@ def _run(self) -> None:
jobs = (
nodeskeeper.sweep,
msg_queue.sweep,
broadcast.sweep,
lambda: logger.info(
"Time marker. time(): %s now(): %s, utcnow(): %s, delta: %s",
time.time(),
Expand Down
2 changes: 2 additions & 0 deletions golem/config/environments/mainnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(self):

# P2P

BROADCAST_PUBKEY = b'\xab\xab;\xb0\x89\x10\r\xf8Hs\xd7\x91\xcc\x13\xdb\x0b9tw\x80\xd4t?\xdc\x9dS.\x9at\xe3X\xbcBK\x1c\xef\xdb3\xab}z\xad\xde"ZW\xa9T\xdeN\xb6\xc7P\x0e\xa9\x7fv\x1a\xec\xcbN\x07R\x10' # noqa pylint: disable=line-too-long

P2P_SEEDS = [
('seeds.golem.network', 40102),
('0.seeds.golem.network', 40102),
Expand Down
2 changes: 2 additions & 0 deletions golem/config/environments/testnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(self):

# P2P

BROADCAST_PUBKEY = b'\xbe\x0e\xb0@\xad\xad~\xd7\xe3\xca\x96*k\x7f\x0b*\x96++\xb0{\x95+n~\xfdF\xc8\x88\xff\x06\x93cr\xb3\xcb@\xc8Y\xd5n\x98|\xec\x90$\xf2E\xf9\xbbyh:\x99"\xaf\xa2-\xc9os:\xb6\x88' # noqa pylint: disable=line-too-long

P2P_SEEDS = [
('94.23.57.58', 40102),
('94.23.57.58', 40104),
Expand Down
2 changes: 1 addition & 1 deletion golem/core/databuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def read_len_prefixed_bytes(self):
"""
ret_bytes = None

if (self.data_size() > LONG_STANDARD_SIZE and
if (self.data_size() >= LONG_STANDARD_SIZE and
self.data_size() >= (self.peek_ulong() + LONG_STANDARD_SIZE)):
num_bytes = self.read_ulong()
ret_bytes = self.read_bytes(num_bytes)
Expand Down
2 changes: 1 addition & 1 deletion golem/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def execute_sql(self, sql, params=None, require_commit=True):


class Database:
SCHEMA_VERSION = 47
SCHEMA_VERSION = 48

def __init__(self, # noqa pylint: disable=too-many-arguments
db: peewee.Database,
Expand Down
25 changes: 25 additions & 0 deletions golem/database/schemas/048_create_broadcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# pylint: disable=no-member
# pylint: disable=unused-argument
import datetime

import peewee as pw

SCHEMA_VERSION = 48


def migrate(migrator, database, fake=False, **kwargs):
@migrator.create_model # pylint: disable=unused-variable
class Broadcast(pw.Model):
timestamp = pw.IntegerField()
broadcast_type = pw.IntegerField()
signature = pw.BlobField()
data = pw.BlobField()
created_date = pw.DateTimeField(default=datetime.datetime.now)
modified_date = pw.DateTimeField(default=datetime.datetime.now)

class Meta:
db_table = "broadcast"


def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_model("broadcast")
104 changes: 103 additions & 1 deletion golem/model.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import datetime
import enum
import hashlib
import inspect
import json
import pickle
import struct
import sys
import time
from typing import Any, Dict, Optional

from eth_utils import decode_hex, encode_hex
from ethereum.utils import denoms
import golem_messages
from golem_messages import cryptography
from golem_messages import datastructures as msg_dt
from golem_messages import message
from golem_messages.datastructures import p2p as dt_p2p, masking
Expand Down Expand Up @@ -199,7 +202,7 @@ def python_value(self, value):
class EnumField(EnumFieldBase, IntegerField):
""" Database field that maps enum type to integer."""

def __init__(self, enum_type, *args, **kwargs):
def __init__(self, *args, enum_type=None, **kwargs):
super(EnumField, self).__init__(*args, **kwargs)
self.enum_type = enum_type

Expand Down Expand Up @@ -877,6 +880,105 @@ def __repr__(self):
)


class Broadcast(BaseModel):
class TYPE(enum.IntEnum):
Version = enum.auto()
HEADER_FORMAT = '!HQ'
HEADER_LENGTH = struct.calcsize(HEADER_FORMAT)
SIGNATURE_LENGTH = 65
timestamp = IntegerField()
broadcast_type = EnumField(enum_type=TYPE)
signature = BlobField()
data = BlobField()

def __repr__(self):
return (
f"<{self.__class__.__qualname__}"
f" {self.timestamp} {self.broadcast_type.name}>"
)

@classmethod
def create_and_sign(cls, private_key, broadcast_type, data) -> 'Broadcast':
bc = cls()
bc.timestamp = int(time.time())
bc.broadcast_type = broadcast_type
bc.data = data
bc.sign(private_key=private_key)
bc.save(force_insert=True)
return bc

def process(self) -> bool:
if Broadcast.select().where(
Broadcast.broadcast_type == self.broadcast_type,
Broadcast.timestamp == self.timestamp,
).exists():
return False
if self.broadcast_type is self.TYPE.Version:
from golem.network.p2p.peersession import compare_version
compare_version(self.data.decode('utf-8', 'replace'))
self.save(force_insert=True)
return True

def header_to_bytes(self) -> bytes:
return struct.pack(
self.HEADER_FORMAT,
self.broadcast_type,
self.timestamp,
)

def header_from_bytes(self, b: bytes) -> None:
try:
broadcast_type, self.timestamp = struct.unpack(
self.HEADER_FORMAT,
b,
)
self.broadcast_type = self.TYPE(broadcast_type) # type: ignore
except (ValueError, struct.error):
from golem.network import broadcast
raise broadcast.BroadcastError('Invalid header')

@classmethod
def from_bytes(cls, b: bytes) -> 'Broadcast':
# Remember to verify signature of this broadcast if it's been loaded
# from untrusted source
if len(b) < cls.HEADER_LENGTH + cls.SIGNATURE_LENGTH:
from golem.network import broadcast
raise broadcast.BroadcastError(
'Invalid broadcast: too short'
f' ({len(b)} < {cls.HEADER_LENGTH + cls.SIGNATURE_LENGTH})',
)
bc = cls()
bc.header_from_bytes(b[:cls.HEADER_LENGTH])
bc.signature = b[
cls.HEADER_LENGTH:cls.HEADER_LENGTH+cls.SIGNATURE_LENGTH
]
bc.data = b[cls.HEADER_LENGTH+cls.SIGNATURE_LENGTH:]
return bc

def to_bytes(self) -> bytes:
return self.header_to_bytes() + self.signature + self.data

def get_hash(self) -> bytes:
sha = hashlib.sha1()
sha.update(self.header_to_bytes())
sha.update(self.data)
return sha.digest()

def sign(self, private_key: bytes) -> None:
assert self.signature is None
self.signature = cryptography.ecdsa_sign(
privkey=private_key,
msghash=self.get_hash(),
)

def verify_signature(self, public_key: bytes) -> None:
cryptography.ecdsa_verify(
pubkey=public_key,
signature=self.signature,
message=self.get_hash(),
)


def collect_db_models(module: str = __name__):
return inspect.getmembers(
sys.modules[module],
Expand Down
74 changes: 74 additions & 0 deletions golem/network/broadcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import logging
import typing

import peewee

from golem import decorators
from golem import model
from golem.config import active
from golem.core.databuffer import DataBuffer


logger = logging.getLogger(__name__)


class BroadcastError(Exception):
pass


def list_from_bytes(b: bytes) -> typing.List[model.Broadcast]:
db = DataBuffer()
db.append_bytes(b)
result = []
for cnt, broadcast_binary in enumerate(db.get_len_prefixed_bytes()):
if cnt >= 10:
break
try:
b = model.Broadcast.from_bytes(broadcast_binary)
b.verify_signature(public_key=active.BROADCAST_PUBKEY)
result.append(b)
except BroadcastError as e:
logger.debug(
'Invalid broadcast received: %s. b=%r',
e,
broadcast_binary,
)
except Exception: # pylint: disable=broad-except
logger.debug(
'Invalid broadcast received: %r',
broadcast_binary,
exc_info=True,
)
return result


def list_to_bytes(l: typing.List[model.Broadcast]) -> bytes:
db = DataBuffer()
for broadcast in l:
assert isinstance(broadcast, model.Broadcast)
db.append_len_prefixed_bytes(broadcast.to_bytes())
return db.read_all()


def prepare_handshake() -> typing.List[model.Broadcast]:
query = model.Broadcast.select().where(
model.Broadcast.broadcast_type == model.Broadcast.TYPE.Version,
)
bl = []
if query.exists():
bl.append(query.order_by('-timestamp')[0])
jiivan marked this conversation as resolved.
Show resolved Hide resolved
logger.debug('Prepared handshake: %s', bl)
return bl


@decorators.run_with_db()
def sweep() -> None:
max_timestamp = model.Broadcast.select(
peewee.fn.MAX(model.Broadcast.timestamp),
).scalar()
count = model.Broadcast.delete().where(
model.Broadcast.broadcast_type == model.Broadcast.TYPE.Version,
model.Broadcast.timestamp < max_timestamp,
).execute()
if count:
logger.info('Sweeped broadcasts. count=%d', count)
14 changes: 9 additions & 5 deletions golem/network/p2p/p2pservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(
"""
network = tcpnetwork.TCPNetwork(
ProtocolFactory(
tcpnetwork.SafeProtocol,
tcpnetwork.BroadcastProtocol,
self,
SessionFactory(PeerSession)
),
Expand Down Expand Up @@ -882,13 +882,17 @@ def _send_get_tasks(self):
for p in list(self.peers.values()):
p.send_get_tasks()

def __connection_established(self, session, conn_id: str):
peer_conn = session.conn.transport.getPeer()
def __connection_established(
self,
protocol: tcpnetwork.BroadcastProtocol,
conn_id: str,
):
peer_conn = protocol.transport.getPeer()
ip_address = peer_conn.host
port = peer_conn.port

session.conn_id = conn_id
self._mark_connected(conn_id, session.address, session.port)
protocol.conn_id = conn_id
self._mark_connected(conn_id, ip_address, port)

logger.debug("Connection to peer established. %s: %s, conn_id %s",
ip_address, port, conn_id)
Expand Down
5 changes: 0 additions & 5 deletions golem/network/p2p/peersession.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ def _react_to_hello(self, msg):
logger.error("Received unexpected Hello message, ignoring")
return

# Check if sender is a seed/bootstrap node
port = getattr(msg, 'port', None)
if (self.address, port) in self.p2p_service.seeds:
compare_version(getattr(msg, 'client_ver', None))

if not self.conn.opened:
return

Expand Down
Loading