Skip to content

Commit

Permalink
fix dataloader exit terminate error (#34501)
Browse files Browse the repository at this point in the history
* fix DataLoader exit with SIGABRT/SIGSEGV. test=develop
  • Loading branch information
heavengate authored Sep 16, 2021
1 parent 2df74aa commit e93c18a
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 62 deletions.
155 changes: 109 additions & 46 deletions python/paddle/fluid/dataloader/dataloader_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,36 @@

__all__ = ['get_worker_info']

# NOTE: fix `terminate called without an active exception`
# if a for loop breaks and the program exits immediately (with no model
# layers processing) after iterating over **the first few data** in
# distributed launch mode, distributed launch will call
# terminate() to kill the main process on each device, but the thread
# is still iterating to fulfill the blocking queue caches, which
# may cause the thread error `terminate called without an active
# exception`, for terminate is a strong signal and `__del__`
# of DataLoader may not be called, so we add a global link to
# the last DataLoader instance to call `__del__` to clean up
# resources
# NOTE: we cannot simply add `__del__` to CleanupFuncRegistrar,
# for this would retain a link to each DataLoader instance in
# a global, and would preclude GC from auto-collecting DataLoader
# instances, causing a memory leak
_loader = None


def _clear_loader():
global _loader
if _loader is not None:
try:
_loader.__del__()
del _loader
except:
pass


# Registered once at import time: guarantees the last live DataLoader is
# cleaned up when the process exits.
CleanupFuncRegistrar.register(_clear_loader)


class _DataLoaderIterBase(object):
"""
Expand Down Expand Up @@ -100,6 +130,16 @@ def __iter__(self):
def __len__(self):
    # Length of the iterator == number of batches the sampler yields.
    return len(self._batch_sampler)

def _exit_thread_expectedly(self):
self._thread_done_event.set()
if self._blocking_queue:
self._blocking_queue.close()

def _exit_thread_unexpectedly(self):
self._thread_done_event.set()
if self._blocking_queue:
self._blocking_queue.kill()


class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
"""
Expand All @@ -125,9 +165,13 @@ def __init__(self, loader):
# NOTE: len(self._places) batch data compose as an output
# iteration, set blocking_queue can cache 2 iteration datas
# at most here
self._blocking_queue_capacity = 2 * len(self._places)
self._blocking_queue_capacity = 1 * len(self._places)

self._init_thread()
self._shutdown = False

global _loader
_loader = self

def _init_thread(self):
self._var_names = [v.name for v in self._feed_list]
Expand All @@ -151,22 +195,35 @@ def _init_thread(self):
self._thread.start()

def _thread_loop(self, legacy_expected_place):
try:
#NOTE(zhiqiu): Set the expected place for new thread as the same as father thread,
# and it will call platform::SetDeviceId() in c++ internally.
# If we do not set cudaDeviceId in new thread, the default cudaDeviceId will be 0,
# Which may cost hundreds of MB of GPU memory on CUDAPlace(0) if calling some cuda
# APIs in this thread.
_set_expected_place(legacy_expected_place)

for indices in self._sampler_iter:
#NOTE(zhiqiu): Set the expected place for new thread as the same as father thread,
# and it will call platform::SetDeviceId() in c++ internally.
# If we do not set cudaDeviceId in new thread, the default cudaDeviceId will be 0,
# Which may cost hundreds of MB of GPU memory on CUDAPlace(0) if calling some cuda
# APIs in this thread.
_set_expected_place(legacy_expected_place)

while not self._thread_done_event.is_set():
try:
indices = next(self._sampler_iter)

# read data from dataset in mini-batch
batch = self._dataset_fetcher.fetch(indices)
# with paddle.fluid.dygraph.guard(place=paddle.CPUPlace()):
# read data from dataset in mini-batch
batch = self._dataset_fetcher.fetch(indices,
self._thread_done_event)
except StopIteration:
self._exit_thread_expectedly()
return

if batch is None or self._thread_done_event.is_set(): break

# flat batch and record structure infos
batch, structure = _flatten_batch(batch)
self._structure_infos.append(structure)

# flat batch and record structure infos
batch, structure = _flatten_batch(batch)
self._structure_infos.append(structure)
if self._thread_done_event.is_set(): break

try:
# pack as LoDTensorArray
array = core.LoDTensorArray()
for slot in batch:
Expand All @@ -179,21 +236,18 @@ def _thread_loop(self, legacy_expected_place):

array.append(slot)

if not self._blocking_queue.push(array):
break
if self._thread_done_event.is_set(): break

if self._thread_done_event.is_set():
break
try:
self._blocking_queue.push(array)
except:
self._exit_thread_expectedly()

self._blocking_queue.close()
self._shutdown_thread()
except StopIteration:
self._blocking_queue.close()
except Exception:
self._blocking_queue.kill()
self._shutdown_thread()
logging.warning("DataLoader reader thread raised an exception.")
six.reraise(*sys.exc_info())
except:
self._exit_thread_unexpectedly()
six.reraise(*sys.exc_info())

self._exit_thread_expectedly()

def __next__(self):
try:
Expand Down Expand Up @@ -221,28 +275,46 @@ def __next__(self):
return data
except StopIteration:
self._reader.shutdown()
self._try_shutdown_all()
six.reraise(*sys.exc_info())

def _shutdown_thread(self):
if self._thread:
self._thread_done_event.set()
if self._thread is not threading.current_thread():
self._thread.join()
# NOTE: we wait for _thread exit for 3 seconds, if
# thread not exit normally, force kill it
for _ in range(3):
if self._thread.is_alive():
time.sleep(1)
else:
break
else:
if self._thread is not threading.current_thread():
self._thread.join()

self._thread = None

# python2 compatibility
def next(self):
    # Python 2's iterator protocol calls `next()`; delegate to __next__.
    return self.__next__()

def _try_shutdown_all(self):
    """Idempotently release the blocking queue and the reader thread.

    Safe to call more than once (e.g. from __next__ on StopIteration
    and again from __del__): the `_shutdown` flag guards re-entry.
    """
    if not self._shutdown:
        try:
            # _blocking_queue in keep order mode holds sub-threads,
            # which need to release thread resources on unexpected exit
            if self._blocking_queue:
                self._blocking_queue.close()
                self._blocking_queue = None
            # NOTE: the blocking queue must be closed first, for a
            # blocking queue read may hang and _thread_done_event
            # could never be checked by the reader thread
            self._shutdown_thread()
        finally:
            # mark shut down even if closing or joining raised
            self._shutdown = True

def __del__(self):
# _blocking_queue in keep order mode holds sub-threads
# need to release thread resources on unexpected exit
if self._blocking_queue:
self._blocking_queue.close()
# NOTE: blocking queue should be closed firstly for
# blocking queue read may hang and _thread_done_event
# cannot be checked
self._shutdown_thread()
self._try_shutdown_all()


class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
Expand Down Expand Up @@ -421,15 +493,6 @@ def _try_shutdown_all(self, timeout=None):
core._erase_process_pids(id(self))
self._shutdown = True

def _exit_thread_expectedly(self):
self._thread_done_event.set()
self._blocking_queue.close()

def _exit_thread_unexpectedly(self):
self._thread_done_event.set()
self._blocking_queue.kill()
logging.error("DataLoader reader thread raised an exception!")

def _thread_loop(self, legacy_expected_place):
#NOTE(zhiqiu): Set the expected place for new thread as the same as father thread,
# and it will call platform::SetDeviceId() in c++ internally.
Expand Down
33 changes: 25 additions & 8 deletions python/paddle/fluid/dataloader/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,16 @@ def __init__(self, dataset, auto_collate_batch, collate_fn, drop_last):
self.collate_fn = collate_fn
self.drop_last = drop_last

# NOTE: fetch() performs the whole pipeline of dataset reading and data
# transforms for a batch in each call, which may take a long time. If
# the DataLoader exits in the meantime, fetch needs to perceive the
# exit, so done_event is passed for implementations to check exit
# status.
# NOTE: if the DataLoader exits via `break`, performing GPU tensor
# operations (e.g. to_tensor) may cause SIGSEGV in the reader thread,
# so implementations should check done_event between each sample
# processed in the batch.
def fetch(self, batch_indices, done_event=None):
    """Fetch one batch; subclasses must override.

    Args:
        batch_indices: indices (or a count, for iterable datasets) of
            the samples composing the batch.
        done_event: optional event-like object; when set, fetch should
            stop early and return None.

    Raises:
        NotImplementedError: always, in this base class.
    """
    raise NotImplementedError(
        "'fetch' not implemented for class {}".format(
            self.__class__.__name__))

Expand Down Expand Up @@ -69,15 +78,18 @@ def __init__(self, dataset, auto_collate_batch, collate_fn, drop_last):
dataset, auto_collate_batch, collate_fn, drop_last)
self.dataset_iter = iter(dataset)

