Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync-v2): Add both BlockchainStreamingClient and TransactionStreamingClient to manage streamings from the client side #848

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 116 additions & 225 deletions hathor/p2p/sync_v2/agent.py

Large diffs are not rendered by default.

141 changes: 141 additions & 0 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Optional

from structlog import get_logger
from twisted.internet.defer import Deferred

from hathor.p2p.sync_v2.exception import (
BlockNotConnectedToPreviousBlock,
InvalidVertexError,
StreamingError,
TooManyRepeatedVerticesError,
TooManyVerticesReceivedError,
)
from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.transaction import Block
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo

logger = get_logger()


class BlockchainStreamingClient:
def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None:
self.sync_agent = sync_agent
self.protocol = self.sync_agent.protocol
self.tx_storage = self.sync_agent.tx_storage
self.manager = self.sync_agent.manager

self.log = logger.new(peer=self.protocol.get_short_peer_id())

self.start_block = start_block
self.end_block = end_block

# When syncing blocks we start streaming with all peers
# so the moment I get some repeated blocks, I stop the download
# because it's probably a streaming that I've already received
self.max_repeated_blocks = 10

self._deferred: Deferred[StreamEnd] = Deferred()

self._blk_received: int = 0
self._blk_repeated: int = 0

self._blk_max_quantity = self.end_block.height - self.start_block.height + 1
self._reverse: bool = False
if self._blk_max_quantity < 0:
self._blk_max_quantity = -self._blk_max_quantity
self._reverse = True

self._last_received_block: Optional[Block] = None

self._partial_blocks: list[Block] = []

def wait(self) -> Deferred[StreamEnd]:
"""Return the deferred."""
return self._deferred
jansegre marked this conversation as resolved.
Show resolved Hide resolved

def fails(self, reason: 'StreamingError') -> None:
"""Fail the execution by resolving the deferred with an error."""
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_blocks(self, blk: Block) -> None:
"""This method is called by the sync agent when a BLOCKS message is received."""
if self._deferred.called:
return

self._blk_received += 1
if self._blk_received > self._blk_max_quantity:
self.log.warn('too many blocks received',
blk_received=self._blk_received,
blk_max_quantity=self._blk_max_quantity)
self.fails(TooManyVerticesReceivedError())
return

assert blk.hash is not None
is_duplicated = False
if self.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
if self._blk_repeated > self.max_repeated_blocks:
self.log.debug('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
jansegre marked this conversation as resolved.
Show resolved Hide resolved

# basic linearity validation, crucial for correctly predicting the next block's height
if self._reverse:
if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash():
self.fails(BlockNotConnectedToPreviousBlock())
return
else:
if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash:
self.fails(BlockNotConnectedToPreviousBlock())
return

try:
# this methods takes care of checking if the block already exists,
# it will take care of doing at least a basic validation
if is_duplicated:
self.log.debug('block early terminate?', blk_id=blk.hash.hex())
else:
self.log.debug('block received', blk_id=blk.hash.hex())
self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True)
except HathorError:
self.fails(InvalidVertexError())
return
else:
self._last_received_block = blk
self._blk_repeated = 0
# XXX: debugging log, maybe add timing info
if self._blk_received % 500 == 0:
self.log.debug('block streaming in progress', blocks_received=self._blk_received)

if not blk.can_validate_full():
self._partial_blocks.append(blk)

def handle_blocks_end(self, response_code: StreamEnd) -> None:
"""This method is called by the sync agent when a BLOCKS-END message is received."""
if self._deferred.called:
return
self._deferred.callback(response_code)
37 changes: 37 additions & 0 deletions hathor/p2p/sync_v2/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class StreamingError(Exception):
"""Base error for sync-v2 streaming."""
pass


class TooManyVerticesReceivedError(StreamingError):
"""Raised when the other peer sent too many vertices."""
pass


class TooManyRepeatedVerticesError(StreamingError):
"""Raised when the other peer sent too many repeated vertices."""
pass


