-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathblock2rdf-cli.py
294 lines (243 loc) · 13.5 KB
/
block2rdf-cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
#! /usr/bin/env python3
"""block2rdf-cli is the third version of the adapted blocknotify script.
TODO:
- Revisar modo "test".
- "Exit gracefully" cuando el cliente Slimcoin no esté corriendo - sino el script sigue corriendo y hay que "matarlo".
"""
from rdflib import RDF, URIRef, Graph
from blocknotifybase import BlockChainProcessor, RDFChainProcessor, mainnet, testnet, datadir
import sys
import subprocess
import os
import json
import argparse
import time
global blockhash
class ChainCatchup(BlockChainProcessor, RDFChainProcessor):
"""Inherits the two main classes from blocknotifybase."""
def __init__(self, network, dataset, store_offset, check_reorgs):
self.dataset = dataset
self.store_offset, self.check_reorgs = store_offset, check_reorgs # flags
BlockChainProcessor.__init__(self, network)
def store_blockheightfile(self, hdic):
"""Writes a JSON file that contains the last blockheight checked for OP_RETURN or burn transactions.
The file contains also the height of the last block stored in the RDF dataset."""
with open(datadir + '/{s}.height.txt'.format(s=self.dataset), 'w') as fp:
json.dump(hdic, fp)
def load_blockheightfile(self):
try:
with open(datadir + '/{s}.height.txt'.format(s=self.dataset), 'r') as fp:
return json.load(fp)
except FileNotFoundError:
if verbose: print("No blockheight file found. Using current RDF chain block height.")
return None
def next_blockheight(self):
"""Returns the height of the block after the last checked/stored block, regardless of mode."""
if self.store_offset == True:
bhf_hdict = self.load_blockheightfile()
try:
return bhf_hdict["checked_height"] + 1
except KeyError:
if verbose: print("Blockheight file not found or corrupted. Starting from chain graph height.")
cg_hdict = self.get_chaingraph_height(self.dataset)
if cg_hdict is None:
return 0 # new chain
else:
return cg_hdict.get("rdf_height") + 1
def exit(self, lastheight):
if verbose or self.store_offset:
cg_hdict = self.get_chaingraph_height(self.dataset)
if self.store_offset == True:
cbhash = self.getblockhash(lastheight)
cg_hdict.update({"checked_height":lastheight, "checked_blockhash":cbhash})
self.store_blockheightfile(cg_hdict)
if verbose: print("Last checked block:", cbhash, "at height", lastheight)
if verbose: print("Final stored block:", cg_hdict.get("rdf_blockhash"), "at height", cg_hdict.get("rdf_height"))
if verbose: print("OK")
if log:
self.log_block(lastheight, self.getblockhash(lastheight), "EXITING")
sys.exit()
def log_block(self, blockheight, blockhash, message=""):
with open(datadir + '/{s}.log'.format(s=self.dataset), 'a') as fp:
fp.write("{} {} {}\n".format(blockheight, blockhash, message))
def catchup_main(self, startblock, maxblocks, maxheight, mode="mainnet"):
if verbose:
interval = 1
else:
interval = 100
self.setUpGraph() # every loop instance needs a new graph
# client_prevhash = None
endblock = min(startblock + maxblocks, maxheight)
interrupt = False
if self.check_reorgs:
# checks if RDF graph and client are on the same chain.
# If not, last block is deleted, and the previous block is checked.
# The loop then continues with the last common blockheight.
blockheight = self.reorgcheck(startblock)
else:
blockheight = startblock
while blockheight < endblock:
try:
blockhash = self.getblockhash(blockheight)
if blockheight % interval == 0:
print(blockheight)
self.readblock(blockhash, mode) # calls main routine of b2rbase
if log:
self.log_block(blockheight, blockhash)
prev_blockhash = blockhash
blockheight += 1
except KeyboardInterrupt:
if verbose: print("KeyboardInterrupt: Stopping.")
interrupt = True
break
except Exception as e:
if verbose: print("Exception:", e, ". Exiting")
interrupt = True
break
return blockheight, interrupt
def reorgcheck(self, height):
"""Checks if RDF graph and client are on the same chain.
If not, last block is deleted from the RDF graph, and the previous block is checked.
After processing the first correct block the script catches up to the original "maxheight" value."""
reorg = False
if height == 0: # new chain start
return 0
if self.store_offset:
# compares the content of blockheight file with the block hash of the same height.
# As we have no information of what occurred between last stored block and the last checked block,
# if there is a reorg between both, the program must continue at the stored height
# If the reorg is deeper than the stored height, the program must check
# all stored blocks until the hashes are consistent
# so it returns to the standard reorgcheck loop.
bhf = self.load_blockheightfile()
if bhf.get("checked_blockhash") == self.getblockhash(bhf.get("checked_height")):
# checked block is ok: blockheight stays the same
return height
elif bhf.get("rdf_blockhash") == self.getblockhash(bhf.get("rdf_height")):
# last stored block is ok: stored block set as blockheight, blockheightfile updated
if log:
self.log_block(bhf.get("checked_height"), bhf.get("checked_blockhash"), "Reorg detected. Rolling back to last stored block.")
bhf.update({"checked_height":bhf.get("rdf_height"), "checked_blockhash":bhf.get("rdf_blockhash")})
self.store_blockheightfile(bhf)
return bhf.get("rdf_height") + 1
while True:
rdf_lastblock = self.get_chaingraph_height(self.dataset)
chaingraph_height = rdf_lastblock.get("rdf_height")
chaingraph_bhash = rdf_lastblock.get("rdf_blockhash")
bestchain_bhash = self.getblockhash(rdf_lastblock.get("rdf_height")) # current best chain as viewed by the client
if bestchain_bhash != chaingraph_bhash: # reorg
reorg = True
if verbose: print("RDF blockchain is on a fork:", chaingraph_bhash, "correct block:", bestchain_bhash)
if log:
if self.store_offset:
message = "Reorg detected. Rolling back to next stored block."
else:
message = "Reorg detected. Rolling back one block."
self.log_block(chaingraph_height, chaingraph_bhash, message)
lastblockgraph = self.get_chaingraph_block(chaingraph_bhash, self.dataset)
if verbose: print("Blockgraph to be DISCARDED:", lastblockgraph.serialize(format="n3").decode("utf-8"))
status = self.delete_chaingraph_block(chaingraph_bhash, self.dataset)
if verbose: print("Graph update operation delivered status:", status)
bestchain_bhash = None
time.sleep(10) # minimize database load in long reorgs
continue
else:
if reorg == True:
return chaingraph_height + 1
else:
return height
def catchup_loop(self, offset, loops, maxblocks, maxheight, sleep, mode="mainnet"):
startblock = offset
for i in range(loops):
if verbose: print("Starting loop at height", startblock)
print(startblock, maxblocks, maxheight, mode)
blockheight, interrupt = self.catchup_main(startblock, maxblocks, maxheight, mode)
if len(self.g) == 0:
if verbose: print("No data to be added. Continuing.")
else:
if verbose: print("Transferring data ...")
self.change_chaingraph(self.dataset)
if verbose: print("Sleeping for {} seconds.".format(str(sleep)))
time.sleep(sleep)
if interrupt == True: # KeyboardInterrupt
break
startblock = startblock + maxblocks
if startblock > maxheight:
break
self.exit(blockheight - 1) # last checked block is one less than current height
def main():
global test
global debug
global verbose
global log
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument("-l", "--loops", type=int, default=1, help="Loop (catchup) mode; loop the main cycle X times (Default: 1).")
group.add_argument("-b", "--blockhash", type=str, help="Blockhash mode; adds only the block with the given blockhash. To be triggered by blocknotify.")
parser.add_argument("-m", "--maxblocks", type=int, default=100, help="Number of blocks in each cycle (default: 100).")
parser.add_argument("-s", "--sleep", type=int, default=2, help="Time (in seconds) to sleep between each cycle (default: 2).")
parser.add_argument("-o", "--offset", type=int, help="Block-height offset.")
parser.add_argument("-O", "--storeoffset", action="store_true", help="Stored offset mode: retrieves offset value from file.")
parser.add_argument("-t", "--test", action="store_true", help="Test mode. Does not transfer data to Fuseki, but prints one cycle to stdout.") # implement for delete
parser.add_argument("-d", "--debug", action="store_true", help="Debug mode.")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose mode.")
parser.add_argument("-L", "--log", action="store_true", help="Log block output.")
parser.add_argument("-r", "--reorgcheck", action="store_true", help="Checks for reorgs (more expensive).")
parser.add_argument("-R", "--repair", type=int, help="Repair mode. Begins X blocks before the current RDF blockchain height, to prevent incomplete blocks (e.g. if the script was interrupted).")
parser.add_argument("-S", "--safe", type=int, const=100, nargs="?", help="Safe mode. Catches up to X blocks before the current real blockchain height (X defaults to 100, can be increased if there is reorg danger).", dest="blocklimit")
parser.add_argument("-P", "--only-publications", help="Publication mode. Only records blocks that contain OP_RETURN transactions.", action="store_const", dest="mode", const="pub")
parser.add_argument("-B", "--only-burn-address", help="Burn address mode. Only records blocks which affect the balance of the burn address.", action="store_const", dest="mode", const="burn")
parser.add_argument("-T", "--testnet", help="Testnet mode.", action="store_const", dest="mode", const="testnet")
args = parser.parse_args()
test, debug, verbose, log = args.test, args.debug, args.verbose, args.log
blockhash = args.blockhash
if args.mode is None:
args.mode = "mainnet"
dataset = mainnet.get('symbol').lower()
network = mainnet
elif args.mode == "testnet":
dataset = testnet.get('symbol').lower()
network = testnet
else:
dataset = args.mode + "_" + mainnet.get('symbol').lower()
network = mainnet
if args.mode in ("pub","burn"):
args.storeoffset = True
if blockhash is not None:
# Blocknotify mode.
c = ChainCatchup(network, dataset, args.storeoffset, True)
if verbose: print("Notify mode. Adding block:", blockhash)
maxheight = c.getblockdata(blockhash).get("height", 0)
next_blockheight = c.next_blockheight()
# check if there are blocks missing since last run, if yes, catch up.
if maxheight > next_blockheight:
args.offset = next_blockheight
args.maxblocks = maxheight - args.offset
if args.maxblocks > 100: # large catchups: loop mode
args.loops = int(args.maxblocks / 100) + 1
args.maxblocks = 100
else:
args.offset = maxheight
else:
c = ChainCatchup(network, dataset, args.storeoffset, args.reorgcheck)
next_blockheight = c.next_blockheight()
if args.offset is None:
if args.repair is not None:
args.offset = next_blockheight - args.repair
else:
args.offset = next_blockheight
if args.blocklimit is not None:
maxheight = c.binfo.get('blocks') - args.blocklimit
else:
maxheight = c.binfo.get('blocks') + 1
if maxheight < args.offset:
raise ValueError("Endblock has lower height than starting block.")
if verbose: print("End height block: {}".format(maxheight))
if verbose: print("Offset: {} Maxblocks: {}".format(args.offset, args.maxblocks))
if args.offset == 0:
c.start_new_chain()
print(maxheight, next_blockheight, args.offset, args.maxblocks, args.loops)
print(args.offset, args.loops, args.maxblocks, maxheight, args.sleep, args.mode)
c.catchup_loop(args.offset, args.loops, args.maxblocks, maxheight, args.sleep, args.mode)
if __name__ == "__main__":
main()