Skip to content

Commit

Permalink
feat(wallet): Add a vertex history streamer to the wallet websocket API
Browse files Browse the repository at this point in the history
  • Loading branch information
msbrogli committed Jul 16, 2024
1 parent 5a788d0 commit 82c9605
Show file tree
Hide file tree
Showing 4 changed files with 401 additions and 37 deletions.
3 changes: 2 additions & 1 deletion hathor/builder/resources_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ def create_resources(self) -> server.Site:

# Websocket resource
assert self.manager.tx_storage.indexes is not None
ws_factory = HathorAdminWebsocketFactory(metrics=self.manager.metrics,
ws_factory = HathorAdminWebsocketFactory(manager=self.manager,
metrics=self.manager.metrics,
address_index=self.manager.tx_storage.indexes.addresses)
ws_factory.start()
root.putChild(b'ws', WebSocketResource(ws_factory))
Expand Down
62 changes: 28 additions & 34 deletions hathor/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from collections import defaultdict, deque
from typing import Any, Optional, Union
from typing import Any, Optional

from autobahn.exception import Disconnected
from autobahn.twisted.websocket import WebSocketServerFactory
Expand All @@ -22,11 +22,12 @@

from hathor.conf import HathorSettings
from hathor.indexes import AddressIndex
from hathor.manager import HathorManager
from hathor.metrics import Metrics
from hathor.p2p.rate_limiter import RateLimiter
from hathor.pubsub import EventArguments, HathorEvents
from hathor.reactor import get_global_reactor
from hathor.util import json_dumpb, json_loadb, json_loads
from hathor.util import json_dumpb
from hathor.websocket.protocol import HathorAdminWebsocketProtocol

settings = HathorSettings()
Expand Down Expand Up @@ -85,11 +86,15 @@ class HathorAdminWebsocketFactory(WebSocketServerFactory):
def buildProtocol(self, addr):
return self.protocol(self)

def __init__(self, metrics: Optional[Metrics] = None, address_index: Optional[AddressIndex] = None):
def __init__(self,
manager: HathorManager,
metrics: Optional[Metrics] = None,
address_index: Optional[AddressIndex] = None):
"""
:param metrics: If not given, a new one is created.
:type metrics: :py:class:`hathor.metrics.Metrics`
"""
self.manager = manager
self.reactor = get_global_reactor()
# Opened websocket connections so I can broadcast messages later
# It contains only connections that have finished handshaking.
Expand Down Expand Up @@ -300,44 +305,33 @@ def process_deque(self, data_type):
data_type=data_type)
break

def handle_message(self, connection: HathorAdminWebsocketProtocol, data: Union[bytes, str]) -> None:
""" General message handler, detects type and deletages to specific handler."""
if isinstance(data, bytes):
message = json_loadb(data)
else:
message = json_loads(data)
# we only handle ping messages for now
if message['type'] == 'ping':
self._handle_ping(connection, message)
elif message['type'] == 'subscribe_address':
self._handle_subscribe_address(connection, message)
elif message['type'] == 'unsubscribe_address':
self._handle_unsubscribe_address(connection, message)