class BlockNotConnectedToPreviousBlock(StreamingError):
"""Raised when the received block is not connected to the previous one."""
pass


class InvalidVertexError(StreamingError):
"""Raised when the received vertex fails validation."""
pass
73 changes: 73 additions & 0 deletions hathor/p2p/sync_v2/payloads.py
glevco marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pydantic import validator

from hathor.types import VertexId
from hathor.utils.pydantic import BaseModel


class PayloadBaseModel(BaseModel):

@classmethod
def convert_hex_to_bytes(cls, value: str | VertexId) -> VertexId:
"""Convert a string in hex format to bytes. If bytes are given, it does nothing."""
if isinstance(value, str):
return bytes.fromhex(value)
elif isinstance(value, VertexId):
return value
jansegre marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError('invalid type')

class Config:
json_encoders = {
VertexId: lambda x: x.hex()
}


class GetNextBlocksPayload(PayloadBaseModel):
"""GET-NEXT-BLOCKS message is used to request a stream of blocks in the best blockchain."""

start_hash: VertexId
end_hash: VertexId
quantity: int

@validator('start_hash', 'end_hash', pre=True)
def validate_bytes_fields(cls, value: str | bytes) -> VertexId:
return cls.convert_hex_to_bytes(value)


class BestBlockPayload(PayloadBaseModel):
"""BEST-BLOCK message is used to send information about the current best block."""

block: VertexId
height: int

@validator('block', pre=True)
def validate_bytes_fields(cls, value: str | VertexId) -> VertexId:
return cls.convert_hex_to_bytes(value)


class GetTransactionsBFSPayload(PayloadBaseModel):
"""GET-TRANSACTIONS-BFS message is used to request a stream of transactions confirmed by blocks."""
start_from: list[VertexId]
first_block_hash: VertexId
last_block_hash: VertexId

@validator('first_block_hash', 'last_block_hash', pre=True)
def validate_bytes_fields(cls, value: str | VertexId) -> VertexId:
return cls.convert_hex_to_bytes(value)

@validator('start_from', pre=True, each_item=True)
def validate_start_from(cls, value: str | VertexId) -> VertexId:
return cls.convert_hex_to_bytes(value)
16 changes: 11 additions & 5 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __str__(self):


@implementer(IPushProducer)
class _StreamingBase:
class _StreamingServerBase:
def __init__(self, node_sync: 'NodeBlockSync', *, limit: int = DEFAULT_STREAMING_LIMIT):
self.node_sync = node_sync
self.protocol: 'HathorProtocol' = node_sync.protocol
Expand Down Expand Up @@ -123,7 +123,7 @@ def stopProducing(self) -> None:
self.pauseProducing()


class BlockchainStreaming(_StreamingBase):
class BlockchainStreamingServer(_StreamingServerBase):
def __init__(self, node_sync: 'NodeBlockSync', start_block: Block, end_hash: bytes,
*, limit: int = DEFAULT_STREAMING_LIMIT, reverse: bool = False):
super().__init__(node_sync, limit=limit)
Expand Down Expand Up @@ -186,19 +186,25 @@ def send_next(self) -> None:
self.schedule_if_needed()


class TransactionsStreaming(_StreamingBase):
class TransactionsStreamingServer(_StreamingServerBase):
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
"""

def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction], last_block_hash: bytes,
*, limit: int = DEFAULT_STREAMING_LIMIT):
def __init__(self,
node_sync: 'NodeBlockSync',
start_from: list[BaseTransaction],
first_block_hash: bytes,
last_block_hash: bytes,
*,
limit: int = DEFAULT_STREAMING_LIMIT) -> None:
# XXX: is limit needed for tx streaming? Or let's always send all txs for
# a block? Very unlikely we'll reach this limit
super().__init__(node_sync, limit=limit)

assert len(start_from) > 0
assert start_from[0].storage is not None
self.storage = start_from[0].storage
self.first_block_hash = first_block_hash
self.last_block_hash = last_block_hash
self.last_block_height = 0

Expand Down
Loading
Loading