Skip to content

Commit

Permalink
Release v1.79.14
Browse files Browse the repository at this point in the history
  • Loading branch information
shanevc committed Jan 12, 2021
1 parent 89925be commit 093e7f9
Show file tree
Hide file tree
Showing 35 changed files with 798 additions and 226 deletions.
7 changes: 7 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ ENV PATH="/opt/venv/bin:$PATH"
COPY bxgateway/requirements.txt ./bxgateway_requirements.txt
COPY bxcommon/requirements.txt ./bxcommon_requirements.txt

# most recent version of pip doesn't seem to detect manylinux wheel correctly
# orjson cannot be installed normally due to alpine linux using musl-dev
RUN echo 'manylinux2014_compatible = True' > /usr/local/lib/python3.8/_manylinux.py
RUN pip install -U pip==20.2.2
RUN pip install orjson==3.4.6

RUN pip install -U pip wheel \
&& pip install -r ./bxgateway_requirements.txt \
-r ./bxcommon_requirements.txt
Expand All @@ -37,6 +43,7 @@ RUN apk update \
bash \
gcc \
openssl-dev \
gcompat \
&& pip install --upgrade pip

COPY --from=builder /opt/venv /opt/venv
Expand Down
2 changes: 1 addition & 1 deletion src/bxgateway/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"source_version": "v1.78.52.0"}
{"source_version": "v1.79.14.0"}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def process_msg_block(self, msg: AbstractBlockMessage, block_number: Optional[in
)
gateway_bdn_performance_stats_service.log_block_message_from_blockchain_node(self.connection.endpoint, True)
if block_hash in self.node.blocks_seen.contents:
self.node.on_block_seen_by_blockchain_node(block_hash, self.connection, block_number=block_number)
self.node.on_block_seen_by_blockchain_node(block_hash, self.connection, msg, block_number=block_number)
block_stats.add_block_event_by_block_hash(
block_hash,
BlockStatEventType.BLOCK_RECEIVED_FROM_BLOCKCHAIN_NODE_IGNORE_SEEN,
Expand All @@ -241,6 +241,7 @@ def process_msg_block(self, msg: AbstractBlockMessage, block_number: Optional[in
"Discarding duplicate block {} from local blockchain node.",
block_hash
)
self.node.log_blocks_network_content(self.node.network_num, msg)
return

if not self.is_valid_block_timestamp(msg):
Expand Down
5 changes: 5 additions & 0 deletions src/bxgateway/connections/abstract_gateway_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class AbstractGatewayNode(AbstractNode, metaclass=ABCMeta):
remote_blockchain_ip: Optional[str] = None
remote_blockchain_port: Optional[int] = None
remote_node_conn: Optional[AbstractGatewayBlockchainConnection] = None
remote_blockchain_protocol_version: Optional[int] = None
remote_blockchain_connection_established: bool = False
transaction_streamer_peer: Optional[OutboundPeerModel] = None

_blockchain_liveliness_alarm: Optional[AlarmId] = None
Expand Down Expand Up @@ -1171,6 +1173,9 @@ def init_memory_stats_logging(self):
)
)

def get_ws_server_status(self) -> bool:
return self._ws_server.status()

def _set_transaction_streamer_peer(self) -> None:
if self.transaction_streamer_peer is not None or self.NODE_TYPE is not NodeType.EXTERNAL_GATEWAY:
return
Expand Down
19 changes: 12 additions & 7 deletions src/bxgateway/connections/abstract_relay_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from bxcommon.messages.validation.message_size_validation_settings import MessageSizeValidationSettings
from bxcommon.models.entity_type_model import EntityType
from bxcommon.models.notification_code import NotificationCode
from bxcommon.models.transaction_flag import TransactionFlag
from bxcommon.network.abstract_socket_connection_protocol import AbstractSocketConnectionProtocol
from bxcommon.services import sdn_http_service
from bxcommon.utils import convert, performance_utils
Expand Down Expand Up @@ -180,8 +181,9 @@ def msg_tx(self, msg):
)

if processing_result.assigned_short_id:
was_missing = self.node.block_recovery_service.check_missing_sid(short_id,
RecoveredTxsSource.TXS_RECEIVED_FROM_BDN)
was_missing = self.node.block_recovery_service.check_missing_sid(
short_id, RecoveredTxsSource.TXS_RECEIVED_FROM_BDN
)
attempt_recovery |= was_missing
tx_stats.add_tx_by_hash_event(
tx_hash,
Expand All @@ -200,11 +202,12 @@ def msg_tx(self, msg):
gateway_bdn_performance_stats_service.log_tx_from_bdn(
not self.node.is_gas_price_above_min_network_fee(tx_contents)
)
attempt_recovery |= self.node.block_recovery_service.check_missing_tx_hash(tx_hash,
RecoveredTxsSource.TXS_RECEIVED_FROM_BDN)
attempt_recovery |= self.node.block_recovery_service.check_missing_tx_hash(
tx_hash, RecoveredTxsSource.TXS_RECEIVED_FROM_BDN
)

self.publish_new_transaction(
tx_hash, tx_contents
tx_hash, tx_contents, TransactionFlag.LOCAL_REGION in msg.transaction_flag()
)

if self.node.has_active_blockchain_peer():
Expand Down Expand Up @@ -405,10 +408,12 @@ def msg_refresh_blockchain_network(self, _msg: RefreshBlockchainNetworkMessage)
done_callback=self._process_blockchain_network_from_sdn
)

