Skip to content

Commit

Permalink
eliminate polling in full node for processed tx (#17171)
Browse files Browse the repository at this point in the history
  • Loading branch information
altendky authored Dec 27, 2023
1 parent a389704 commit cd78dba
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 41 deletions.
8 changes: 1 addition & 7 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ class FullNode:
_add_transaction_semaphore: Optional[asyncio.Semaphore] = None
_db_wrapper: Optional[DBWrapper2] = None
_hint_store: Optional[HintStore] = None
transaction_responses: List[Tuple[bytes32, MempoolInclusionStatus, Optional[Err]]] = dataclasses.field(
default_factory=list
)
_block_store: Optional[BlockStore] = None
_coin_store: Optional[CoinStore] = None
_mempool_manager: Optional[MempoolManager] = None
Expand Down Expand Up @@ -285,7 +282,6 @@ async def manage(self) -> AsyncIterator[None]:
# Transactions go into this queue from the server, and get sent to respond_transaction
self._transaction_queue = TransactionQueue(1000, self.log)
self._transaction_queue_task: asyncio.Task[None] = asyncio.create_task(self._handle_transactions())
self.transaction_responses = []

self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

Expand Down Expand Up @@ -470,9 +466,7 @@ async def _handle_one_transaction(self, entry: TransactionQueueEntry) -> None:
peer = entry.peer
try:
inc_status, err = await self.add_transaction(entry.transaction, entry.spend_name, peer, entry.test)
self.transaction_responses.append((entry.spend_name, inc_status, err))
if len(self.transaction_responses) > 50:
self.transaction_responses = self.transaction_responses[1:]
entry.done.set_result((inc_status, err))
except asyncio.CancelledError:
error_stack = traceback.format_exc()
self.log.debug(f"Cancelling _handle_one_transaction, closing: {error_stack}")
Expand Down
23 changes: 7 additions & 16 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple

import anyio
from chia_rs import AugSchemeMPL, G1Element, G2Element
from chiabip158 import PyBIP158

Expand Down Expand Up @@ -1271,22 +1272,12 @@ async def send_transaction(
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.SUCCESS), None)
return make_msg(ProtocolMessageTypes.transaction_ack, response)

await self.full_node.transaction_queue.put(
TransactionQueueEntry(request.transaction, None, spend_name, None, test), peer_id=None, high_priority=True
)
# Waits for the transaction to go into the mempool, times out after 45 seconds.
status, error = None, None
sleep_time = 0.01
for i in range(int(45 / sleep_time)):
await asyncio.sleep(sleep_time)
for potential_name, potential_status, potential_error in self.full_node.transaction_responses:
if spend_name == potential_name:
status = potential_status
error = potential_error
break
if status is not None:
break
if status is None:
queue_entry = TransactionQueueEntry(request.transaction, None, spend_name, None, test)
await self.full_node.transaction_queue.put(queue_entry, peer_id=None, high_priority=True)
try:
with anyio.fail_after(delay=45):
status, error = await queue_entry.done
except TimeoutError:
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.PENDING), None)
else:
error_name = error.name if error is not None else None
Expand Down
31 changes: 13 additions & 18 deletions chia/types/transaction_queue_entry.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional
import asyncio
from dataclasses import dataclass, field
from typing import Optional, Tuple

from chia.server.ws_connection import WSChiaConnection
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.util.errors import Err


@dataclass(frozen=True)
Expand All @@ -14,20 +17,12 @@ class TransactionQueueEntry:
A transaction received from peer. This is put into a queue, and not yet in the mempool.
"""

transaction: SpendBundle
transaction_bytes: Optional[bytes]
transaction: SpendBundle = field(compare=False)
transaction_bytes: Optional[bytes] = field(compare=False)
spend_name: bytes32
peer: Optional[WSChiaConnection]
test: bool

def __lt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name < other.spend_name

def __le__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name <= other.spend_name

def __gt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name > other.spend_name

def __ge__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name >= other.spend_name
peer: Optional[WSChiaConnection] = field(compare=False)
test: bool = field(compare=False)
done: asyncio.Future[Tuple[MempoolInclusionStatus, Optional[Err]]] = field(
default_factory=asyncio.Future,
compare=False,
)

0 comments on commit cd78dba

Please sign in to comment.