Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: implementation of ethstats plugin #1216

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions trinity/plugins/builtin/ethstats/ethstats_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,26 @@

import websockets

from cancel_token import (
CancelToken,
)

from p2p.service import (
BaseService,
)


# Returns UTC timestamp in ms, used for latency calculation
def timestamp_ms():
def timestamp_ms() -> int:
return round(datetime.datetime.utcnow().timestamp() * 1000)


EthstatsData = typing.Dict[str, typing.Any]


class EthstatsMessage(typing.NamedTuple):
command: str
data: dict
data: EthstatsData


class EthstatsException(Exception):
Expand All @@ -29,16 +36,15 @@ def __init__(
self,
websocket: websockets.client.WebSocketClientProtocol,
node_id: str,
*args,
**kwargs,
token: CancelToken = None,
) -> None:
super().__init__(*args, **kwargs)
super().__init__(token)

self.websocket = websocket
self.node_id = node_id

self.send_queue: asyncio.Queue = asyncio.Queue()
self.recv_queue: asyncio.Queue = asyncio.Queue()
self.send_queue: asyncio.Queue[EthstatsMessage] = asyncio.Queue()
self.recv_queue: asyncio.Queue[EthstatsMessage] = asyncio.Queue()
evgeniuz marked this conversation as resolved.
Show resolved Hide resolved

async def _run(self) -> None:
await self.wait_first(
Expand Down Expand Up @@ -98,19 +104,19 @@ async def recv(self) -> EthstatsMessage:

# Following methods used to enqueue messages to be sent

async def send_hello(self, secret: str, info: dict) -> None:
async def send_hello(self, secret: str, info: EthstatsData) -> None:
await self.send_queue.put(EthstatsMessage(
'hello',
{'info': info, 'secret': secret},
))

async def send_stats(self, stats: dict) -> None:
async def send_stats(self, stats: EthstatsData) -> None:
await self.send_queue.put(EthstatsMessage(
'stats',
{'stats': stats},
))

async def send_block(self, block: dict) -> None:
async def send_block(self, block: EthstatsData) -> None:
await self.send_queue.put(EthstatsMessage(
'block',
{'block': block},
Expand All @@ -122,7 +128,7 @@ async def send_pending(self, pending: int) -> None:
{'stats': {'pending': pending}},
))

async def send_history(self, history: dict) -> None:
async def send_history(self, history: EthstatsData) -> None:
await self.send_queue.put(EthstatsMessage(
'history',
{'history': history},
Expand Down
10 changes: 4 additions & 6 deletions trinity/plugins/builtin/ethstats/ethstats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from trinity.plugins.builtin.ethstats.ethstats_client import (
EthstatsClient,
EthstatsMessage,
EthstatsData,
timestamp_ms,
)

Expand All @@ -33,11 +34,8 @@ def __init__(
server_secret: str,
node_id: str,
node_contact: str,

*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
super().__init__()

self.context = context

Expand Down Expand Up @@ -92,7 +90,7 @@ async def statistics_handler(self, client: EthstatsClient) -> None:

await self.sleep(5)

def get_node_info(self) -> dict:
def get_node_info(self) -> EthstatsData:
return {
'name': self.node_id,
'contact': self.node_contact,
Expand All @@ -102,7 +100,7 @@ def get_node_info(self) -> dict:
'canUpdateHistory': False,
}

async def get_node_stats(self) -> dict:
async def get_node_stats(self) -> EthstatsData:
response: PeerCountResponse = await self.context.event_bus.request(
PeerCountRequest()
)
Expand Down