def publish_new_transaction(self, tx_hash: Sha256Hash, tx_contents: memoryview) -> None:
def publish_new_transaction(
self, tx_hash: Sha256Hash, tx_contents: memoryview, local_region: bool
) -> None:
self.node.feed_manager.publish_to_feed(
FeedKey(NewTransactionFeed.NAME),
RawTransactionFeedEntry(tx_hash, tx_contents)
RawTransactionFeedEntry(tx_hash, tx_contents, local_region=local_region)
)

def on_connection_established(self):
Expand Down
73 changes: 54 additions & 19 deletions src/bxgateway/connections/eth/eth_base_connection_protocol.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import time
from abc import ABCMeta
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast, Optional
from typing import TYPE_CHECKING, cast, Optional, Union

from bxcommon.connections.connection_type import ConnectionType
from bxcommon.models.blockchain_peer_info import BlockchainPeerInfo
from bxcommon.utils import convert
from bxgateway import gateway_constants
from bxcommon.utils.blockchain_utils.eth import eth_common_constants
Expand All @@ -12,10 +14,10 @@
from bxgateway.messages.eth.protocol.eth_protocol_message_factory import EthProtocolMessageFactory
from bxgateway.messages.eth.protocol.eth_protocol_message_type import EthProtocolMessageType
from bxgateway.messages.eth.protocol.hello_eth_protocol_message import HelloEthProtocolMessage
from bxgateway.messages.eth.protocol.ping_eth_protocol_message import PingEthProtocolMessage
from bxgateway.messages.eth.protocol.pong_eth_protocol_message import PongEthProtocolMessage
from bxgateway.messages.eth.protocol.raw_eth_protocol_message import RawEthProtocolMessage
from bxgateway.messages.eth.protocol.status_eth_protocol_message import StatusEthProtocolMessage
from bxgateway.messages.eth.protocol.status_eth_protocol_message_v63 import StatusEthProtocolMessageV63
from bxgateway.utils.eth import frame_utils
from bxgateway.utils.eth.rlpx_cipher import RLPxCipher
from bxgateway.utils.stats.eth.eth_gateway_stats_service import eth_gateway_stats_service
Expand Down Expand Up @@ -132,14 +134,20 @@ def msg_hello(self, msg: HelloEthProtocolMessage):
client_version_string, version)
self.connection_status.hello_message_received = True

def msg_status(self, msg: StatusEthProtocolMessage):
def msg_status(self, msg: Union[StatusEthProtocolMessage, StatusEthProtocolMessageV63]):
self.connection.log_trace("Status message received.")
self.connection_status.status_message_received = True

for peer in self.node.blockchain_peers:
if self.node.is_blockchain_peer(self.connection.peer_ip, self.connection.peer_port):
peer.connection_established = True

chain_difficulty_from_status_msg = msg.get_chain_difficulty()
chain_difficulty = int(self.node.opts.chain_difficulty, 16)
fork_id = msg.get_fork_id()
if isinstance(chain_difficulty_from_status_msg, int) and chain_difficulty_from_status_msg > chain_difficulty:
chain_difficulty = chain_difficulty_from_status_msg
self._enqueue_status_message(chain_difficulty)
self._enqueue_status_message(chain_difficulty, fork_id)

def msg_disconnect(self, msg):
self.connection_status.disconnect_message_received = True
Expand Down Expand Up @@ -186,29 +194,56 @@ def _enqueue_auth_ack_message(self):
self.connection.enqueue_msg_bytes(auth_ack_msg_bytes)
self.connection_status.auth_ack_message_sent = True

def _get_eth_protocol_version(self) -> int:
for peer in self.node.blockchain_peers:
if self.node.is_blockchain_peer(self.connection.peer_ip, self.connection.peer_port):
return peer.blockchain_protocol_version
return eth_common_constants.ETH_PROTOCOL_VERSION

def _enqueue_hello_message(self):
public_key = self.node.get_public_key()

