Skip to content

Commit

Permalink
add timeout for queue.get
Browse files Browse the repository at this point in the history
  • Loading branch information
chenwhql committed May 6, 2021
1 parent 8b1b214 commit 378080c
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions python/paddle/reader/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import multiprocessing
import six
import sys
import warnings

from six.moves.queue import Queue
from six.moves import zip_longest
Expand All @@ -25,7 +26,9 @@
import itertools
import random
import zlib

import paddle.compat as cpt
from paddle.fluid.reader import QUEUE_GET_TIMEOUT

__all__ = []

Expand Down Expand Up @@ -584,10 +587,13 @@ def _impl():
raise NotImplementedError(
"The multiprocess_reader method is not supported on windows.")

# ujson is ultra fast json encoder and decoder written in pure C with bindings for Python 3.6+.
try:
import ujson as json
except Exception as e:
sys.stderr.write("import ujson error: " + str(e) + " use json\n")
warnings.warn(
"The `ujson` module is not found, use the `json` module, `ujson` encodes and decodes faster, "
"you can install `ujson` through `pip install ujson`.")
import json

assert isinstance(readers, (list, tuple)) and len(readers) > 0, (
Expand All @@ -614,11 +620,20 @@ def queue_reader():
reader_num = len(readers)
finish_num = 0
while finish_num < reader_num:
sample = queue.get()
try:
sample = queue.get(timeout=QUEUE_GET_TIMEOUT)
except:
logging.error(
"multiprocess_reader failed to get data from the multiprocessing.Queue."
)
six.reraise(*sys.exc_info())

if sample is None:
finish_num += 1
elif sample == "":
raise ValueError("multiprocess reader raises an exception")
raise ValueError(
"multiprocess_reader failed to put data into the multiprocessing.Queue."
)
else:
yield sample

Expand Down Expand Up @@ -660,7 +675,9 @@ def pipe_reader():
elif sample == "":
conn.close()
conn_to_remove.append(conn)
raise ValueError("multiprocess reader raises an exception")
raise ValueError(
"multiprocess_reader failed to send data into the multiprocessing.Pipe."
)
else:
yield sample

Expand Down

0 comments on commit 378080c

Please sign in to comment.