def _handle_ping(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for ping message, should respond with a simple {"type": "pong"}"""
payload = json_dumpb({'type': 'pong'})
connection.sendMessage(payload, False)

def _handle_subscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for subscription to an address, consideirs subscription limits."""
addr: str = message['address']
address: str = message['address']
success, errmsg = self.subscribe_address(connection, address)
response = {
'type': 'subscribe_address',
'address': address,
'success': success,
}
if not success:
response['message'] = errmsg
connection.sendMessage(json_dumpb(response), False)

def subscribe_address(self, connection: HathorAdminWebsocketProtocol, address: str) -> tuple[bool, str]:
"""Subscribe an address to send real time updates to a websocket connection."""
subs: set[str] = connection.subscribed_to
if self.max_subs_addrs_conn is not None and len(subs) >= self.max_subs_addrs_conn:
payload = json_dumpb({'message': 'Reached maximum number of subscribed '
f'addresses ({self.max_subs_addrs_conn}).',
'type': 'subscribe_address', 'success': False})
return False, f'Reached maximum number of subscribed addresses ({self.max_subs_addrs_conn}).'

elif self.max_subs_addrs_empty is not None and (
self.address_index and _count_empty(subs, self.address_index) >= self.max_subs_addrs_empty
):
payload = json_dumpb({'message': 'Reached maximum number of subscribed '
f'addresses without output ({self.max_subs_addrs_empty}).',
'type': 'subscribe_address', 'success': False})
else:
self.address_connections[addr].add(connection)
connection.subscribed_to.add(addr)
payload = json_dumpb({'type': 'subscribe_address', 'success': True})
connection.sendMessage(payload, False)
return False, f'Reached maximum number of subscribed empty addresses ({self.max_subs_addrs_empty}).'

self.address_connections[address].add(connection)
connection.subscribed_to.add(address)
return True, ''

def _handle_unsubscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for unsubscribing from an address, also removes address connection set if it ends up empty."""
Expand Down
119 changes: 117 additions & 2 deletions hathor/websocket/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Any, Union

from autobahn.twisted.websocket import WebSocketServerProtocol
from structlog import get_logger

from hathor.util import json_dumpb, json_loadb, json_loads
from hathor.websocket.streamer import GAPLimitSearch, HistoryStreamer, ManualAddressSequencer, XPubAddressSequencer

if TYPE_CHECKING:
from hathor.websocket.factory import HathorAdminWebsocketFactory

Expand All @@ -30,23 +33,135 @@ class HathorAdminWebsocketProtocol(WebSocketServerProtocol):
can send the data update to the clients
"""

MAX_GAP_LIMIT: int = 30

def __init__(self, factory: 'HathorAdminWebsocketFactory') -> None:
self.log = logger.new()
self.factory = factory
self.subscribed_to: set[str] = set()
self._history_streamer: HistoryStreamer | None = None
self._manual_address_iter: ManualAddressSequencer | None = None
super().__init__()

def onConnect(self, request):
"""Called by the websocket protocol when the connection is opened but it is still pending handshaking."""
self.log.info('connection opened, starting handshake...', request=request)

def onOpen(self) -> None:
"""Called by the websocket protocol when the connection is established."""
self.factory.on_client_open(self)
self.log.info('connection established')

def onClose(self, wasClean, code, reason):
"""Called by the websocket protocol when the connection is closed."""
self.factory.on_client_close(self)
self.log.info('connection closed', reason=reason)

def onMessage(self, payload: Union[bytes, str], isBinary: bool) -> None:
"""Called by the websocket protocol when a new message is received."""
self.log.debug('new message', payload=payload.hex() if isinstance(payload, bytes) else payload)
self.factory.handle_message(self, payload)
if isinstance(payload, bytes):
message = json_loadb(payload)
else:
message = json_loads(payload)

_type = message.get('type')

if _type == 'ping':
self._handle_ping(message)
elif _type == 'subscribe_address':
self.factory._handle_subscribe_address(self, message)
elif _type == 'unsubscribe_address':
self.factory._handle_unsubscribe_address(self, message)
elif _type == 'request:history:xpub':
self._open_history_xpub_streamer(message)
elif _type == 'request:history:manual':
self._open_history_manual_streamer(message)

def _handle_ping(self, message: dict[Any, Any]) -> None:
"""Handle ping message, should respond with a simple {"type": "pong"}"""
payload = json_dumpb({'type': 'pong'})
self.sendMessage(payload, False)

def _open_history_xpub_streamer(self, message: dict[Any, Any]) -> None:
"""Handle request to stream transactions using an xpub."""
stream_id = message['id']

if self._history_streamer is not None:
self.sendMessage(json_dumpb({
'type': 'stream:history:error',
'id': stream_id,
'errmsg': 'Streaming is already opened.'
}))
return

xpub = message['xpub']
gap_limit = message.get('gap-limit', 20)
first_index = message.get('first-index', 0)
if gap_limit > self.MAX_GAP_LIMIT:
self.sendMessage(json_dumpb({
'type': 'stream:history:error',
'id': stream_id,
'errmsg': f'GAP limit is too big. Maximum: {self.MAX_GAP_LIMIT}'
}))
return

address_iter = XPubAddressSequencer(xpub, first_index=first_index)
search = GAPLimitSearch(self.factory.manager, address_iter, gap_limit)
self._history_streamer = HistoryStreamer(protocol=self, stream_id=stream_id, search=search)
deferred = self._history_streamer.start()
deferred.addBoth(self._streamer_callback)

def _open_history_manual_streamer(self, message: dict[Any, Any]) -> None:
"""Handle request to stream transactions using a list of addresses."""
stream_id = message['id']
addresses = message.get('addresses', [])
first = message.get('first', False)
last = message.get('last', False)

if self._history_streamer is not None:
if first or self._history_streamer.stream_id != stream_id:
self.sendMessage(json_dumpb({
'type': 'stream:history:error',
'id': stream_id,
'errmsg': 'Streaming is already opened.'
}))
return

assert self._manual_address_iter is not None
self._manual_address_iter.add_addresses(addresses, last)
return

gap_limit = message.get('gap-limit', 20)
if gap_limit > self.MAX_GAP_LIMIT:
self.sendMessage(json_dumpb({
'type': 'stream:history:error',
'id': stream_id,
'errmsg': f'GAP limit is too big. Maximum: {self.MAX_GAP_LIMIT}'
}))
return

if not first:
self.sendMessage(json_dumpb({
'type': 'stream:history:error',
'id': stream_id,
'errmsg': 'Streaming not found. You must send first=true in your first message.'
}))
return

address_iter = ManualAddressSequencer()
address_iter.add_addresses(enumerate(addresses), last)
search = GAPLimitSearch(self.factory.manager, address_iter, gap_limit)
self._manual_address_iter = address_iter
self._history_streamer = HistoryStreamer(protocol=self, stream_id=stream_id, search=search)
deferred = self._history_streamer.start()
deferred.addBoth(self._streamer_callback)

def _streamer_callback(self, success: bool) -> None:
"""Callback used to identify when the streamer has ended."""
self._history_streamer = None
self._manual_address_iter = None

def subscribe_address(self, address: str) -> tuple[bool, str]:
"""Subscribe to receive real-time messages for all vertices related to an address."""
return self.factory.subscribe_address(self, address)
Loading

0 comments on commit 82c9605

Please sign in to comment.