hello_msg = HelloEthProtocolMessage(None,
eth_common_constants.P2P_PROTOCOL_VERSION,
f"{gateway_constants.GATEWAY_PEER_NAME} {self.node.opts.source_version}",
eth_common_constants.CAPABILITIES,
self.connection.external_port,
public_key)
eth_protocol_version = eth_common_constants.ETH_PROTOCOL_VERSION
if self.connection.CONNECTION_TYPE == ConnectionType.BLOCKCHAIN_NODE:
eth_protocol_version = self._get_eth_protocol_version()
elif self.connection.CONNECTION_TYPE == ConnectionType.REMOTE_BLOCKCHAIN_NODE:
eth_protocol_version = self.node.remote_blockchain_protocol_version

hello_msg = HelloEthProtocolMessage(
None,
eth_common_constants.P2P_PROTOCOL_VERSION,
f"{gateway_constants.GATEWAY_PEER_NAME} {self.node.opts.source_version}".encode("utf-8"),
((b"eth", eth_protocol_version),),
self.connection.external_port,
public_key
)
self.connection.enqueue_msg(hello_msg)
self.connection_status.hello_message_sent = True

def _enqueue_status_message(self, chain_difficulty: int):
def _enqueue_status_message(self, chain_difficulty: int, fork_id):
network_id = self.node.opts.network_id
chain_head_hash = convert.hex_to_bytes(self.node.opts.genesis_hash)
genesis_hash = convert.hex_to_bytes(self.node.opts.genesis_hash)

status_msg = StatusEthProtocolMessage(None,
eth_common_constants.ETH_PROTOCOL_VERSION,
network_id,
chain_difficulty,
chain_head_hash,
genesis_hash)
eth_protocol_version = self._get_eth_protocol_version()

if eth_common_constants.ETH_PROTOCOL_VERSION == gateway_constants.ETH_PROTOCOL_VERSION_63:
status_msg = StatusEthProtocolMessageV63(
None,
eth_protocol_version,
network_id,
chain_difficulty,
chain_head_hash,
genesis_hash,
)
else:
status_msg = StatusEthProtocolMessage(
None,
eth_protocol_version,
network_id,
chain_difficulty,
chain_head_hash,
genesis_hash,
fork_id
)

self.connection.enqueue_msg(status_msg)
self.connection_status.status_message_sent = True
Expand Down
84 changes: 83 additions & 1 deletion src/bxgateway/connections/eth/eth_gateway_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bxcommon.messages.abstract_block_message import AbstractBlockMessage
from bxcommon.messages.abstract_message import AbstractMessage
from bxcommon.messages.eth.serializers.transaction import Transaction
from bxcommon.models.blockchain_peer_info import BlockchainPeerInfo
from bxcommon.network.abstract_socket_connection_protocol import AbstractSocketConnectionProtocol
from bxcommon.network.ip_endpoint import IpEndpoint
from bxcommon.network.peer_info import ConnectionPeerInfo
Expand All @@ -28,6 +29,7 @@
from bxgateway.connections.abstract_gateway_blockchain_connection import AbstractGatewayBlockchainConnection
from bxgateway.connections.abstract_gateway_node import AbstractGatewayNode
from bxgateway.connections.abstract_relay_connection import AbstractRelayConnection
from bxgateway.connections.eth.eth_base_connection_protocol import EthConnectionProtocolStatus
from bxgateway.connections.eth.eth_node_connection import EthNodeConnection
from bxgateway.connections.eth.eth_node_discovery_connection import EthNodeDiscoveryConnection
from bxgateway.connections.eth.eth_relay_connection import EthRelayConnection
Expand All @@ -38,6 +40,7 @@
from bxgateway.feed.eth.eth_on_block_feed import EthOnBlockFeed, EventNotification
from bxcommon.feed.eth.eth_pending_transaction_feed import EthPendingTransactionFeed
from bxgateway.feed.eth.eth_raw_block import EthRawBlock
from bxgateway.feed.eth.eth_transaction_receipts_feed import EthTransactionReceiptsFeed
from bxgateway.messages.eth import eth_message_converter_factory as converter_factory
from bxgateway.messages.eth.internal_eth_block_info import InternalEthBlockInfo
from bxgateway.messages.eth.new_block_parts import NewBlockParts
Expand Down Expand Up @@ -87,6 +90,9 @@ def __init__(self, opts, node_ssl_service: NodeSSLService) -> None:
super(EthGatewayNode, self).__init__(opts, node_ssl_service,
eth_common_constants.TRACKED_BLOCK_CLEANUP_INTERVAL_S)

self.remote_blockchain_protocol_version = eth_common_constants.ETH_PROTOCOL_VERSION
self.remote_blockchain_connection_established = False

self._node_public_key = None
self._remote_public_key = None

Expand Down Expand Up @@ -324,12 +330,77 @@ def log_received_remote_blocks(self, blocks_count: int) -> None:
else:
logger.warning(log_messages.UNEXPECTED_BLOCKS)

