feat(sync-v2): Add both BlockchainStreamingClient and TransactionStreamingClient to manage streamings from the client side
msbrogli committed Nov 3, 2023
1 parent 4b04c9c commit 6d9b085
Showing 7 changed files with 462 additions and 213 deletions.
303 changes: 93 additions & 210 deletions hathor/p2p/sync_v2/agent.py

Large diffs are not rendered by default.

140 changes: 140 additions & 0 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
@@ -0,0 +1,140 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Optional

from structlog import get_logger
from twisted.internet.defer import Deferred

from hathor.p2p.sync_v2.exception import (
BlockNotConnectedToPreviousBlock,
InvalidVertexError,
StreamingError,
TooManyRepeatedVerticesError,
TooManyVerticesReceivedError,
)
from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.transaction import Block
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo

logger = get_logger()


class BlockchainStreamingClient:
def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None:
self.sync_agent = sync_agent
self.protocol = self.sync_agent.protocol
self.tx_storage = self.sync_agent.tx_storage
self.manager = self.sync_agent.manager

self.log = logger.new(peer=self.protocol.get_short_peer_id())

self.start_block = start_block
self.end_block = end_block

# When syncing blocks we start streaming from all peers at once, so as soon as
# we receive a few repeated blocks we stop this download, because it is probably
# a stream we have already received from another peer.
self.max_repeated_blocks = 10

self._deferred: Deferred[StreamEnd] = Deferred()

self._blk_received: int = 0
self._blk_repeated: int = 0

self._blk_max_quantity = self.end_block.height - self.start_block.height + 1
self._reverse: bool = False
if self._blk_max_quantity < 0:
self._blk_max_quantity = -self._blk_max_quantity
self._reverse = True

self._last_received_block: Optional[Block] = None

self._partial_blocks: list[Block] = []

def run(self) -> Deferred[StreamEnd]:
return self._deferred

def fails(self, reason: 'StreamingError') -> None:
self.sync_agent.send_stop_block_streaming()
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
""" Return true if the vertex exists no matter its validation state.
"""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_blocks(self, blk: Block) -> None:
if self._deferred.called:
return

self._blk_received += 1
if self._blk_received > self._blk_max_quantity:
self.log.warn('too many blocks received',
blk_received=self._blk_received,
blk_max_quantity=self._blk_max_quantity)
self.fails(TooManyVerticesReceivedError())
return

assert blk.hash is not None
is_duplicated = False
if self.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
if self._blk_repeated > self.max_repeated_blocks:
self.log.debug('too many repeated blocks received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())

# basic linearity validation, crucial for correctly predicting the next block's height
if self._reverse:
if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash():
self.fails(BlockNotConnectedToPreviousBlock())
return
else:
if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash:
self.fails(BlockNotConnectedToPreviousBlock())
return

try:
# This method checks whether the block already exists and performs at least
# a basic validation.
if is_duplicated:
self.log.debug('block early terminate?', blk_id=blk.hash.hex())
else:
self.log.debug('block received', blk_id=blk.hash.hex())
self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True)
except HathorError:
self.fails(InvalidVertexError())
return
else:
self._last_received_block = blk
self._blk_repeated = 0
# XXX: debugging log, maybe add timing info
if self._blk_received % 500 == 0:
self.log.debug('block streaming in progress', blocks_received=self._blk_received)

meta = blk.get_metadata()
if not meta.validation.is_fully_connected():
self._partial_blocks.append(blk)

def handle_blocks_end(self, response_code: StreamEnd) -> None:
if self._deferred.called:
return
self._deferred.callback(response_code)
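
A minimal agent-side usage sketch of the new client (illustrative only; sync_agent, start_block, end_block, and the callback names below are hypothetical and not part of this commit):

from twisted.python.failure import Failure

from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient
from hathor.p2p.sync_v2.streamers import StreamEnd


def on_blocks_done(response_code: StreamEnd) -> None:
    # Fired by handle_blocks_end() when the peer closes the stream.
    pass


def on_blocks_failed(failure: Failure) -> None:
    # Fired by fails(); failure.value is one of the StreamingError subclasses.
    pass


def start_block_streaming(sync_agent, start_block, end_block) -> BlockchainStreamingClient:
    client = BlockchainStreamingClient(sync_agent, start_block, end_block)
    d = client.run()
    d.addCallback(on_blocks_done)
    d.addErrback(on_blocks_failed)
    # While the stream is open, the agent feeds every received block into
    # client.handle_blocks(blk) and the end-of-stream message into
    # client.handle_blocks_end(response_code).
    return client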
32 changes: 32 additions & 0 deletions hathor/p2p/sync_v2/exception.py
@@ -0,0 +1,32 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class StreamingError(Exception):
pass


class TooManyVerticesReceivedError(StreamingError):
pass


class TooManyRepeatedVerticesError(StreamingError):
pass


class BlockNotConnectedToPreviousBlock(StreamingError):
pass


class InvalidVertexError(StreamingError):
pass
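
These exceptions reach the agent through the client's Deferred (fails() calls errback), so calling code can branch on the failure type with Twisted's standard Failure API. A hypothetical errback, not part of this commit:

from twisted.python.failure import Failure

from hathor.p2p.sync_v2.exception import (
    BlockNotConnectedToPreviousBlock,
    InvalidVertexError,
    TooManyRepeatedVerticesError,
)


def on_streaming_error(failure: Failure) -> None:
    if failure.check(TooManyRepeatedVerticesError):
        # Another peer has probably streamed these vertices already; stop quietly.
        return
    if failure.check(BlockNotConnectedToPreviousBlock, InvalidVertexError):
        # The peer sent inconsistent or invalid data; the agent may choose to disconnect.
        return
    failure.raiseException()  # unexpected error: re-raise for the caller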
66 changes: 66 additions & 0 deletions hathor/p2p/sync_v2/payloads.py
@@ -0,0 +1,66 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pydantic import validator

from hathor.utils.pydantic import BaseModel


class PayloadBaseModel(BaseModel):

@classmethod
def convert_hex_to_bytes(cls, value: str | bytes) -> bytes:
if isinstance(value, str):
return bytes.fromhex(value)
elif isinstance(value, bytes):
return value
raise ValueError('invalid type')

class Config:
json_encoders = {
bytes: lambda x: x.hex()
}


class GetNextBlocksPayload(PayloadBaseModel):
start_hash: bytes
end_hash: bytes
quantity: int

@validator('start_hash', 'end_hash', pre=True)
def validate_bytes_fields(cls, value: str | bytes) -> bytes:
return cls.convert_hex_to_bytes(value)


class GetBestBlockPayload(PayloadBaseModel):
block: bytes
height: int

@validator('block', pre=True)
def validate_bytes_fields(cls, value: str | bytes) -> bytes:
return cls.convert_hex_to_bytes(value)


class GetTransactionsBFSPayload(PayloadBaseModel):
start_from: list[bytes]
first_block_hash: bytes
last_block_hash: bytes

@validator('first_block_hash', 'last_block_hash', pre=True)
def validate_bytes_fields(cls, value: str | bytes) -> bytes:
return cls.convert_hex_to_bytes(value)

@validator('start_from', pre=True)
def validate_start_from(cls, values: list[str | bytes]) -> list[bytes]:
return [cls.convert_hex_to_bytes(x) for x in values]
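
Assuming hathor.utils.pydantic.BaseModel behaves like a standard pydantic model, the validators accept either hex strings or raw bytes on input, while json_encoders serializes bytes back to hex on output. A small illustrative round trip (the hash values are placeholders):

from hathor.p2p.sync_v2.payloads import GetTransactionsBFSPayload

payload = GetTransactionsBFSPayload(
    start_from=['00' * 32],         # hex string, converted to bytes by the validator
    first_block_hash=b'\x00' * 32,  # raw bytes pass through unchanged
    last_block_hash='11' * 32,
)
assert isinstance(payload.start_from[0], bytes)
assert isinstance(payload.last_block_hash, bytes)
print(payload.json())  # bytes fields are re-encoded as hex strings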
10 changes: 8 additions & 2 deletions hathor/p2p/sync_v2/streamers.py
@@ -190,15 +190,21 @@ class TransactionsStreaming(_StreamingBase):
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
"""

def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction], last_block_hash: bytes,
*, limit: int = DEFAULT_STREAMING_LIMIT):
def __init__(self,
node_sync: 'NodeBlockSync',
start_from: list[BaseTransaction],
first_block_hash: bytes,
last_block_hash: bytes,
*,
limit: int = DEFAULT_STREAMING_LIMIT) -> None:
# XXX: is limit needed for tx streaming? Or let's always send all txs for
# a block? Very unlikely we'll reach this limit
super().__init__(node_sync, limit=limit)

assert len(start_from) > 0
assert start_from[0].storage is not None
self.storage = start_from[0].storage
self.first_block_hash = first_block_hash
self.last_block_hash = last_block_hash
self.last_block_height = 0

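With this change the server-side streamer receives both ends of the block range whose transactions it will stream. A hypothetical construction, where node_sync, confirmed_txs, and the two blocks are placeholders:

from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, TransactionsStreaming

streaming = TransactionsStreaming(
    node_sync,
    start_from=confirmed_txs,           # non-empty list of BaseTransaction
    first_block_hash=first_block.hash,  # new argument introduced by this commit
    last_block_hash=last_block.hash,
    limit=DEFAULT_STREAMING_LIMIT,
)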
118 changes: 118 additions & 0 deletions hathor/p2p/sync_v2/transaction_streaming_client.py
@@ -0,0 +1,118 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING

from structlog import get_logger
from twisted.internet.defer import Deferred

from hathor.p2p.sync_v2.exception import (
InvalidVertexError,
StreamingError,
TooManyRepeatedVerticesError,
TooManyVerticesReceivedError,
)
from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, StreamEnd
from hathor.transaction import BaseTransaction
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync

logger = get_logger()


class TransactionStreamingClient:
def __init__(self,
sync_agent: 'NodeBlockSync',
start_from: list[bytes],
start_block: bytes,
end_block: bytes) -> None:
self.sync_agent = sync_agent
self.protocol = self.sync_agent.protocol
self.tx_storage = self.sync_agent.tx_storage
self.manager = self.sync_agent.manager

self.log = logger.new(peer=self.protocol.get_short_peer_id())

self.start_from = start_from
self.start_block = start_block
self.end_block = end_block

# Let's keep it at "infinity" until a known issue is fixed.
self.max_repeated_transactions = 1_000_000

self._deferred: Deferred[StreamEnd] = Deferred()

self._tx_received: int = 0
self._tx_repeated: int = 0

self._tx_max_quantity = DEFAULT_STREAMING_LIMIT

def run(self) -> Deferred[StreamEnd]:
return self._deferred

def fails(self, reason: 'StreamingError') -> None:
self.sync_agent.send_stop_block_streaming()
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
""" Return true if the vertex exists no matter its validation state.
"""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_transaction(self, tx: BaseTransaction) -> None:
if self._deferred.called:
return

self._tx_received += 1
if self._tx_received > self._tx_max_quantity:
self.log.warn('too many transactions received',
tx_received=self._tx_received,
tx_max_quantity=self._tx_max_quantity)
self.fails(TooManyVerticesReceivedError())
return

assert tx.hash is not None
is_duplicated = False
if self.partial_vertex_exists(tx.hash):
# We reached a transaction we already have. Skip it.
self._tx_repeated += 1
is_duplicated = True
if self._tx_repeated > self.max_repeated_transactions:
self.log.debug('too many repeated transactions received', total_repeated=self._tx_repeated)
self.fails(TooManyRepeatedVerticesError())

try:
# This method checks whether the transaction already exists and performs at least
# a basic validation.
if is_duplicated:
self.log.debug('tx early terminate?', tx_id=tx.hash.hex())
else:
self.log.debug('tx received', tx_id=tx.hash.hex())
self.sync_agent.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True)
except HathorError:
self.fails(InvalidVertexError())
return
else:
# XXX: debugging log, maybe add timing info
if self._tx_received % 100 == 0:
self.log.debug('tx streaming in progress', txs_received=self._tx_received)

def handle_transactions_end(self, response_code: StreamEnd) -> None:
if self._deferred.called:
return
self._deferred.callback(response_code)
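
The transaction client mirrors the blockchain one: run() returns a Deferred, every streamed transaction is fed into handle_transaction(), and handle_transactions_end() resolves the Deferred with the StreamEnd code. A hypothetical agent-side sketch (sync_agent and the hash arguments are placeholders, not part of this commit):

from twisted.python.failure import Failure

from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient


def on_txs_done(response_code: StreamEnd) -> None:
    # Fired by handle_transactions_end() once the peer closes the stream.
    pass


def on_txs_failed(failure: Failure) -> None:
    # Fired by fails(); failure.value is one of the StreamingError subclasses.
    pass


client = TransactionStreamingClient(sync_agent, start_from, start_block, end_block)
d = client.run()
d.addCallback(on_txs_done)
d.addErrback(on_txs_failed)
# The agent then routes each received transaction into client.handle_transaction(tx)
# and the end-of-stream message into client.handle_transactions_end(response_code).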
6 changes: 5 additions & 1 deletion tests/p2p/test_protocol.py
@@ -425,7 +425,11 @@ def test_get_data(self):
self.assertAndStepConn(self.conn, b'^RELAY')
self.assertIsConnected()
missing_tx = '00000000228dfcd5dec1c9c6263f6430a5b4316bb9e3decb9441a6414bfd8697'
payload = {'until_first_block': missing_tx, 'start_from': [settings.GENESIS_BLOCK_HASH.hex()]}
payload = {
'first_block_hash': missing_tx,
'last_block_hash': missing_tx,
'start_from': [settings.GENESIS_BLOCK_HASH.hex()]
}
yield self._send_cmd(self.conn.proto1, 'GET-TRANSACTIONS-BFS', json_dumps(payload))
self._check_result_only_cmd(self.conn.peek_tr1_value(), b'NOT-FOUND')
self.conn.run_one_step()
