-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathblocktools.py
271 lines (235 loc) · 8.8 KB
/
blocktools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Utilities for manipulating blocks and transactions."""
from typing import Optional, Union
from .script import (
CScript,
OP_CHECKSIG,
OP_DUP,
OP_EQUALVERIFY,
OP_HASH160,
OP_RETURN,
OP_TRUE,
OP_NOP,
)
from .messages import (
CBlock,
COIN,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
FromHex,
ToHex,
ser_string,
)
from .txtools import pad_tx
from .util import assert_equal, satoshi_round
def create_block(hashprev: Union[int, str], coinbase: Optional[CTransaction], nTime: Optional[int] = None,
*, txns=None, ctor=True):
"""Create a block (with regtest difficulty)"""
block = CBlock()
if nTime is None:
import time
block.nTime = int(time.time() + 600)
else:
assert isinstance(nTime, int)
block.nTime = nTime
if isinstance(hashprev, str):
# Convert hex-encoded headers to an int
hashprev = int(hashprev, 16)
block.hashPrevBlock = hashprev
block.nBits = 0x207fffff # Will break after a difficulty adjustment...
if coinbase:
block.vtx.append(coinbase)
if txns:
if ctor:
txns = sorted(txns, key=lambda x: x.hash)
block.vtx.extend(txns)
block.hashMerkleRoot = block.calc_merkle_root()
block.calc_sha256()
return block
def make_conform_to_ctor(block):
for tx in block.vtx:
tx.rehash()
block.vtx = [block.vtx[0]] + \
sorted(block.vtx[1:], key=lambda tx: tx.get_id())
def serialize_script_num(value):
r = bytearray(0)
if value == 0:
return r
neg = value < 0
absvalue = -value if neg else value
while (absvalue):
r.append(int(absvalue & 0xff))
absvalue >>= 8
if r[-1] & 0x80:
r.append(0x80 if neg else 0)
elif neg:
r[-1] |= 0x80
return r
# Create a coinbase transaction, assuming no miner fees.
# If pubkey is passed in, the coinbase output will be a P2PK output;
# otherwise an anyone-can-spend output.
def create_coinbase(height, pubkey=None):
coinbase = CTransaction()
coinbase.vin.append(CTxIn(COutPoint(0, 0xffffffff),
ser_string(serialize_script_num(height)), 0xffffffff))
coinbaseoutput = CTxOut()
coinbaseoutput.nValue = 50000 * COIN
halvings = int(height / 150) # regtest
coinbaseoutput.nValue >>= halvings
if (pubkey is not None):
coinbaseoutput.scriptPubKey = CScript([pubkey, OP_CHECKSIG])
else:
coinbaseoutput.scriptPubKey = CScript([OP_TRUE])
coinbase.vout = [coinbaseoutput]
# Make sure the coinbase is at least 100 bytes
pad_tx(coinbase)
coinbase.calc_sha256()
return coinbase
def bu_create_coinbase(height, pubkey=None, scriptPubKey=None):
"""BU Version:
Create a coinbase transaction, assuming no miner fees.
If pubkey is passed in, the coinbase output will be a P2PK output;
otherwise an anyone-can-spend output."""
assert not (pubkey and scriptPubKey), "cannot both have pubkey and custom scriptPubKey"
coinbase = CTransaction()
coinbase.vin.append(CTxIn(COutPoint(0, 0xffffffff),
ser_string(serialize_script_num(height)), 0xffffffff))
coinbaseoutput = CTxOut()
coinbaseoutput.nValue = 50000 * COIN
halvings = int(height / 150) # regtest
coinbaseoutput.nValue >>= halvings
if pubkey is not None:
coinbaseoutput.scriptPubKey = CScript([pubkey, OP_CHECKSIG])
else:
if scriptPubKey is None:
scriptPubKey = CScript([OP_NOP])
coinbaseoutput.scriptPubKey = CScript(scriptPubKey)
coinbase.vout = [coinbaseoutput]
# Make sure the coinbase is at least 100 bytes
coinbase_size = len(coinbase.serialize())
if coinbase_size < 100:
coinbase.vin[0].scriptSig += b'x' * (100 - coinbase_size)
coinbase.calc_sha256()
return coinbase
def create_tx_with_script(prevtx, n, script_sig=b"",
amount=1, script_pub_key=CScript()):
"""Return one-input, one-output transaction object
spending the prevtx's n-th output with the given amount.
Can optionally pass scriptPubKey and scriptSig, default is anyone-can-spend output.
"""
tx = CTransaction()
assert(n < len(prevtx.vout))
tx.vin.append(CTxIn(COutPoint(prevtx.sha256, n), script_sig, 0xffffffff))
tx.vout.append(CTxOut(amount, script_pub_key))
pad_tx(tx)
tx.calc_sha256()
return tx
def create_transaction(node, txid, to_address, amount):
""" Return signed transaction spending the first output of the
input txid. Note that the node must be able to sign for the
output that is being spent, and the node must not be running
multiple wallets.
"""
raw_tx = create_raw_transaction(node, txid, to_address, amount)
tx = FromHex(CTransaction(), raw_tx)
return tx
def create_raw_transaction(node, txid, to_address, amount, vout=0):
""" Return raw signed transaction spending an output (the first
by default) output of the input txid.
Note that the node must be able to sign for the
output that is being spent, and the node must not be running
multiple wallets.
"""
inputs = [{"txid": txid, "vout": vout}]
outputs = {to_address: amount}
rawtx = node.createrawtransaction(inputs, outputs)
signresult = node.signrawtransactionwithwallet(rawtx)
assert_equal(signresult["complete"], True)
return signresult['hex']
def get_legacy_sigopcount_block(block, fAccurate=True):
count = 0
for tx in block.vtx:
count += get_legacy_sigopcount_tx(tx, fAccurate)
return count
def get_legacy_sigopcount_tx(tx, fAccurate=True):
count = 0
for i in tx.vout:
count += i.scriptPubKey.GetSigOpCount(fAccurate)
for j in tx.vin:
# scriptSig might be of type bytes, so convert to CScript for the
# moment
count += CScript(j.scriptSig).GetSigOpCount(fAccurate)
return count
def create_confirmed_utxos(test_framework, node, count, age=101):
"""
Helper to create at least "count" utxos
"""
to_generate = int(0.5 * count) + age
while to_generate > 0:
test_framework.generate(node, min(25, to_generate))
to_generate -= 25
utxos = node.listunspent()
iterations = count - len(utxos)
addr1 = node.getnewaddress()
addr2 = node.getnewaddress()
if iterations <= 0:
return utxos
for i in range(iterations):
t = utxos.pop()
inputs = []
inputs.append({"txid": t["txid"], "vout": t["vout"]})
outputs = {}
outputs[addr1] = satoshi_round(t['amount'] / 2)
outputs[addr2] = satoshi_round(t['amount'] / 2)
raw_tx = node.createrawtransaction(inputs, outputs)
ctx = FromHex(CTransaction(), raw_tx)
fee = node.calculate_fee(ctx) // 2
ctx.vout[0].nValue -= fee
# Due to possible truncation, we go ahead and take another satoshi in
# fees to ensure the transaction gets through
ctx.vout[1].nValue -= fee + 1
signed_tx = node.signrawtransactionwithwallet(ToHex(ctx))["hex"]
node.sendrawtransaction(signed_tx)
while (node.getmempoolinfo()['size'] > 0):
test_framework.generate(node, 1)
utxos = node.listunspent()
assert len(utxos) >= count
return utxos
def mine_big_block(test_framework, node, utxos=None):
# generate a 66k transaction,
# and 14 of them is close to the 1MB block limit
num = 14
utxos = utxos if utxos is not None else []
if len(utxos) < num:
utxos.clear()
utxos.extend(node.listunspent())
send_big_transactions(node, utxos, num, 100)
test_framework.generate(node, 1)
def send_big_transactions(node, utxos, num, fee_multiplier):
from .cashaddr import decode
txids = []
padding = "1" * 512
addrHash = decode(node.getnewaddress())[2]
for _ in range(num):
ctx = CTransaction()
utxo = utxos.pop()
txid = int(utxo['txid'], 16)
ctx.vin.append(CTxIn(COutPoint(txid, int(utxo["vout"])), b""))
ctx.vout.append(
CTxOut(int(satoshi_round(utxo['amount'] * COIN)),
CScript([OP_DUP, OP_HASH160, addrHash, OP_EQUALVERIFY, OP_CHECKSIG])))
for i in range(0, 127):
ctx.vout.append(CTxOut(0, CScript(
[OP_RETURN, bytes(padding, 'utf-8')])))
# Create a proper fee for the transaction to be mined
ctx.vout[0].nValue -= int(fee_multiplier * node.calculate_fee(ctx))
signresult = node.signrawtransactionwithwallet(
ToHex(ctx), None, "NONE|FORKID")
txid = node.sendrawtransaction(signresult["hex"], True)
txids.append(txid)
return txids