Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

IS-915: Fix infinite loop on ipc_server termination #387

Merged
merged 1 commit into from
Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions iconservice/icon_inner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from asyncio import get_event_loop
import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, TYPE_CHECKING, Optional, Tuple

from earlgrey import message_queue_task, MessageQueueStub, MessageQueueService

from iconcommons.logger import Logger

from iconservice.base.address import Address
from iconservice.base.block import Block
from iconservice.base.exception import ExceptionCode, IconServiceBaseException, InvalidBaseTransactionException, \
FatalException, ServiceNotReadyException
from iconservice.base.type_converter import TypeConverter, ParamType
from iconservice.base.type_converter_templates import ConstantKeys
from iconservice.icon_constant import ICON_INNER_LOG_TAG, ICON_SERVICE_LOG_TAG, \
EnableThreadFlag, ENABLE_THREAD_FLAG, ConfigKey, RCStatus
EnableThreadFlag, ENABLE_THREAD_FLAG
from iconservice.icon_service_engine import IconServiceEngine
from iconservice.utils import check_error_response, to_camel_case

Expand All @@ -39,6 +39,9 @@
THREAD_VALIDATE = 'validate'


_TAG = "IIS"


class IconScoreInnerTask(object):
def __init__(self, conf: 'IconConfig'):
self._conf = conf
Expand Down Expand Up @@ -75,7 +78,7 @@ async def hello(self):
await ready_future

if self._is_thread_flag_on(EnableThreadFlag.INVOKE):
loop = get_event_loop()
loop = asyncio.get_event_loop()
ret = await loop.run_in_executor(self._thread_pool[THREAD_INVOKE], self._hello)
else:
ret = self._hello()
Expand All @@ -88,25 +91,31 @@ def _hello(self):
return self._icon_service_engine.hello()

def _close(self):
Logger.info("icon_score_service close", ICON_INNER_LOG_TAG)
Logger.info(tag=_TAG, msg="_close() start")

if self._icon_service_engine:
self._icon_service_engine.close()
self._icon_service_engine = None
MessageQueueService.loop.stop()

Logger.info(tag=_TAG, msg="_close() end")

@message_queue_task
async def close(self):
Logger.info(tag=_TAG, msg="close() start")

self._close()

Logger.info(tag=_TAG, msg="close() end")

@message_queue_task
async def invoke(self, request: dict):
Logger.info(f'invoke request with {request}', ICON_INNER_LOG_TAG)

self._check_icon_service_ready()

if self._is_thread_flag_on(EnableThreadFlag.INVOKE):
loop = get_event_loop()
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._thread_pool[THREAD_INVOKE],
self._invoke, request)
else:
Expand Down Expand Up @@ -184,7 +193,7 @@ async def query(self, request: dict):
self._check_icon_service_ready()

if self._is_thread_flag_on(EnableThreadFlag.QUERY):
loop = get_event_loop()
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._thread_pool[THREAD_QUERY],
self._query, request)
else:
Expand Down Expand Up @@ -227,7 +236,7 @@ async def call(self, request: dict):
self._check_icon_service_ready()

if self._is_thread_flag_on(EnableThreadFlag.QUERY):
loop = get_event_loop()
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._thread_pool[THREAD_QUERY],
self._call, request)
else:
Expand Down Expand Up @@ -261,7 +270,7 @@ async def write_precommit_state(self, request: dict):
self._check_icon_service_ready()

if self._is_thread_flag_on(EnableThreadFlag.INVOKE):
loop = get_event_loop()
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._thread_pool[THREAD_INVOKE],
self._write_precommit_state, request)
else:
Expand Down Expand Up @@ -309,7 +318,7 @@ async def remove_precommit_state(self, request: dict):
self._check_icon_service_ready()

if self._is_thread_flag_on(EnableThreadFlag.INVOKE):
loop = get_event_loop()
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._thread_pool[THREAD_INVOKE],
self._remove_precommit_state, request)
else:
Expand Down Expand Up @@ -345,7 +354,7 @@ async def validate_transaction(self, request: dict):
self._check_icon_service_ready()

