From ec47abc6ead95403dd23620cb834aaac5aa27ab7 Mon Sep 17 00:00:00 2001 From: Chiwon Cho Date: Tue, 12 Nov 2019 19:14:20 +0900 Subject: [PATCH] IS-915: Fix infinite loop on ipc_server termination * Put NoneRequest to MessageQueue to stop on_send() and on_recv() * Fix some asyncio errors * Clean up unused codes from IPCServer * Fix a minor unittest error * Update asyncio statements in IconInnerService --- iconservice/icon_inner_service.py | 31 +++++--- iconservice/iiss/reward_calc/ipc/message.py | 22 +++++- .../iiss/reward_calc/ipc/reward_calc_proxy.py | 24 +++++- iconservice/iiss/reward_calc/ipc/server.py | 78 ++++++++++--------- tests/iiss/ipc/test_message.py | 10 ++- 5 files changed, 109 insertions(+), 56 deletions(-) diff --git a/iconservice/icon_inner_service.py b/iconservice/icon_inner_service.py index 304cb9dd2..25504759a 100644 --- a/iconservice/icon_inner_service.py +++ b/iconservice/icon_inner_service.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from asyncio import get_event_loop +import asyncio from concurrent.futures.thread import ThreadPoolExecutor from typing import Any, TYPE_CHECKING, Optional, Tuple from earlgrey import message_queue_task, MessageQueueStub, MessageQueueService - from iconcommons.logger import Logger + from iconservice.base.address import Address from iconservice.base.block import Block from iconservice.base.exception import ExceptionCode, IconServiceBaseException, InvalidBaseTransactionException, \ @@ -26,7 +26,7 @@ from iconservice.base.type_converter import TypeConverter, ParamType from iconservice.base.type_converter_templates import ConstantKeys from iconservice.icon_constant import ICON_INNER_LOG_TAG, ICON_SERVICE_LOG_TAG, \ - EnableThreadFlag, ENABLE_THREAD_FLAG, ConfigKey, RCStatus + EnableThreadFlag, ENABLE_THREAD_FLAG from iconservice.icon_service_engine import IconServiceEngine from iconservice.utils import check_error_response, to_camel_case @@ -39,6 +39,9 @@ THREAD_VALIDATE = 'validate' +_TAG = "IIS" + + class IconScoreInnerTask(object): def __init__(self, conf: 'IconConfig'): self._conf = conf @@ -75,7 +78,7 @@ async def hello(self): await ready_future if self._is_thread_flag_on(EnableThreadFlag.INVOKE): - loop = get_event_loop() + loop = asyncio.get_event_loop() ret = await loop.run_in_executor(self._thread_pool[THREAD_INVOKE], self._hello) else: ret = self._hello() @@ -88,17 +91,23 @@ def _hello(self): return self._icon_service_engine.hello() def _close(self): - Logger.info("icon_score_service close", ICON_INNER_LOG_TAG) + Logger.info(tag=_TAG, msg="_close() start") if self._icon_service_engine: self._icon_service_engine.close() self._icon_service_engine = None MessageQueueService.loop.stop() + Logger.info(tag=_TAG, msg="_close() end") + @message_queue_task async def close(self): + Logger.info(tag=_TAG, msg="close() start") + self._close() + Logger.info(tag=_TAG, msg="close() end") + @message_queue_task async def invoke(self, request: dict): Logger.info(f'invoke request with {request}', ICON_INNER_LOG_TAG) @@ -106,7 +115,7 @@ async def invoke(self, request: dict): self._check_icon_service_ready() if self._is_thread_flag_on(EnableThreadFlag.INVOKE): - loop = get_event_loop() + loop = asyncio.get_event_loop() return await loop.run_in_executor(self._thread_pool[THREAD_INVOKE], self._invoke, request) else: @@ -184,7 +193,7 @@ async def query(self, request: dict): self._check_icon_service_ready() if self._is_thread_flag_on(EnableThreadFlag.QUERY): - loop = get_event_loop() + loop = asyncio.get_event_loop() return await loop.run_in_executor(self._thread_pool[THREAD_QUERY], self._query, request) else: @@ -227,7 +236,7 @@ async def call(self, request: dict): self._check_icon_service_ready() if self._is_thread_flag_on(EnableThreadFlag.QUERY): - loop = get_event_loop() + loop = asyncio.get_event_loop() return await loop.run_in_executor(self._thread_pool[THREAD_QUERY], self._call, request) else: @@ -261,7 +270,7 @@ async def write_precommit_state(self, request: dict): self._check_icon_service_ready() if self._is_thread_flag_on(EnableThreadFlag.INVOKE): - loop = get_event_loop() + loop = asyncio.get_event_loop() return await loop.run_in_executor(self._thread_pool[THREAD_INVOKE], self._write_precommit_state, request) else: @@ -309,7 +318,7 @@ async def remove_precommit_state(self, request: dict): self._check_icon_service_ready() if self._is_thread_flag_on(EnableThreadFlag.INVOKE): - loop = get_event_loop() + loop = asyncio.get_event_loop() return await loop.run_in_executor(self._thread_pool[THREAD_INVOKE], self._remove_precommit_state, request) else: @@ -345,7 +354,7 @@ async def validate_transaction(self, request: dict): self._check_icon_service_ready() if self._is_thread_flag_on(EnableThreadFlag.VALIDATE): - loop = get_event_loop() + loop = asyncio.get_event_loop() return await loop.run_in_executor(self._thread_pool[THREAD_VALIDATE], self._validate_transaction, request) else: diff --git a/iconservice/iiss/reward_calc/ipc/message.py b/iconservice/iiss/reward_calc/ipc/message.py index 4aad8c871..91a2e4fdb 100644 --- a/iconservice/iiss/reward_calc/ipc/message.py +++ b/iconservice/iiss/reward_calc/ipc/message.py @@ -25,7 +25,17 @@ _next_msg_id: int = 1 -def _get_next_id() -> int: +def reset_next_msg_id(msg_id: int): + """Only used for unittest + + :param msg_id: + :return: + """ + global _next_msg_id + _next_msg_id = msg_id + + +def _get_next_msg_id() -> int: global _next_msg_id msg_id: int = _next_msg_id @@ -51,7 +61,7 @@ class MessageType(IntEnum): class Request(metaclass=ABCMeta): def __init__(self, msg_type: 'MessageType'): self.msg_type = msg_type - self.msg_id = _get_next_id() + self.msg_id = _get_next_msg_id() @abstractmethod def _to_list(self) -> tuple: @@ -453,9 +463,14 @@ def from_list(items: list) -> 'CalculateDoneNotification': class NoneRequest(Request): + """This request is used to stop ipc channel on iconservice stopping + """ def __init__(self): super().__init__(MessageType.NONE) + def __str__(self): + return f"NONE_REQUEST({self.msg_id})" + def _to_list(self) -> tuple: return self.msg_type, self.msg_id @@ -467,6 +482,9 @@ def __init__(self, msg_id: int): super().__init__() self.msg_id = msg_id + def __str__(self): + return f"NONE_RESPONSE({self.msg_id})" + @staticmethod def from_list(items: list) -> 'NoneResponse': msg_id: int = items[1] diff --git a/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py b/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py index 0be0f923a..1366f9b3f 100644 --- a/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py +++ b/iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py @@ -19,17 +19,21 @@ import concurrent.futures import os from subprocess import Popen -from typing import Optional, Callable, Any, Tuple +from typing import TYPE_CHECKING, Optional, Callable, Any, Tuple from iconcommons.logger import Logger -from iconservice.icon_constant import RCStatus + from .message import * from .message_queue import MessageQueue from .server import IPCServer from ....base.address import Address from ....base.exception import TimeoutException +from ....icon_constant import RCStatus from ....utils import bytes_to_hex +if TYPE_CHECKING: + from .message import ReadyNotification, CalculateDoneNotification, NoneResponse + _TAG = "RCP" @@ -77,26 +81,38 @@ def open(self, log_dir: str, sock_path: str, iiss_db_path: str): def start(self): Logger.debug(tag=_TAG, msg="start() end") + self._ipc_server.start() + Logger.debug(tag=_TAG, msg="start() end") def stop(self): Logger.debug(tag=_TAG, msg="stop() start") + + self._stop_message_queue() self._ipc_server.stop() + Logger.debug(tag=_TAG, msg="stop() end") def close(self): Logger.debug(tag=_TAG, msg="close() start") self._ipc_server.close() + self.stop_reward_calc() self._message_queue = None self._loop = None - self.stop_reward_calc() - Logger.debug(tag=_TAG, msg="close() end") + def _stop_message_queue(self): + Logger.info(tag=_TAG, msg="_stop_ipc_server() start") + + request = NoneRequest() + future: asyncio.Future = self._message_queue.put(request) + + Logger.info(tag=_TAG, msg="_stop_ipc_server() start") + def is_reward_calculator_ready(self) -> bool: return self._ready_future.done() diff --git a/iconservice/iiss/reward_calc/ipc/server.py b/iconservice/iiss/reward_calc/ipc/server.py index 8a943cbb6..26aca3d0c 100644 --- a/iconservice/iiss/reward_calc/ipc/server.py +++ b/iconservice/iiss/reward_calc/ipc/server.py @@ -19,78 +19,86 @@ from iconcommons import Logger -from .message import MessageType, Request, NoneRequest, NoneResponse +from .message import MessageType, Request from .message_queue import MessageQueue from .message_unpacker import MessageUnpacker - _TAG = "RCP" class IPCServer(object): def __init__(self): + self._running = False self._loop = None - self._server = None + self._path = None self._queue: Optional['MessageQueue'] = None self._unpacker: Optional['MessageUnpacker'] = MessageUnpacker() self._tasks = [] def open(self, loop, message_queue: 'MessageQueue', path: str): + Logger.info(tag=_TAG, msg="open() start") + assert loop assert message_queue assert isinstance(path, str) self._loop = loop self._queue = message_queue + self._path = path - server = asyncio.start_unix_server(self._on_accepted, path) - - self._server = server + Logger.info(tag=_TAG, msg="open() end") def start(self): - if self._server is None: + Logger.info(tag=_TAG, msg="start() start") + + if self._running: return - self._server = self._loop.run_until_complete(self._server) + self._running = True + co = asyncio.start_unix_server(self._on_accepted, self._path) + asyncio.ensure_future(co) + + Logger.info(tag=_TAG, msg="start() end") def stop(self): - for t in self._tasks: - t.cancel() + Logger.info(tag=_TAG, msg="stop() start") - if self._server is None: + if not self._running: return - self._server.close() + self._running = False + + for t in self._tasks: + t.cancel() + + Logger.info(tag=_TAG, msg="stop() end") def close(self): - if self._server is not None: - asyncio.wait_for(self._server.wait_closed(), 5) - self._server = None + Logger.info(tag=_TAG, msg="close() start") self._loop = None - self._queue = None self._unpacker = None + Logger.info(tag=_TAG, msg="close() end") + def _on_accepted(self, reader: 'StreamReader', writer: 'StreamWriter'): - Logger.debug(tag=_TAG, msg=f"on_accepted() start: {reader} {writer}") + Logger.info(tag=_TAG, msg=f"on_accepted() start: {reader} {writer}") self._tasks.append(asyncio.ensure_future(self._on_send(writer))) self._tasks.append(asyncio.ensure_future(self._on_recv(reader))) - Logger.debug(tag=_TAG, msg="on_accepted() end") + Logger.info(tag=_TAG, msg="on_accepted() end") async def _on_send(self, writer: 'StreamWriter'): - Logger.debug(tag=_TAG, msg="_on_send() start") + Logger.info(tag=_TAG, msg="_on_send() start") - while True: + while self._running: try: request: 'Request' = await self._queue.get() - if request.msg_type == MessageType.NONE: - self._queue.put_response( - NoneResponse.from_list([request.msg_type, request.msg_id]) - ) + self._queue.task_done() - self._queue.task_done() + if request.msg_type == MessageType.NONE: + # Stopping IPCServer break data: bytes = request.to_bytes() @@ -99,19 +107,19 @@ async def _on_send(self, writer: 'StreamWriter'): writer.write(data) await writer.drain() - self._queue.task_done() - + except asyncio.CancelledError: + pass except BaseException as e: - Logger.error(tag=_TAG, msg=str(e)) + Logger.warning(tag=_TAG, msg=str(e)) writer.close() - Logger.debug(tag=_TAG, msg="_on_send() end") + Logger.info(tag=_TAG, msg="_on_send() end") async def _on_recv(self, reader: 'StreamReader'): - Logger.debug(tag=_TAG, msg="_on_recv() start") + Logger.info(tag=_TAG, msg="_on_recv() start") - while True: + while self._running: try: data: bytes = await reader.read(1024) if not isinstance(data, bytes) or len(data) == 0: @@ -125,9 +133,9 @@ async def _on_recv(self, reader: 'StreamReader'): Logger.info(tag=_TAG, msg=f"Received Data : {response}") self._queue.message_handler(response) + except asyncio.CancelledError: + pass except BaseException as e: - Logger.error(tag=_TAG, msg=str(e)) - - await self._queue.put(NoneRequest()) + Logger.warning(tag=_TAG, msg=str(e)) - Logger.debug(tag=_TAG, msg="_on_recv() end") + Logger.info(tag=_TAG, msg="_on_recv() end") diff --git a/tests/iiss/ipc/test_message.py b/tests/iiss/ipc/test_message.py index a584d0c11..6dd20db2a 100644 --- a/tests/iiss/ipc/test_message.py +++ b/tests/iiss/ipc/test_message.py @@ -5,21 +5,23 @@ class TestMessage(unittest.TestCase): def test_get_next_id(self): + message.reset_next_msg_id(1) + assert message._next_msg_id == 1 - msg_id: int = message._get_next_id() + msg_id: int = message._get_next_msg_id() assert msg_id == 1 assert message._next_msg_id == 2 - msg_id: int = message._get_next_id() + msg_id: int = message._get_next_msg_id() assert msg_id == 2 assert message._next_msg_id == 3 message._next_msg_id = 0xffffffff - msg_id: int = message._get_next_id() + msg_id: int = message._get_next_msg_id() assert msg_id == 0xffffffff assert message._next_msg_id == 1 - msg_id: int = message._get_next_id() + msg_id: int = message._get_next_msg_id() assert msg_id == 1 assert message._next_msg_id == 2