def _should_decrease_version_number(
self, connection_state: ConnectionState, connection_status: EthConnectionProtocolStatus
) -> bool:
return (
ConnectionState.ESTABLISHED not in connection_state
and connection_status.auth_message_sent
and not connection_status.auth_message_received
and not connection_status.auth_ack_message_sent
and connection_status.auth_ack_message_received
and connection_status.hello_message_sent
and connection_status.hello_message_received
and not connection_status.status_message_sent
and not connection_status.status_message_received
and connection_status.disconnect_message_received
and connection_status.disconnect_reason == 3
)

def _get_new_protocol_version(self, connection, peer_version) -> int:
for index, version in enumerate(eth_common_constants.SUPPORTED_ETH_PROTOCOL_VERSION):
if (
version == peer_version
and index < len(eth_common_constants.SUPPORTED_ETH_PROTOCOL_VERSION) - 1
):
new_version = eth_common_constants.SUPPORTED_ETH_PROTOCOL_VERSION[index + 1]
connection.log_debug(
"Failed to connect with version {}, try to reconnect with version {}",
version, new_version
)
return new_version

return peer_version

def _should_log_closed_connection(self, connection: AbstractConnection) -> bool:
if isinstance(connection, EthNodeConnection):
connection_status = connection.connection_protocol.connection_status

if (
self._should_decrease_version_number(connection.state, connection_status)
and connection.CONNECTION_TYPE == ConnectionType.BLOCKCHAIN_NODE
):
for peer in self.blockchain_peers:
if (
peer == BlockchainPeerInfo(connection.peer_ip, connection.peer_port)
and not peer.connection_established
):
peer.blockchain_protocol_version = self._get_new_protocol_version(
connection, peer.blockchain_protocol_version
)
return False

elif isinstance(connection, EthRemoteConnection):
connection_status = connection.connection_protocol.connection_status

if (
self._should_decrease_version_number(connection.state, connection_status)
and connection.CONNECTION_TYPE == ConnectionType.REMOTE_BLOCKCHAIN_NODE
and not self.remote_blockchain_connection_established
):
self.remote_blockchain_protocol_version = \
self._get_new_protocol_version(
connection, self.remote_blockchain_protocol_version
)
return False

return True

def log_closed_connection(self, connection: AbstractConnection) -> None:
if isinstance(connection, EthNodeConnection):
# pyre-fixme[22]: The cast is redundant.
eth_node_connection = cast(EthNodeConnection, connection)
connection_status = connection.connection_protocol.connection_status

if ConnectionState.INITIALIZED not in eth_node_connection.state:
logger.info("Failed to connect to Ethereum node. Verify that provided ip address ({}) and port ({}) "
"are correct. Verify that firewall port is open. Connection details: {}.",
Expand All @@ -354,6 +425,7 @@ def log_closed_connection(self, connection: AbstractConnection) -> None:
eth_node_connection)
else:
super(EthGatewayNode, self).log_closed_connection(connection)

elif isinstance(connection, GatewayConnection):
if ConnectionState.ESTABLISHED not in connection.state:
logger.debug("Failed to connect to: {}.", connection)
Expand All @@ -376,6 +448,7 @@ def init_live_feeds(self) -> None:
self.feed_manager.register_feed(EthPendingTransactionFeed(self.alarm_queue))
self.feed_manager.register_feed(EthOnBlockFeed(self))
self.feed_manager.register_feed(EthNewBlockFeed(self))
self.feed_manager.register_feed(EthTransactionReceiptsFeed(self))

def on_new_subscriber_request(self) -> None:
if self.opts.eth_ws_uri and not self.eth_ws_proxy_publisher.running:
Expand Down Expand Up @@ -461,6 +534,7 @@ def publish_block(
self._get_block_message_lazy(block_message, block_hash)
)
self._publish_block_to_on_block_feed(raw_block)
self._publish_block_to_transaction_receipts_feed(raw_block)
if block_message is not None:
self._publish_block_to_new_block_feed(raw_block)

Expand Down Expand Up @@ -533,6 +607,14 @@ def _publish_block_to_on_block_feed(
FeedKey(EthOnBlockFeed.NAME), EventNotification(raw_block.block_number)
)

def _publish_block_to_transaction_receipts_feed(
self,
raw_block: EthRawBlock
):
self.feed_manager.publish_to_feed(
FeedKey(EthTransactionReceiptsFeed.NAME), raw_block
)

def is_gas_price_above_min_network_fee(self, transaction_contents: Union[memoryview, bytearray]) -> bool:
gas_price = eth_common_utils.raw_tx_gas_price(memoryview(transaction_contents), 0)
if gas_price >= self.get_blockchain_network().min_tx_network_fee:
Expand Down
Loading

0 comments on commit 093e7f9

Please sign in to comment.