if self._is_thread_flag_on(EnableThreadFlag.VALIDATE):
loop = get_event_loop()
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._thread_pool[THREAD_VALIDATE],
self._validate_transaction, request)
else:
Expand Down
22 changes: 20 additions & 2 deletions iconservice/iiss/reward_calc/ipc/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,17 @@
_next_msg_id: int = 1


def _get_next_id() -> int:
def reset_next_msg_id(msg_id: int):
"""Only used for unittest

:param msg_id:
:return:
"""
global _next_msg_id
_next_msg_id = msg_id


def _get_next_msg_id() -> int:
global _next_msg_id

msg_id: int = _next_msg_id
Expand All @@ -51,7 +61,7 @@ class MessageType(IntEnum):
class Request(metaclass=ABCMeta):
def __init__(self, msg_type: 'MessageType'):
self.msg_type = msg_type
self.msg_id = _get_next_id()
self.msg_id = _get_next_msg_id()

@abstractmethod
def _to_list(self) -> tuple:
Expand Down Expand Up @@ -453,9 +463,14 @@ def from_list(items: list) -> 'CalculateDoneNotification':


class NoneRequest(Request):
"""This request is used to stop ipc channel on iconservice stopping
"""
def __init__(self):
super().__init__(MessageType.NONE)

def __str__(self):
return f"NONE_REQUEST({self.msg_id})"

def _to_list(self) -> tuple:
return self.msg_type, self.msg_id

Expand All @@ -467,6 +482,9 @@ def __init__(self, msg_id: int):
super().__init__()
self.msg_id = msg_id

def __str__(self):
return f"NONE_RESPONSE({self.msg_id})"

@staticmethod
def from_list(items: list) -> 'NoneResponse':
msg_id: int = items[1]
Expand Down
24 changes: 20 additions & 4 deletions iconservice/iiss/reward_calc/ipc/reward_calc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@
import concurrent.futures
import os
from subprocess import Popen
from typing import Optional, Callable, Any, Tuple
from typing import TYPE_CHECKING, Optional, Callable, Any, Tuple

from iconcommons.logger import Logger
from iconservice.icon_constant import RCStatus

from .message import *
from .message_queue import MessageQueue
from .server import IPCServer
from ....base.address import Address
from ....base.exception import TimeoutException
from ....icon_constant import RCStatus
from ....utils import bytes_to_hex

if TYPE_CHECKING:
from .message import ReadyNotification, CalculateDoneNotification, NoneResponse

_TAG = "RCP"


Expand Down Expand Up @@ -77,26 +81,38 @@ def open(self, log_dir: str, sock_path: str, iiss_db_path: str):

def start(self):
Logger.debug(tag=_TAG, msg="start() end")

self._ipc_server.start()

Logger.debug(tag=_TAG, msg="start() end")

def stop(self):
Logger.debug(tag=_TAG, msg="stop() start")

self._stop_message_queue()
self._ipc_server.stop()

Logger.debug(tag=_TAG, msg="stop() end")

def close(self):
Logger.debug(tag=_TAG, msg="close() start")

self._ipc_server.close()
self.stop_reward_calc()

self._message_queue = None
self._loop = None

self.stop_reward_calc()

Logger.debug(tag=_TAG, msg="close() end")

def _stop_message_queue(self):
Logger.info(tag=_TAG, msg="_stop_ipc_server() start")

request = NoneRequest()
future: asyncio.Future = self._message_queue.put(request)

Logger.info(tag=_TAG, msg="_stop_ipc_server() start")

def is_reward_calculator_ready(self) -> bool:
return self._ready_future.done()

Expand Down
78 changes: 43 additions & 35 deletions iconservice/iiss/reward_calc/ipc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,86 @@

from iconcommons import Logger

from .message import MessageType, Request, NoneRequest, NoneResponse
from .message import MessageType, Request
from .message_queue import MessageQueue
from .message_unpacker import MessageUnpacker


_TAG = "RCP"


