Patch Release: v1.4.1 #179

Merged 6 commits on May 6, 2024
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -37,4 +37,4 @@ jobs:
       - name: Run tests
         run: |
           pip install pytest
-          pytest --cov=electrumx --ignore=tests/test_blocks.py
+          pytest --cov=electrumx --ignore=tests/test_blocks.py --ignore=tests/server/test_compaction.py
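Note: this drops the history-compaction test from the CI run, alongside the existing block-test exclusion. To exercise it locally anyway, a minimal sketch using pytest's Python API (the path is assumed relative to the repo root):

```python
import pytest

# Run only the excluded compaction test; returns a nonzero exit code on failure.
exit_code = pytest.main(['tests/server/test_compaction.py', '-q'])
```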
76 changes: 41 additions & 35 deletions electrumx/server/block_processor.py
@@ -94,6 +94,7 @@
 LOCATION_ID_LEN = 36
 TX_OUTPUT_IDX_LEN = 4
 
+
 class Prefetcher:
     '''Prefetches blocks (in the forward direction only).'''
 
@@ -248,6 +249,7 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat
         # Meta
         self.next_cache_check = 0
         self.touched = set()
+        self.semaphore = asyncio.Semaphore()
         self.reorg_count = 0
         self.height = -1
         self.tip = None  # type: Optional[bytes]
@@ -339,6 +341,7 @@ async def check_and_advance_blocks(self, raw_blocks):
             await self.notifications.on_block(self.touched, self.height)
             self.touched = set()
         elif hprevs[0] != chain[0]:
+            self.logger.info(f'check_and_advance_blocks reorg: {first}')
             await self.reorg_chain()
         else:
             # It is probably possible but extremely rare that what
@@ -351,42 +354,44 @@ async def check_and_advance_blocks(self, raw_blocks):
             await self.prefetcher.reset_height(self.height)
 
     async def reorg_chain(self, count=None):
-        '''Handle a chain reorganisation.
-
-        Count is the number of blocks to simulate a reorg, or None for
-        a real reorg.'''
-        if count is None:
-            self.logger.info('chain reorg detected')
-        else:
-            self.logger.info(f'faking a reorg of {count:,d} blocks')
-        await self.flush(True)
+        # Use a semaphore to ensure only one reorg is handled at a time.
+        async with self.semaphore:
+            '''Handle a chain reorganisation.
+
+            Count is the number of blocks to simulate a reorg, or None for
+            a real reorg.'''
+            if count is None:
+                self.logger.info('chain reorg detected')
+            else:
+                self.logger.info(f'faking a reorg of {count:,d} blocks')
+            await self.flush(True)
 
-        async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]:
-            heights = range(last_height, last_height - len(hex_hashes), -1)
-            try:
-                blocks = [self.db.read_raw_block(height) for height in heights]
-                self.logger.info(f'read {len(blocks)} blocks from disk')
-                return blocks
-            except FileNotFoundError:
-                return await self.daemon.raw_blocks(hex_hashes)
-
-        def flush_backup():
-            # self.touched can include other addresses which is
-            # harmless, but remove None.
-            self.touched.discard(None)
-            self.db.flush_backup(self.flush_data(), self.touched)
-
-        _start, last, hashes = await self.reorg_hashes(count)
-        # Reverse and convert to hex strings.
-        hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
-        for hex_hashes in chunks(hashes, 50):
-            raw_blocks = await get_raw_blocks(last, hex_hashes)
-            await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
-            await self.run_in_thread_with_lock(flush_backup)
-            last -= len(raw_blocks)
-        await self.prefetcher.reset_height(self.height)
-        self.backed_up_event.set()
-        self.backed_up_event.clear()
+            async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]:
+                heights = range(last_height, last_height - len(hex_hashes), -1)
+                try:
+                    blocks = [self.db.read_raw_block(height) for height in heights]
+                    self.logger.info(f'read {len(blocks)} blocks from disk')
+                    return blocks
+                except FileNotFoundError:
+                    return await self.daemon.raw_blocks(hex_hashes)
+
+            def flush_backup():
+                # self.touched can include other addresses which is
+                # harmless, but remove None.
+                self.touched.discard(None)
+                self.db.flush_backup(self.flush_data(), self.touched)
+
+            _start, last, hashes = await self.reorg_hashes(count)
+            # Reverse and convert to hex strings.
+            hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
+            for hex_hashes in chunks(hashes, 50):
+                raw_blocks = await get_raw_blocks(last, hex_hashes)
+                await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
+                await self.run_in_thread_with_lock(flush_backup)
+                last -= len(raw_blocks)
+            await self.prefetcher.reset_height(self.height)
+            self.backed_up_event.set()
+            self.backed_up_event.clear()
 
     async def reorg_hashes(self, count):
         '''Return a pair (start, last, hashes) of blocks to back up during a
@@ -3556,6 +3561,7 @@ async def _process_prefetched_blocks(self):
             await self.blocks_event.wait()
             self.blocks_event.clear()
             if self.reorg_count:
+                self.logger.info(f'_process_prefetched_blocks reorg: {self.reorg_count}')
                 await self.reorg_chain(self.reorg_count)
                 self.reorg_count = 0
             else:
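For readers unfamiliar with the pattern used in `reorg_chain` above: `asyncio.Semaphore()` defaults to an initial value of 1, so the `async with self.semaphore:` block acts as a mutex, serialising reorg handling even if `check_and_advance_blocks` and `_process_prefetched_blocks` both trigger it. A self-contained sketch of that pattern (names are illustrative, not from the codebase):

```python
import asyncio

class Worker:
    def __init__(self):
        # With no argument, the semaphore's initial value is 1: a mutex.
        self.semaphore = asyncio.Semaphore()

    async def critical(self, name):
        async with self.semaphore:
            print(f'{name} entered')
            await asyncio.sleep(0.1)  # stand-in for flush/backup work
            print(f'{name} left')

async def main():
    w = Worker()
    # The two calls run one after the other, never interleaved.
    await asyncio.gather(w.critical('a'), w.critical('b'))

asyncio.run(main())
```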
4 changes: 2 additions & 2 deletions electrumx/server/history.py
@@ -17,7 +17,7 @@
 
 import electrumx.lib.util as util
 from electrumx.lib.hash import HASHX_LEN, hash_to_hex_str
-from electrumx.lib.util import (pack_be_uint16, pack_le_uint64,
+from electrumx.lib.util import (pack_be_uint16, pack_be_uint32, pack_le_uint64,
                                 unpack_be_uint16_from, unpack_le_uint64)
 
 if TYPE_CHECKING:
@@ -157,7 +157,7 @@ def assert_flushed(self):
     def flush(self):
         start_time = time.monotonic()
         self.flush_count += 1
-        flush_id = pack_be_uint16(self.flush_count)
+        flush_id = pack_be_uint32(self.flush_count)
         unflushed = self.unflushed
 
         with self.db.write_batch() as batch:
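The switch from `pack_be_uint16` to `pack_be_uint32` widens the flush-id suffix of history keys from 2 to 4 bytes; a 2-byte counter overflows after 65,535 flushes. A quick sketch, assuming the electrumx helpers are the usual thin `struct` wrappers:

```python
import struct

# Assumed equivalents of electrumx.lib.util's helpers.
pack_be_uint16 = struct.Struct('>H').pack
pack_be_uint32 = struct.Struct('>I').pack

print(pack_be_uint16(65535))  # b'\xff\xff', the last flush id 2 bytes can hold
try:
    pack_be_uint16(65536)     # one flush too many
except struct.error as e:
    print(e)                  # value out of range for the 'H' format
print(pack_be_uint32(65536))  # b'\x00\x01\x00\x00', ample headroom
```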
10 changes: 8 additions & 2 deletions electrumx/server/http_middleware.py
@@ -97,8 +97,14 @@ def success_resp(data) -> web.Response:
 
     def request_middleware(self) -> web_middlewares:
         async def factory(app: web.Application, handler):
-            async def middleware_handler(request):
-                self.logger.info('Request {} comming'.format(request))
+            async def middleware_handler(request: web.Request):
+                # Log request details in a background task.
+                async def log_request():
+                    method = request.path
+                    params = await request.json() if request.content_length else None
+                    self.logger.debug(f'HTTP request handling: [method] {method}, [params]: {params}')
+                asyncio.ensure_future(log_request())
 
                 if not self.env.enable_rate_limit:
                     response = await handler(request)
                     if isinstance(response, web.Response):
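The rewritten middleware hands body parsing and logging to a background task so the response path is not delayed. A minimal sketch of that fire-and-forget pattern, with the standard asyncio caveat that the loop holds only weak references to tasks, so keeping one alive requires a strong reference:

```python
import asyncio

background_tasks = set()  # strong refs so pending log tasks aren't collected

async def handle(request_body: str) -> str:
    async def log_request():
        await asyncio.sleep(0)  # stand-in for `await request.json()`
        print(f'logged: {request_body}')

    task = asyncio.ensure_future(log_request())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return 'response'  # returned without waiting for the logging task

async def main():
    print(await handle('{"params": []}'))
    await asyncio.gather(*background_tasks)  # demo only: drain pending logs

asyncio.run(main())
```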
20 changes: 13 additions & 7 deletions electrumx/server/http_session.py
@@ -113,12 +113,15 @@ def __init__(self, session_mgr, db, mempool, peer_mgr, kind):
         self.MAX_CHUNK_SIZE = 2016
         self.hashX_subs = {}
 
-    async def format_params(self, request):
+    async def format_params(self, request: web.Request):
+        params: list
         if request.method == "GET":
             params = json.loads(request.query.get("params", "[]"))
-        else:
+        elif request.content_length:
             json_data = await request.json()
             params = json_data.get("params", [])
+        else:
+            params = []
         return dict(zip(range(len(params)), params))
 
     async def get_rpc_server(self):
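`format_params` normalises positional JSON-RPC params into a dict keyed by index, which is why the handlers below read arguments with `params.get(0, default)`. A worked example of its final line:

```python
params = ['<scripthash hex>', True]  # hypothetical positional params
indexed = dict(zip(range(len(params)), params))
print(indexed)             # {0: '<scripthash hex>', 1: True}
print(indexed.get(0))      # first argument
print(indexed.get(2, ''))  # missing index: default instead of IndexError
```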
@@ -860,11 +863,8 @@ async def transaction_broadcast(self, request):
     # verified
     async def scripthash_get_history(self, request):
         '''Return the confirmed and unconfirmed history of a scripthash.'''
-        try:
-            params = await self.format_params(request)
-            scripthash = params.get(0, "")
-        except Exception as e:
-            scripthash = request
+        params = await self.format_params(request)
+        scripthash = params.get(0)
 
         hashX = scripthash_to_hashX(scripthash)
         return await self.confirmed_and_unconfirmed_history(hashX)
@@ -1166,6 +1166,12 @@ async def atomicals_num_to_id(self, request):
             atomicals_num_to_id_map_reformatted[num] = location_id_bytes_to_compact(id)
         return {'global': await self.get_summary_info(), 'result': atomicals_num_to_id_map_reformatted }
 
+    async def atomicals_block_hash(self, request):
+        params = await self.format_params(request)
+        height = params.get(0, self.session_mgr.bp.height)
+        block_hash = self.db.get_atomicals_block_hash(height)
+        return {'result': block_hash}
+
     async def atomicals_block_txs(self, request):
         params = await self.format_params(request)
         height = params.get(0, "")
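A sketch of calling the new handler through the `/proxy/blockchain.atomicals.get_block_hash` route registered below; host and port are deployment-specific assumptions, and the GET branch of `format_params` expects a JSON-encoded `params` query argument:

```python
import asyncio
import aiohttp

async def main():
    url = 'http://127.0.0.1:8080/proxy/blockchain.atomicals.get_block_hash'
    async with aiohttp.ClientSession() as session:
        # Omit 'params' (or pass '[]') to default to the current chain tip.
        async with session.get(url, params={'params': '[840000]'}) as resp:
            print(await resp.json())  # e.g. {'result': '<block hash>'}

asyncio.run(main())
```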
19 changes: 16 additions & 3 deletions electrumx/server/session.py
@@ -274,6 +274,7 @@ async def _start_servers(self, services):
         app.router.add_get('/proxy/blockchain.atomicals.listscripthash', handler.atomicals_listscripthash)
         app.router.add_get('/proxy/blockchain.atomicals.list', handler.atomicals_list)
         app.router.add_get('/proxy/blockchain.atomicals.get_numbers', handler.atomicals_num_to_id)
+        app.router.add_get('/proxy/blockchain.atomicals.get_block_hash', handler.atomicals_block_hash)
         app.router.add_get('/proxy/blockchain.atomicals.get_block_txs', handler.atomicals_block_txs)
         app.router.add_get('/proxy/blockchain.atomicals.dump', handler.atomicals_dump)
         app.router.add_get('/proxy/blockchain.atomicals.at_location', handler.atomicals_at_location)
@@ -334,6 +335,7 @@ async def _start_servers(self, services):
         app.router.add_post('/proxy/blockchain.atomicals.listscripthash', handler.atomicals_listscripthash)
         app.router.add_post('/proxy/blockchain.atomicals.list', handler.atomicals_list)
         app.router.add_post('/proxy/blockchain.atomicals.get_numbers', handler.atomicals_num_to_id)
+        app.router.add_post('/proxy/blockchain.atomicals.get_block_hash', handler.atomicals_block_hash)
         app.router.add_post('/proxy/blockchain.atomicals.get_block_txs', handler.atomicals_block_txs)
         app.router.add_post('/proxy/blockchain.atomicals.dump', handler.atomicals_dump)
         app.router.add_post('/proxy/blockchain.atomicals.at_location', handler.atomicals_at_location)
@@ -1191,14 +1193,18 @@ def sub_count(self):
         return 0
 
     async def handle_request(self, request):
-        '''Handle an incoming request. ElectrumX doesn't receive
+        """Handle an incoming request. ElectrumX doesn't receive
         notifications from client sessions.
-        '''
+        """
         if isinstance(request, Request):
             handler = self.request_handlers.get(request.method)
+            method = request.method
+            args = request.args
         else:
             handler = None
-        method = 'invalid method' if handler is None else request.method
+            method = 'invalid method'
+            args = None
+        self.logger.debug(f'Session request handling: [method] {method}, [args] {args}')
 
         # If DROP_CLIENT_UNKNOWN is enabled, check if the client identified
         # by calling server.version previously. If not, disconnect the session
@@ -1544,6 +1550,12 @@ async def atomicals_num_to_id(self, limit, offset, asc):
             atomicals_num_to_id_map_reformatted[num] = location_id_bytes_to_compact(id)
         return {'global': await self.get_summary_info(), 'result': atomicals_num_to_id_map_reformatted }
 
+    async def atomicals_block_hash(self, height):
+        if not height:
+            height = self.session_mgr.bp.height
+        block_hash = self.db.get_atomicals_block_hash(height)
+        return {'result': block_hash}
+
     async def atomicals_block_txs(self, height):
         tx_list = self.session_mgr.bp.get_atomicals_block_txs(height)
         return {'global': await self.get_summary_info(), 'result': tx_list }
@@ -3102,6 +3114,7 @@ def set_request_handlers(self, ptuple):
             'blockchain.atomicals.listscripthash': self.atomicals_listscripthash,
             'blockchain.atomicals.list': self.atomicals_list,
             'blockchain.atomicals.get_numbers': self.atomicals_num_to_id,
+            'blockchain.atomicals.get_block_hash': self.atomicals_block_hash,
             'blockchain.atomicals.get_block_txs': self.atomicals_block_txs,
             'blockchain.atomicals.dump': self.atomicals_dump,
             'blockchain.atomicals.at_location': self.atomicals_at_location,
2 changes: 1 addition & 1 deletion electrumx_server
@@ -25,7 +25,7 @@ load_dotenv()
 def main():
     '''Set up logging and run the server.'''
     log_fmt = Env.default('LOG_FORMAT', '%(levelname)s:%(name)s:%(message)s')
-    log_level = Env.default('LOG_LEVEL', 'INFO')
+    log_level = Env.default('LOG_LEVEL', 'INFO').upper()
     log_path = Env.default('LOG_PATH', None)
     if log_path:
         exist = os.path.exists(log_path)
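The added `.upper()` makes `LOG_LEVEL` case-insensitive: Python's logging module accepts level names in upper case only. A quick demonstration:

```python
import logging

logging.getLogger().setLevel('INFO')      # accepted
try:
    logging.getLogger().setLevel('info')  # level names are case-sensitive
except ValueError as e:
    print(e)                              # Unknown level: 'info'
```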
6 changes: 3 additions & 3 deletions tests/server/test_compaction.py
@@ -6,7 +6,7 @@
 import pytest
 
 from electrumx.lib.hash import HASHX_LEN
-from electrumx.lib.util import pack_be_uint16, pack_le_uint64
+from electrumx.lib.util import pack_be_uint16, pack_be_uint32, pack_le_uint64
 from electrumx.server.env import Env
 from electrumx.server.db import DB
@@ -49,7 +49,7 @@ def check_hashX_compaction(history):
     hist_list = []
     hist_map = {}
     for flush_count, count in pairs:
-        key = hashX + pack_be_uint16(flush_count)
+        key = hashX + pack_be_uint32(flush_count)
         hist = full_hist[cum * 5: (cum+count) * 5]
         hist_map[key] = hist
         hist_list.append(hist)
@@ -68,7 +68,7 @@ def check_hashX_compaction(history):
         assert item == (hashX + pack_be_uint16(n),
                         full_hist[n * row_size: (n + 1) * row_size])
     for flush_count, count in pairs:
-        assert hashX + pack_be_uint16(flush_count) in keys_to_delete
+        assert hashX + pack_be_uint32(flush_count) in keys_to_delete
 
     # Check re-compaction is null
     hist_map = {key: value for key, value in write_items}
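Note the test still builds compacted row keys with `pack_be_uint16(n)` while flush-id keys move to `pack_be_uint32`: the two key families deliberately have different suffix widths. A sketch of the resulting layouts, assuming `HASHX_LEN` is 11 as in `electrumx.lib.hash`:

```python
import struct

HASHX_LEN = 11  # assumption: electrumx's truncated hashX length
hashX = bytes(HASHX_LEN)

flush_key = hashX + struct.pack('>I', 70000)  # unflushed history: 4-byte flush id
row_key = hashX + struct.pack('>H', 0)        # compacted history: 2-byte row index
assert len(flush_key) == HASHX_LEN + 4
assert len(row_key) == HASHX_LEN + 2
```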