Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eliminate polling in full node for processed tx #17171

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ class FullNode:
_add_transaction_semaphore: Optional[asyncio.Semaphore] = None
_db_wrapper: Optional[DBWrapper2] = None
_hint_store: Optional[HintStore] = None
transaction_responses: List[Tuple[bytes32, MempoolInclusionStatus, Optional[Err]]] = dataclasses.field(
default_factory=list
)
_block_store: Optional[BlockStore] = None
_coin_store: Optional[CoinStore] = None
_mempool_manager: Optional[MempoolManager] = None
Expand Down Expand Up @@ -285,7 +282,6 @@ async def manage(self) -> AsyncIterator[None]:
# Transactions go into this queue from the server, and get sent to respond_transaction
self._transaction_queue = TransactionQueue(1000, self.log)
self._transaction_queue_task: asyncio.Task[None] = asyncio.create_task(self._handle_transactions())
self.transaction_responses = []

self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())

Expand Down Expand Up @@ -470,9 +466,7 @@ async def _handle_one_transaction(self, entry: TransactionQueueEntry) -> None:
peer = entry.peer
try:
inc_status, err = await self.add_transaction(entry.transaction, entry.spend_name, peer, entry.test)
self.transaction_responses.append((entry.spend_name, inc_status, err))
if len(self.transaction_responses) > 50:
self.transaction_responses = self.transaction_responses[1:]
entry.done.set_result((inc_status, err))
except asyncio.CancelledError:
error_stack = traceback.format_exc()
self.log.debug(f"Cancelling _handle_one_transaction, closing: {error_stack}")
Expand Down
23 changes: 7 additions & 16 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple

import anyio
from chia_rs import AugSchemeMPL, G1Element, G2Element
from chiabip158 import PyBIP158

Expand Down Expand Up @@ -1271,22 +1272,12 @@ async def send_transaction(
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.SUCCESS), None)
return make_msg(ProtocolMessageTypes.transaction_ack, response)

await self.full_node.transaction_queue.put(
TransactionQueueEntry(request.transaction, None, spend_name, None, test), peer_id=None, high_priority=True
)
# Waits for the transaction to go into the mempool, times out after 45 seconds.
status, error = None, None
sleep_time = 0.01
for i in range(int(45 / sleep_time)):
await asyncio.sleep(sleep_time)
for potential_name, potential_status, potential_error in self.full_node.transaction_responses:
if spend_name == potential_name:
status = potential_status
error = potential_error
break
if status is not None:
break
if status is None:
queue_entry = TransactionQueueEntry(request.transaction, None, spend_name, None, test)
await self.full_node.transaction_queue.put(queue_entry, peer_id=None, high_priority=True)
try:
with anyio.fail_after(delay=45):
status, error = await queue_entry.done
except TimeoutError:
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.PENDING), None)
else:
error_name = error.name if error is not None else None
Expand Down
31 changes: 13 additions & 18 deletions chia/types/transaction_queue_entry.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional
import asyncio
from dataclasses import dataclass, field
from typing import Optional, Tuple

from chia.server.ws_connection import WSChiaConnection
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.util.errors import Err


@dataclass(frozen=True)
Expand All @@ -14,20 +17,12 @@ class TransactionQueueEntry:
A transaction received from peer. This is put into a queue, and not yet in the mempool.
"""

transaction: SpendBundle
transaction_bytes: Optional[bytes]
transaction: SpendBundle = field(compare=False)
transaction_bytes: Optional[bytes] = field(compare=False)
spend_name: bytes32
peer: Optional[WSChiaConnection]
test: bool

def __lt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name < other.spend_name

def __le__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name <= other.spend_name

def __gt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name > other.spend_name

def __ge__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name >= other.spend_name
peer: Optional[WSChiaConnection] = field(compare=False)
test: bool = field(compare=False)
done: asyncio.Future[Tuple[MempoolInclusionStatus, Optional[Err]]] = field(
default_factory=asyncio.Future,
compare=False,
)
Loading