class IPCServer(object):
def __init__(self):
self._running = False
self._loop = None
self._server = None
self._path = None
self._queue: Optional['MessageQueue'] = None
self._unpacker: Optional['MessageUnpacker'] = MessageUnpacker()
self._tasks = []

def open(self, loop, message_queue: 'MessageQueue', path: str):
Logger.info(tag=_TAG, msg="open() start")

assert loop
assert message_queue
assert isinstance(path, str)

self._loop = loop
self._queue = message_queue
self._path = path

server = asyncio.start_unix_server(self._on_accepted, path)

self._server = server
Logger.info(tag=_TAG, msg="open() end")

def start(self):
if self._server is None:
Logger.info(tag=_TAG, msg="start() start")

if self._running:
return

self._server = self._loop.run_until_complete(self._server)
self._running = True
co = asyncio.start_unix_server(self._on_accepted, self._path)
asyncio.ensure_future(co)

Logger.info(tag=_TAG, msg="start() end")

def stop(self):
for t in self._tasks:
t.cancel()
Logger.info(tag=_TAG, msg="stop() start")

if self._server is None:
if not self._running:
return

self._server.close()
self._running = False

for t in self._tasks:
t.cancel()

Logger.info(tag=_TAG, msg="stop() end")

def close(self):
if self._server is not None:
asyncio.wait_for(self._server.wait_closed(), 5)
self._server = None
Logger.info(tag=_TAG, msg="close() start")

self._loop = None
self._queue = None
self._unpacker = None

Logger.info(tag=_TAG, msg="close() end")

def _on_accepted(self, reader: 'StreamReader', writer: 'StreamWriter'):
Logger.debug(tag=_TAG, msg=f"on_accepted() start: {reader} {writer}")
Logger.info(tag=_TAG, msg=f"on_accepted() start: {reader} {writer}")

self._tasks.append(asyncio.ensure_future(self._on_send(writer)))
self._tasks.append(asyncio.ensure_future(self._on_recv(reader)))

Logger.debug(tag=_TAG, msg="on_accepted() end")
Logger.info(tag=_TAG, msg="on_accepted() end")

async def _on_send(self, writer: 'StreamWriter'):
Logger.debug(tag=_TAG, msg="_on_send() start")
Logger.info(tag=_TAG, msg="_on_send() start")

while True:
while self._running:
try:
request: 'Request' = await self._queue.get()
if request.msg_type == MessageType.NONE:
self._queue.put_response(
NoneResponse.from_list([request.msg_type, request.msg_id])
)
self._queue.task_done()

self._queue.task_done()
if request.msg_type == MessageType.NONE:
# Stopping IPCServer
break

data: bytes = request.to_bytes()
Expand All @@ -99,19 +107,19 @@ async def _on_send(self, writer: 'StreamWriter'):
writer.write(data)
await writer.drain()

self._queue.task_done()

except asyncio.CancelledError:
pass
except BaseException as e:
Logger.error(tag=_TAG, msg=str(e))
Logger.warning(tag=_TAG, msg=str(e))

writer.close()

Logger.debug(tag=_TAG, msg="_on_send() end")
Logger.info(tag=_TAG, msg="_on_send() end")

async def _on_recv(self, reader: 'StreamReader'):
Logger.debug(tag=_TAG, msg="_on_recv() start")
Logger.info(tag=_TAG, msg="_on_recv() start")

while True:
while self._running:
try:
data: bytes = await reader.read(1024)
if not isinstance(data, bytes) or len(data) == 0:
Expand All @@ -125,9 +133,9 @@ async def _on_recv(self, reader: 'StreamReader'):
Logger.info(tag=_TAG, msg=f"Received Data : {response}")
self._queue.message_handler(response)

except asyncio.CancelledError:
pass
except BaseException as e:
Logger.error(tag=_TAG, msg=str(e))

await self._queue.put(NoneRequest())
Logger.warning(tag=_TAG, msg=str(e))

Logger.debug(tag=_TAG, msg="_on_recv() end")
Logger.info(tag=_TAG, msg="_on_recv() end")
Loading