Skip to content

Commit

Permalink
Problem: multi-threading tx sending not efficient (crypto-org-chain#1587
Browse files Browse the repository at this point in the history
)

* Problem: multi-threading tx sending not efficient

Solution:
- gen txs in advance
- use asyncio

* Update testground/benchmark/compositions/docker-compose.jsonnet

Signed-off-by: yihuang <huang@crypto.com>

* Update testground/benchmark/benchmark/sendtx.py

Signed-off-by: yihuang <huang@crypto.com>

* Update testground/benchmark/benchmark/sendtx.py

Signed-off-by: yihuang <huang@crypto.com>

* fix tps

* add logs

* fix

* limit connection pool size

* error message

---------

Signed-off-by: yihuang <huang@crypto.com>
  • Loading branch information
yihuang authored Sep 19, 2024
1 parent e0d64e9 commit 73c19da
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 37 deletions.
2 changes: 1 addition & 1 deletion testground/benchmark/benchmark/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def entrypoint(ctx: Context):

test_finish_entry = f"finish-test-{ctx.params.test_group_id}"
if not ctx.is_validator:
generate_load(cli, ctx.params.num_accounts, ctx.params.num_txs, ctx.global_seq)
generate_load(ctx.params.num_accounts, ctx.params.num_txs, ctx.global_seq)
print("finish test", ctx.group_seq)
ctx.sync.signal_and_wait(
test_finish_entry, ctx.params.test_group_instance_count
Expand Down
23 changes: 13 additions & 10 deletions testground/benchmark/benchmark/peer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import json
import tempfile
from pathlib import Path
Expand Down Expand Up @@ -116,18 +117,20 @@ def init_node(
def gen_genesis(
cli: ChainCommand, leader_home: Path, peers: List[PeerPacket], genesis_patch: dict
):
for peer in peers:
with tempfile.NamedTemporaryFile() as fp:
fp.write(json.dumps(peer.accounts, default=pydantic_encoder).encode())
fp.flush()
cli(
"genesis",
"bulk-add-genesis-account",
fp.name,
home=leader_home,
)
accounts = list(itertools.chain(*(peer.accounts for peer in peers)))
print("adding genesis accounts", len(accounts))
with tempfile.NamedTemporaryFile() as fp:
fp.write(json.dumps(accounts, default=pydantic_encoder).encode())
fp.flush()
cli(
"genesis",
"bulk-add-genesis-account",
fp.name,
home=leader_home,
)
collect_gen_tx(cli, peers, home=leader_home)
cli("genesis", "validate", home=leader_home)
print("genesis validated")
return patch_json(
leader_home / "config" / "genesis.json",
{
Expand Down
66 changes: 56 additions & 10 deletions testground/benchmark/benchmark/sendtx.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import aiohttp
import ujson
import web3
from eth_account import Account

from .utils import gen_account, send_transaction

GAS_PRICE = 1000000000
CHAIN_ID = 777
LOCAL_JSON_RPC = "http://localhost:8545"
CONNECTION_POOL_SIZE = 1024


def test_tx(nonce: int):
return {
"to": "0x0000000000000000000000000000000000000000",
"value": 1,
"nonce": nonce,
"gas": 21000,
"gasPrice": GAS_PRICE,
"chainId": CHAIN_ID,
}


def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):
Expand All @@ -22,15 +39,8 @@ def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):

nonce = initial_nonce
while nonce < initial_nonce + tx_amount:
tx = {
"to": "0x0000000000000000000000000000000000000000",
"value": 1,
"nonce": nonce,
"gas": 21000,
"gasPrice": GAS_PRICE,
}
try:
send_transaction(w3, tx, acct, wait=False)
send_transaction(w3, test_tx(nonce), acct, wait=False)
except ValueError as e:
msg = str(e)
if "invalid nonce" in msg:
Expand All @@ -55,9 +65,9 @@ def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):
)