def fetch(self, batch_indices):
def fetch(self, batch_indices, done_event=None):

if self.auto_collate_batch:
data = []
for _ in batch_indices:
try:
data.append(next(self.dataset_iter))
except StopIteration:
break
if done_event is None or not done_event.is_set():
try:
data.append(next(self.dataset_iter))
except StopIteration:
break
else:
return None

if len(data) == 0 or (self.drop_last and
len(data) < len(batch_indices)):
Expand All @@ -101,9 +113,14 @@ def __init__(self, dataset, auto_collate_batch, collate_fn, drop_last):
super(_MapDatasetFetcher, self).__init__(dataset, auto_collate_batch,
collate_fn, drop_last)

def fetch(self, batch_indices):
def fetch(self, batch_indices, done_event=None):
if self.auto_collate_batch:
data = [self.dataset[idx] for idx in batch_indices]
data = []
for idx in batch_indices:
if done_event is None or not done_event.is_set():
data.append(self.dataset[idx])
else:
return None

global _WARNING_TO_LOG
if not isinstance(data[0], (Sequence, Mapping)) \
Expand Down
15 changes: 7 additions & 8 deletions python/paddle/fluid/tests/unittests/test_dataloader_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ def test_main(self):
class TestDatasetWithDiffOutputPlace(unittest.TestCase):
def get_dataloader(self, num_workers):
dataset = paddle.vision.datasets.MNIST(
mode='test', transform=transforms.ToTensor())
mode='test',
transform=transforms.Compose([
transforms.CenterCrop(20), transforms.RandomResizedCrop(14),
transforms.Normalize(), transforms.ToTensor()
]))
loader = paddle.io.DataLoader(
dataset, batch_size=32, num_workers=num_workers, shuffle=True)
return loader

def run_check_on_cpu(self):
paddle.set_device('cpu')
loader = self.get_dataloader(0)
loader = self.get_dataloader(1)
for image, label in loader:
self.assertTrue(image.place.is_cpu_place())
self.assertTrue(label.place.is_cpu_place())
Expand All @@ -66,12 +70,7 @@ def test_single_process(self):
for image, label in loader:
self.assertTrue(image.place.is_gpu_place())
self.assertTrue(label.place.is_cuda_pinned_place())
# FIXME(dkp): when input tensor is in GPU place and
# iteration break in the median, it seems the GPU
# tensor put into blocking_queue cannot be safely
# released and may cause ABRT/SEGV, this should
# be fixed
# break
break

def test_multi_process(self):
# DataLoader with multi-process mode is not supported on MacOs and Windows currently
Expand Down

0 comments on commit e93c18a

Please sign in to comment.