From 4e9527cef03738b7a6223194d25b23146c82e9f4 Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Mon, 10 May 2021 13:00:14 +0000 Subject: [PATCH 1/2] fix dataloader exit hang when join re-enter. test=develop --- .../paddle/fluid/dataloader/dataloader_iter.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index 52ab83698592a..d86773aeced36 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -289,10 +289,14 @@ def __init__(self, loader): # if user exit python program when dataloader is still # iterating, resource may no release safely, so we - # add __del__ function to to CleanupFuncRegistrar - # to make sure __del__ is always called when program + # add _shutdown_on_exit function to to CleanupFuncRegistrar + # to make sure _try_shutdown_all is always called when program # exit for resoure releasing safely - CleanupFuncRegistrar.register(self.__del__) + # _try_shutdown_all may re-enter for we register _shutdown_on_exit + # to atexit, so we use _shutdown_on_exit (not __del__) for + # _shutdown_on_exit add timeout=1 for process.join, which + # may hang when re-enter in some OS + CleanupFuncRegistrar.register(self._shutdown_on_exit) def _init_workers(self): # multiprocess worker and indice queue list initial as empty @@ -363,7 +367,7 @@ def _shutdown_worker(self, worker_id): self._indices_queues[worker_id].put(None) self._worker_status[worker_id] = False - def _try_shutdown_all(self): + def _try_shutdown_all(self, timeout=None): if not self._shutdown: try: self._exit_thread_expectedly() @@ -377,7 +381,7 @@ def _try_shutdown_all(self): self._shutdown_worker(i) for w in self._workers: - w.join() + w.join(timeout) for q in self._indices_queues: q.cancel_join_thread() q.close() @@ -560,6 +564,9 @@ def _try_put_indices(self): def __del__(self): self._try_shutdown_all() + 
def _shutdown_on_exit(self): + self._try_shutdown_all(1) + def __next__(self): try: # _batches_outstanding here record the total batch data number From a2e5cda4b7f9124f366b0b0a91169ae42232bcc0 Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Tue, 11 May 2021 06:41:42 +0000 Subject: [PATCH 2/2] double check _shutdown. test=develop --- .../fluid/dataloader/dataloader_iter.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index d86773aeced36..1f928bfc8a689 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -292,10 +292,10 @@ def __init__(self, loader): # add _shutdown_on_exit function to to CleanupFuncRegistrar # to make sure _try_shutdown_all is always called when program # exit for resoure releasing safely - # _try_shutdown_all may re-enter for we register _shutdown_on_exit - # to atexit, so we use _shutdown_on_exit (not __del__) for - # _shutdown_on_exit add timeout=1 for process.join, which - # may hang when re-enter in some OS + # worker join may hang when _try_shutdown_all is called in atexit, + # because the main process is in atexit state in some OS, so we add + # timeout=1 for the shutdown function call in atexit; the shutdown + # function call in __del__ is kept as it is CleanupFuncRegistrar.register(self._shutdown_on_exit) def _init_workers(self): @@ -380,11 +380,12 @@ def _try_shutdown_all(self, timeout=None): for i in range(self._num_workers): self._shutdown_worker(i) - for w in self._workers: - w.join(timeout) - for q in self._indices_queues: - q.cancel_join_thread() - q.close() + if not self._shutdown: + for w in self._workers: + w.join(timeout) + for q in self._indices_queues: + q.cancel_join_thread() + q.close() finally: core._erase_process_pids(id(self)) self._shutdown = True