def generate_load(cli, num_accounts, num_txs, global_seq, **kwargs):
def generate_load(num_accounts, num_txs, global_seq, **kwargs):
w3 = web3.Web3(web3.providers.HTTPProvider("http://localhost:8545"))
assert w3.eth.chain_id == 777
assert w3.eth.chain_id == CHAIN_ID
accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)]
with ThreadPoolExecutor(max_workers=num_accounts) as executor:
futs = (executor.submit(sendtx, w3, acct, num_txs) for acct in accounts)
Expand All @@ -66,3 +76,39 @@ def generate_load(cli, num_accounts, num_txs, global_seq, **kwargs):
fut.result()
except Exception as e:
print("test task failed", e)


def prepare_txs(global_seq, num_accounts, num_txs):
accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)]
txs = []
for i in range(num_txs):
for acct in accounts:
txs.append(acct.sign_transaction(test_tx(i)).rawTransaction.hex())
if len(txs) % 1000 == 0:
print("prepared", len(txs), "txs")

return txs


async def async_sendtx(session, raw):
async with session.post(
LOCAL_JSON_RPC,
json={
"jsonrpc": "2.0",
"method": "eth_sendRawTransaction",
"params": [raw],
"id": 1,
},
) as rsp:
data = await rsp.json()
if "error" in data:
print("send tx error", data["error"])


async def send_txs(txs):
connector = aiohttp.TCPConnector(limit=1024)
async with aiohttp.ClientSession(
connector=connector, json_serialize=ujson.dumps
) as session:
tasks = [asyncio.ensure_future(async_sendtx(session, raw)) for raw in txs]
await asyncio.gather(*tasks)
24 changes: 11 additions & 13 deletions testground/benchmark/benchmark/stateless.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import os
import shutil
Expand All @@ -22,11 +23,11 @@
init_node,
patch_configs,
)
from .sendtx import generate_load
from .sendtx import prepare_txs, send_txs
from .stats import dump_block_stats
from .topology import connect_all
from .types import PeerPacket
from .utils import block_height, block_txs, wait_for_block, wait_for_port, wait_for_w3
from .utils import block_height, block_txs, wait_for_block, wait_for_port

# use cronosd on host machine
LOCAL_CRONOSD_PATH = "cronosd"
Expand Down Expand Up @@ -206,9 +207,14 @@ def run(outdir: str, datadir: str, cronosd, global_seq):


def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
run_echo_server(ECHO_SERVER_PORT)
if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
print("preparing", cfg["num_accounts"] * cfg["num_txs"], "txs")
txs = prepare_txs(global_seq, cfg["num_accounts"], cfg["num_txs"])
else:
txs = []

# wait for persistent peers to be ready
run_echo_server(ECHO_SERVER_PORT)
wait_for_peers(home)

print("start node")
Expand All @@ -223,16 +229,8 @@ def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
wait_for_port(8545)
wait_for_block(cli, 3)

if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
wait_for_w3()
generate_load(
cli,
cfg["num_accounts"],
cfg["num_txs"],
global_seq,
home=home,
output="json",
)
if txs:
asyncio.run(send_txs(txs))

# node quit when the chain is idle or halted for a while
detect_idle_halted(20, 20)
Expand Down
4 changes: 2 additions & 2 deletions testground/benchmark/benchmark/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from .utils import block, block_height

# the tps calculation use the average of the last 10 blocks
TPS_WINDOW = 10
TPS_WINDOW = 5


def calculate_tps(blocks):
if len(blocks) < 2:
return 0

txs = sum(n for n, _ in blocks)
txs = sum(n for n, _ in blocks[1:])
_, t1 = blocks[0]
_, t2 = blocks[-1]
time_diff = (t2 - t1).total_seconds()
Expand Down
89 changes: 88 additions & 1 deletion testground/benchmark/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions testground/benchmark/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ hexbytes = "^0"
bech32 = "^1"
requests = "^2.32"
click = "^8.1.7"
ujson = "^5.10.0"

[tool.poetry.dev-dependencies]
pytest = "^8.2"
Expand Down

0 comments on commit 73c19da

Please sign in to comment.