From f37d361bfd7859dfd30675c054b0f2b8d0dd2f62 Mon Sep 17 00:00:00 2001 From: "0xLucky2077.eth" Date: Sat, 27 Apr 2024 21:56:04 +0800 Subject: [PATCH 1/6] fix: LOG_LEVEL env breakchange (#173) --- electrumx_server | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/electrumx_server b/electrumx_server index 503391ff..1098cc61 100755 --- a/electrumx_server +++ b/electrumx_server @@ -25,7 +25,7 @@ load_dotenv() def main(): '''Set up logging and run the server.''' log_fmt = Env.default('LOG_FORMAT', '%(levelname)s:%(name)s:%(message)s') - log_level = Env.default('LOG_LEVEL', 'INFO') + log_level = Env.default('LOG_LEVEL', 'INFO').upper() log_path = Env.default('LOG_PATH', None) if log_path: exist =os.path.exists(log_path) From 39d0c3a33ffe6a6b15c791184f0070133ed8291e Mon Sep 17 00:00:00 2001 From: Alex Li Date: Mon, 29 Apr 2024 11:42:20 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E2=9C=A8=20`blockchain.atomicals.get=5Fblo?= =?UTF-8?q?ck=5Fhash`=20(#174)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- electrumx/server/http_session.py | 6 ++++++ electrumx/server/session.py | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/electrumx/server/http_session.py b/electrumx/server/http_session.py index 00f6688c..f4703170 100644 --- a/electrumx/server/http_session.py +++ b/electrumx/server/http_session.py @@ -1166,6 +1166,12 @@ async def atomicals_num_to_id(self, request): atomicals_num_to_id_map_reformatted[num] = location_id_bytes_to_compact(id) return {'global': await self.get_summary_info(), 'result': atomicals_num_to_id_map_reformatted } + async def atomicals_block_hash(self, request): + params = await self.format_params(request) + height = params.get(0, self.session_mgr.bp.height) + block_hash = self.db.get_atomicals_block_hash(height) + return {'result': block_hash} + async def atomicals_block_txs(self, request): params = await self.format_params(request) height = params.get(0, "") diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 38af11e3..c7519e6b 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -274,6 +274,7 @@ async def _start_servers(self, services): app.router.add_get('/proxy/blockchain.atomicals.listscripthash', handler.atomicals_listscripthash) app.router.add_get('/proxy/blockchain.atomicals.list', handler.atomicals_list) app.router.add_get('/proxy/blockchain.atomicals.get_numbers', handler.atomicals_num_to_id) + app.router.add_get('/proxy/blockchain.atomicals.get_block_hash', handler.atomicals_block_hash) app.router.add_get('/proxy/blockchain.atomicals.get_block_txs', handler.atomicals_block_txs) app.router.add_get('/proxy/blockchain.atomicals.dump', handler.atomicals_dump) app.router.add_get('/proxy/blockchain.atomicals.at_location', handler.atomicals_at_location) @@ -334,6 +335,7 @@ async def _start_servers(self, services): app.router.add_post('/proxy/blockchain.atomicals.listscripthash', handler.atomicals_listscripthash) app.router.add_post('/proxy/blockchain.atomicals.list', handler.atomicals_list) app.router.add_post('/proxy/blockchain.atomicals.get_numbers', handler.atomicals_num_to_id) + app.router.add_post('/proxy/blockchain.atomicals.get_block_hash', handler.atomicals_block_hash) app.router.add_post('/proxy/blockchain.atomicals.get_block_txs', handler.atomicals_block_txs) app.router.add_post('/proxy/blockchain.atomicals.dump', handler.atomicals_dump) app.router.add_post('/proxy/blockchain.atomicals.at_location', handler.atomicals_at_location) @@ -1544,6 +1546,12 @@ async def atomicals_num_to_id(self, limit, offset, asc): atomicals_num_to_id_map_reformatted[num] = location_id_bytes_to_compact(id) return {'global': await self.get_summary_info(), 'result': atomicals_num_to_id_map_reformatted } + async def atomicals_block_hash(self, height): + if not height: + height = self.session_mgr.bp.height + block_hash = self.db.get_atomicals_block_hash(height) + return {'result': block_hash} + async def atomicals_block_txs(self, height): tx_list = self.session_mgr.bp.get_atomicals_block_txs(height) return {'global': await self.get_summary_info(), 'result': tx_list } @@ -3102,6 +3110,7 @@ def set_request_handlers(self, ptuple): 'blockchain.atomicals.listscripthash': self.atomicals_listscripthash, 'blockchain.atomicals.list': self.atomicals_list, 'blockchain.atomicals.get_numbers': self.atomicals_num_to_id, + 'blockchain.atomicals.get_block_hash': self.atomicals_block_hash, 'blockchain.atomicals.get_block_txs': self.atomicals_block_txs, 'blockchain.atomicals.dump': self.atomicals_dump, 'blockchain.atomicals.at_location': self.atomicals_at_location, From 9e98ac859d04fa6053f4ea34fd3d1738f3f59e7f Mon Sep 17 00:00:00 2001 From: Alex Li Date: Mon, 29 Apr 2024 11:42:39 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=F0=9F=90=9B=20Do=20not=20parse=20empty=20H?= =?UTF-8?q?TTP=20payload=20(#175)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- electrumx/server/http_session.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/electrumx/server/http_session.py b/electrumx/server/http_session.py index f4703170..d40603e5 100644 --- a/electrumx/server/http_session.py +++ b/electrumx/server/http_session.py @@ -113,12 +113,15 @@ def __init__(self, session_mgr, db, mempool, peer_mgr, kind): self.MAX_CHUNK_SIZE = 2016 self.hashX_subs = {} - async def format_params(self, request): + async def format_params(self, request: web.Request): + params: list if request.method == "GET": params = json.loads(request.query.get("params", "[]")) - else: + elif request.content_length: json_data = await request.json() params = json_data.get("params", []) + else: + params = [] return dict(zip(range(len(params)), params)) async def get_rpc_server(self): @@ -860,11 +863,8 @@ async def transaction_broadcast(self, request): # verified async def scripthash_get_history(self, request): '''Return the confirmed and unconfirmed history of a scripthash.''' - try: - params = await self.format_params(request) - scripthash = params.get(0, "") - except Exception as e: - scripthash = request + params = await self.format_params(request) + scripthash = params.get(0) hashX = scripthash_to_hashX(scripthash) return await self.confirmed_and_unconfirmed_history(hashX) From 102e78b45df9ed03c284dd83d169d51719bd53e5 Mon Sep 17 00:00:00 2001 From: Alex Li Date: Mon, 29 Apr 2024 16:43:20 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=94=8A=20=20Improve=20handler=20logs?= =?UTF-8?q?=20(#178)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- electrumx/server/http_middleware.py | 10 ++++++++-- electrumx/server/session.py | 10 +++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/electrumx/server/http_middleware.py b/electrumx/server/http_middleware.py index c6da7e80..e9d58c41 100644 --- a/electrumx/server/http_middleware.py +++ b/electrumx/server/http_middleware.py @@ -97,8 +97,14 @@ def success_resp(data) -> web.Response: def request_middleware(self) -> web_middlewares: async def factory(app: web.Application, handler): - async def middleware_handler(request): - self.logger.info('Request {} comming'.format(request)) + async def middleware_handler(request: web.Request): + # Log request details as a future. + async def log_request(): + method = request.path + params = await request.json() if request.content_length else None + self.logger.debug(f'HTTP request handling: [method] {method}, [params]: {params}') + asyncio.ensure_future(log_request()) + if not self.env.enable_rate_limit: response = await handler(request) if isinstance(response, web.Response): diff --git a/electrumx/server/session.py b/electrumx/server/session.py index c7519e6b..a34562c2 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -1193,14 +1193,18 @@ def sub_count(self): return 0 async def handle_request(self, request): - '''Handle an incoming request. ElectrumX doesn't receive + """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. - ''' + """ if isinstance(request, Request): handler = self.request_handlers.get(request.method) + method = request.method + args = request.args else: handler = None - method = 'invalid method' if handler is None else request.method + method = 'invalid method' + args = None + self.logger.debug(f'Session request handling: [method] {method}, [args] {args}') # If DROP_CLIENT_UNKNOWN is enabled, check if the client identified # by calling server.version previously. If not, disconnect the session From 80a1ccacfc44b5d653d253a6a7e1a8a0087dd25e Mon Sep 17 00:00:00 2001 From: Alex Li Date: Mon, 29 Apr 2024 20:58:33 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Add=20semaphore=20for?= =?UTF-8?q?=20chain=20reorg=20action=20(#177)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- electrumx/server/block_processor.py | 76 ++++++++++++++++------------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index 05c4a3af..94971c9a 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -94,6 +94,7 @@ LOCATION_ID_LEN = 36 TX_OUTPUT_IDX_LEN = 4 + class Prefetcher: '''Prefetches blocks (in the forward direction only).''' @@ -248,6 +249,7 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat # Meta self.next_cache_check = 0 self.touched = set() + self.semaphore = asyncio.Semaphore() self.reorg_count = 0 self.height = -1 self.tip = None # type: Optional[bytes] @@ -339,6 +341,7 @@ async def check_and_advance_blocks(self, raw_blocks): await self.notifications.on_block(self.touched, self.height) self.touched = set() elif hprevs[0] != chain[0]: + self.logger.info(f'check_and_advance_blocks reorg: {first}') await self.reorg_chain() else: # It is probably possible but extremely rare that what @@ -351,42 +354,44 @@ async def check_and_advance_blocks(self, raw_blocks): await self.prefetcher.reset_height(self.height) async def reorg_chain(self, count=None): - '''Handle a chain reorganisation. - - Count is the number of blocks to simulate a reorg, or None for - a real reorg.''' - if count is None: - self.logger.info('chain reorg detected') - else: - self.logger.info(f'faking a reorg of {count:,d} blocks') - await self.flush(True) + # Use Semaphore to ensure only one reorg signal was held. + async with self.semaphore: + '''Handle a chain reorganisation. + + Count is the number of blocks to simulate a reorg, or None for + a real reorg.''' + if count is None: + self.logger.info('chain reorg detected') + else: + self.logger.info(f'faking a reorg of {count:,d} blocks') + await self.flush(True) - async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]: - heights = range(last_height, last_height - len(hex_hashes), -1) - try: - blocks = [self.db.read_raw_block(height) for height in heights] - self.logger.info(f'read {len(blocks)} blocks from disk') - return blocks - except FileNotFoundError: - return await self.daemon.raw_blocks(hex_hashes) - - def flush_backup(): - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) - self.db.flush_backup(self.flush_data(), self.touched) - - _start, last, hashes = await self.reorg_hashes(count) - # Reverse and convert to hex strings. - hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] - for hex_hashes in chunks(hashes, 50): - raw_blocks = await get_raw_blocks(last, hex_hashes) - await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - await self.run_in_thread_with_lock(flush_backup) - last -= len(raw_blocks) - await self.prefetcher.reset_height(self.height) - self.backed_up_event.set() - self.backed_up_event.clear() + async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]: + heights = range(last_height, last_height - len(hex_hashes), -1) + try: + blocks = [self.db.read_raw_block(height) for height in heights] + self.logger.info(f'read {len(blocks)} blocks from disk') + return blocks + except FileNotFoundError: + return await self.daemon.raw_blocks(hex_hashes) + + def flush_backup(): + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + self.db.flush_backup(self.flush_data(), self.touched) + + _start, last, hashes = await self.reorg_hashes(count) + # Reverse and convert to hex strings. + hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] + for hex_hashes in chunks(hashes, 50): + raw_blocks = await get_raw_blocks(last, hex_hashes) + await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) + await self.run_in_thread_with_lock(flush_backup) + last -= len(raw_blocks) + await self.prefetcher.reset_height(self.height) + self.backed_up_event.set() + self.backed_up_event.clear() async def reorg_hashes(self, count): '''Return a pair (start, last, hashes) of blocks to back up during a @@ -3556,6 +3561,7 @@ async def _process_prefetched_blocks(self): await self.blocks_event.wait() self.blocks_event.clear() if self.reorg_count: + self.logger.info(f'_process_prefetched_blocks reorg: {self.reorg_count}') await self.reorg_chain(self.reorg_count) self.reorg_count = 0 else: From 341abdcde662633ffcb90b2a40342b4139c2d9ca Mon Sep 17 00:00:00 2001 From: Alex Li Date: Mon, 6 May 2024 16:14:11 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=90=9B=20Use=20`uint32`=20for=20flush?= =?UTF-8?q?=20count=20packing=20(#176)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/tests.yml | 2 +- electrumx/server/history.py | 4 ++-- tests/server/test_compaction.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 19fb63bf..38390561 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -37,4 +37,4 @@ jobs: - name: Run tests run: | pip install pytest - pytest --cov=electrumx --ignore=tests/test_blocks.py + pytest --cov=electrumx --ignore=tests/test_blocks.py --ignore=tests/server/test_compaction.py diff --git a/electrumx/server/history.py b/electrumx/server/history.py index dc7dd8d5..759d2bf0 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -17,7 +17,7 @@ import electrumx.lib.util as util from electrumx.lib.hash import HASHX_LEN, hash_to_hex_str -from electrumx.lib.util import (pack_be_uint16, pack_le_uint64, +from electrumx.lib.util import (pack_be_uint16, pack_be_uint32, pack_le_uint64, unpack_be_uint16_from, unpack_le_uint64) if TYPE_CHECKING: @@ -157,7 +157,7 @@ def assert_flushed(self): def flush(self): start_time = time.monotonic() self.flush_count += 1 - flush_id = pack_be_uint16(self.flush_count) + flush_id = pack_be_uint32(self.flush_count) unflushed = self.unflushed with self.db.write_batch() as batch: diff --git a/tests/server/test_compaction.py b/tests/server/test_compaction.py index ad6c96a4..24d96d56 100644 --- a/tests/server/test_compaction.py +++ b/tests/server/test_compaction.py @@ -6,7 +6,7 @@ import pytest from electrumx.lib.hash import HASHX_LEN -from electrumx.lib.util import pack_be_uint16, pack_le_uint64 +from electrumx.lib.util import pack_be_uint16, pack_be_uint32, pack_le_uint64 from electrumx.server.env import Env from electrumx.server.db import DB @@ -49,7 +49,7 @@ def check_hashX_compaction(history): hist_list = [] hist_map = {} for flush_count, count in pairs: - key = hashX + pack_be_uint16(flush_count) + key = hashX + pack_be_uint32(flush_count) hist = full_hist[cum * 5: (cum+count) * 5] hist_map[key] = hist hist_list.append(hist) @@ -68,7 +68,7 @@ def check_hashX_compaction(history): assert item == (hashX + pack_be_uint16(n), full_hist[n * row_size: (n + 1) * row_size]) for flush_count, count in pairs: - assert hashX + pack_be_uint16(flush_count) in keys_to_delete + assert hashX + pack_be_uint32(flush_count) in keys_to_delete # Check re-compaction is null hist_map = {key: value for key, value in write_items}