
Commit: WIP

jansegre committed Apr 26, 2022
1 parent f0f8b2a · commit f13f2b1

Showing 3 changed files with 107 additions and 103 deletions.
35 changes: 19 additions & 16 deletions hathor/indexes/rocksdb_utils.py
@@ -64,22 +64,25 @@ def _fresh_cf(self, cf_name: bytes) -> 'rocksdb.ColumnFamilyHandle':
 
         log_cf = self.log.new(cf=cf_name.decode('ascii'))
         _cf = self._db.get_column_family(cf_name)
-        # XXX: dropping column because initialization currently expects a fresh index
-        if _cf is not None:
-            old_id = _cf.id
-            log_cf.debug('drop existing column family')
-            self._db.drop_column_family(_cf)
-        else:
-            old_id = None
-            log_cf.debug('no need to drop column family')
-        del _cf
-        log_cf.debug('create fresh column family')
-        _cf = self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions())
-        new_id = _cf.id
-        assert _cf is not None
-        assert _cf.is_valid
-        assert new_id != old_id
-        log_cf.debug('got column family', is_valid=_cf.is_valid, id=_cf.id, old_id=old_id)
+        # # XXX: dropping column because initialization currently expects a fresh index
+        # if _cf is not None:
+        #     old_id = _cf.id
+        #     log_cf.debug('drop existing column family')
+        #     self._db.drop_column_family(_cf)
+        # else:
+        #     old_id = None
+        #     log_cf.debug('no need to drop column family')
+        # del _cf
+        # log_cf.debug('create fresh column family')
+        # _cf = self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions())
+        # new_id = _cf.id
+        # assert _cf is not None
+        # assert _cf.is_valid
+        # assert new_id != old_id
+        # log_cf.debug('got column family', is_valid=_cf.is_valid, id=_cf.id, old_id=old_id)
+        if _cf is None:
+            _cf = self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions())
+        log_cf.debug('got column family', is_valid=_cf.is_valid, id=_cf.id)
         return _cf


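Note for reviewers: before this change `_fresh_cf` guaranteed an empty column family by dropping any existing one (the old XXX comment explains why); after it, an existing and possibly populated column family is returned as-is. A sketch of the resulting method, condensed from the hunk above and using only names already present in the diff:

    def _fresh_cf(self, cf_name: bytes) -> 'rocksdb.ColumnFamilyHandle':
        log_cf = self.log.new(cf=cf_name.decode('ascii'))
        _cf = self._db.get_column_family(cf_name)
        # unlike before, an existing (possibly populated) column family is
        # reused instead of being dropped and recreated
        if _cf is None:
            _cf = self._db.create_column_family(cf_name, rocksdb.ColumnFamilyOptions())
        log_cf.debug('got column family', is_valid=_cf.is_valid, id=_cf.id)
        return _cf

Callers that relied on getting a fresh, empty index must now tolerate pre-existing data, which appears to be what the manager.py changes below are adjusting for.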
2 changes: 1 addition & 1 deletion hathor/indexes/tips_index.py
@@ -49,7 +49,7 @@ class TipsIndex:
     def __init__(self) -> None:
         self.log = logger.new()
         self.tree = IntervalTree()
-        self.tx_last_interval = {}  # Dict[bytes(hash), Interval]
+        self.tx_last_interval = {}
 
     def add_tx(self, tx: BaseTransaction) -> bool:
         """ Add a new transaction to the index
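The only change here drops the trailing type-hint comment, which was the sole type documentation for `tx_last_interval`. If that information is worth keeping, it could live in a real annotation instead; a hypothetical sketch, not part of the commit (assuming `Interval` comes from the same intervaltree package as the `IntervalTree` used above):

    from typing import Dict

    from intervaltree import Interval, IntervalTree

    class TipsIndex:
        def __init__(self) -> None:
            self.tree = IntervalTree()
            # carries what the removed comment said: tx hash -> its last interval
            self.tx_last_interval: Dict[bytes, Interval] = {}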
173 changes: 87 additions & 86 deletions hathor/manager.py
@@ -35,7 +35,7 @@
 from hathor.profiler import get_cpu_profiler
 from hathor.pubsub import HathorEvents, PubSubManager
 from hathor.transaction import BaseTransaction, Block, MergeMinedBlock, Transaction, TxVersion, sum_weights
-from hathor.transaction.exceptions import TxValidationError
+# from hathor.transaction.exceptions import TxValidationError
 from hathor.transaction.storage import TransactionStorage
 from hathor.transaction.storage.exceptions import TransactionDoesNotExist
 from hathor.util import LogDuration, Random, Reactor
@@ -338,9 +338,9 @@ def _initialize_components(self) -> None:
         if self.wallet:
             self.wallet._manually_initialize()
         t0 = time.time()
-        t1 = t0
+        # t1 = t0
         cnt = 0
-        cnt2 = 0
+        # cnt2 = 0
         t2 = t0
         h = 0

@@ -385,88 +385,89 @@ def _initialize_components(self) -> None:
             for tx in self.tx_storage.get_all_transactions():
                 tx.reset_metadata()
 
-        self.log.debug('load blocks and transactions')
-        for tx in self.tx_storage._topological_sort():
-            if self._full_verification:
-                tx.update_initial_metadata()
-
-            assert tx.hash is not None
-
-            tx_meta = tx.get_metadata()
-
-            t2 = time.time()
-            dt = LogDuration(t2 - t1)
-            dcnt = cnt - cnt2
-            tx_rate = '?' if dt == 0 else dcnt / dt
-            h = max(h, tx_meta.height)
-            if dt > 30:
-                ts_date = datetime.datetime.fromtimestamp(self.tx_storage.latest_timestamp)
-                if h == 0:
-                    self.log.debug('start loading transactions...')
-                else:
-                    self.log.info('load transactions...', tx_rate=tx_rate, tx_new=dcnt, dt=dt,
-                                  total=cnt, latest_ts=ts_date, height=h)
-                t1 = t2
-                cnt2 = cnt
-            cnt += 1
-
-            # It's safe to skip block weight verification during initialization because
-            # we trust the difficulty stored in metadata
-            skip_block_weight_verification = True
-            if block_count % settings.VERIFY_WEIGHT_EVERY_N_BLOCKS == 0:
-                skip_block_weight_verification = False
-
-            try:
-                if self._full_verification:
-                    # TODO: deal with invalid tx
-                    if tx.can_validate_full():
-                        self.tx_storage.add_to_indexes(tx)
-                        assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
-                        self.consensus_algorithm.update(tx)
-                        self.tx_storage.indexes.mempool_tips.update(tx)
-                        self.step_validations([tx])
-                    else:
-                        assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
-                        self.tx_storage.save_transaction(tx, only_metadata=True)
-                else:
-                    # TODO: deal with invalid tx
-                    if not tx_meta.validation.is_final():
-                        if not tx_meta.validation.is_checkpoint():
-                            assert tx_meta.validation.is_at_least_basic(), f'invalid: {tx.hash_hex}'
-                    elif tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by:
-                        assert self.tx_storage.indexes is not None
-                        self.tx_storage.indexes.mempool_tips.update(tx)
-                    self.tx_storage.add_to_indexes(tx)
-                    if tx.is_transaction and tx_meta.voided_by:
-                        self.tx_storage.del_from_indexes(tx)
-            except (InvalidNewTransaction, TxValidationError):
-                self.log.error('unexpected error when initializing', tx=tx, exc_info=True)
-                raise
-
-            if tx.is_block:
-                block_count += 1
-
-                # this works because blocks on the best chain are iterated from lower to higher height
-                assert tx.hash is not None
-                assert tx_meta.validation.is_at_least_basic()
-                if not tx_meta.voided_by and tx_meta.validation.is_fully_connected():
-                    # XXX: this might not be needed when making a full init because the consensus should already have
-                    self.tx_storage.indexes.height.add_reorg(tx_meta.height, tx.hash, tx.timestamp)
-
-                # Check if it's a checkpoint block
-                if tx_meta.height in checkpoint_heights:
-                    if tx.hash == checkpoint_heights[tx_meta.height]:
-                        del checkpoint_heights[tx_meta.height]
-                    else:
-                        # If the hash is different from checkpoint hash, we stop the node
-                        self.log.error('Error initializing the node. Checkpoint validation error.')
-                        sys.exit()
-            else:
-                tx_count += 1
-
-            if time.time() - t2 > 1:
-                dt = LogDuration(time.time() - t2)
-                self.log.warn('tx took too long to load', tx=tx.hash_hex, dt=dt)
+        self.tx_storage.indexes._manually_initialize_tips_indexes(self.tx_storage)
+        # self.log.debug('load blocks and transactions')
+        # for tx in self.tx_storage._topological_sort():
+        #     if self._full_verification:
+        #         tx.update_initial_metadata()
+
+        #     assert tx.hash is not None
+
+        #     tx_meta = tx.get_metadata()
+
+        #     t2 = time.time()
+        #     dt = LogDuration(t2 - t1)
+        #     dcnt = cnt - cnt2
+        #     tx_rate = '?' if dt == 0 else dcnt / dt
+        #     h = max(h, tx_meta.height)
+        #     if dt > 30:
+        #         ts_date = datetime.datetime.fromtimestamp(self.tx_storage.latest_timestamp)
+        #         if h == 0:
+        #             self.log.debug('start loading transactions...')
+        #         else:
+        #             self.log.info('load transactions...', tx_rate=tx_rate, tx_new=dcnt, dt=dt,
+        #                           total=cnt, latest_ts=ts_date, height=h)
+        #         t1 = t2
+        #         cnt2 = cnt
+        #     cnt += 1
+
+        #     # It's safe to skip block weight verification during initialization because
+        #     # we trust the difficulty stored in metadata
+        #     skip_block_weight_verification = True
+        #     if block_count % settings.VERIFY_WEIGHT_EVERY_N_BLOCKS == 0:
+        #         skip_block_weight_verification = False
+
+        #     try:
+        #         if self._full_verification:
+        #             # TODO: deal with invalid tx
+        #             if tx.can_validate_full():
+        #                 self.tx_storage.add_to_indexes(tx)
+        #                 assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
+        #                 self.consensus_algorithm.update(tx)
+        #                 self.tx_storage.indexes.mempool_tips.update(tx)
+        #                 self.step_validations([tx])
+        #             else:
+        #                 assert tx.validate_basic(skip_block_weight_verification=skip_block_weight_verification)
+        #                 self.tx_storage.save_transaction(tx, only_metadata=True)
+        #         else:
+        #             # TODO: deal with invalid tx
+        #             if not tx_meta.validation.is_final():
+        #                 if not tx_meta.validation.is_checkpoint():
+        #                     assert tx_meta.validation.is_at_least_basic(), f'invalid: {tx.hash_hex}'
+        #             elif tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by:
+        #                 assert self.tx_storage.indexes is not None
+        #                 self.tx_storage.indexes.mempool_tips.update(tx)
+        #             self.tx_storage.add_to_indexes(tx)
+        #             if tx.is_transaction and tx_meta.voided_by:
+        #                 self.tx_storage.del_from_indexes(tx)
+        #     except (InvalidNewTransaction, TxValidationError):
+        #         self.log.error('unexpected error when initializing', tx=tx, exc_info=True)
+        #         raise
+
+        #     if tx.is_block:
+        #         block_count += 1
+
+        #         # this works because blocks on the best chain are iterated from lower to higher height
+        #         assert tx.hash is not None
+        #         assert tx_meta.validation.is_at_least_basic()
+        #         if not tx_meta.voided_by and tx_meta.validation.is_fully_connected():
+        #             # XXX: this might not be needed when making a full init because the consensus should already have
+        #             self.tx_storage.indexes.height.add_reorg(tx_meta.height, tx.hash, tx.timestamp)
+
+        #         # Check if it's a checkpoint block
+        #         if tx_meta.height in checkpoint_heights:
+        #             if tx.hash == checkpoint_heights[tx_meta.height]:
+        #                 del checkpoint_heights[tx_meta.height]
+        #             else:
+        #                 # If the hash is different from checkpoint hash, we stop the node
+        #                 self.log.error('Error initializing the node. Checkpoint validation error.')
+        #                 sys.exit()
+        #     else:
+        #         tx_count += 1
+
+        #     if time.time() - t2 > 1:
+        #         dt = LogDuration(time.time() - t2)
+        #         self.log.warn('tx took too long to load', tx=tx.hash_hex, dt=dt)

         # we have to have a best_block by now
         # assert best_block is not None
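This is the heart of the WIP: the inline per-transaction loading loop is commented out in favor of a single call into the indexes. `_manually_initialize_tips_indexes` is not defined in this diff, so the following is a purely hypothetical sketch of the shape such an initializer could take; the iteration strategy and attribute names are assumptions, with the mempool-tips condition copied from the commented-out loop:

    # Hypothetical sketch only; the real method lives outside this diff.
    def _manually_initialize_tips_indexes(self, tx_storage: 'TransactionStorage') -> None:
        for tx in tx_storage._topological_sort():
            tx_meta = tx.get_metadata()
            # same condition the old inline loop used to feed the mempool tips
            if tx.is_transaction and tx_meta.first_block is None and not tx_meta.voided_by:
                self.mempool_tips.update(tx)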
@@ -475,7 +476,7 @@ def _initialize_components(self) -> None:

         # Check if all checkpoints in database are ok
         my_best_height = self.tx_storage.get_height_best_block()
-        if checkpoint_heights:
+        if checkpoint_heights and False:
             # If I have checkpoints that were not validated I must check if they are all in a height I still don't have
             first = min(list(checkpoint_heights.keys()))
             if first <= my_best_height:
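One reviewer-facing detail in the hunk above: `and False` short-circuits the condition to be always falsy, so the checkpoint consistency check is effectively disabled in this WIP. A more explicit equivalent, illustrative only and not in the commit:

    SKIP_CHECKPOINT_CHECK = True  # WIP: checkpoint validation temporarily disabled
    checkpoint_heights: dict = {}  # stand-in for the real variable
    if checkpoint_heights and not SKIP_CHECKPOINT_CHECK:
        ...  # never reached while the flag is set, same as `and False`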
