
Fix GeneratorEnqueuer Multithreading on Windows (and Linux...) #8662

Merged: 4 commits, Dec 12, 2017

Conversation

philferriere
Contributor

Modified Files

  • keras/utils/data_utils.py
  • tests/test_multiprocessing.py

Issues with GeneratorEnqueuer Multithreading on Windows (and Linux...)

Open bugs:

#6582
#5071

Stale but legit bugs:

#3962
#5510

Any attempt to use multithreading or multiprocessing on Windows will result in an AttributeError: Can't pickle local object 'GeneratorEnqueuer.start.<locals>.data_generator_task' error in the multiprocessing package, as shown below:

(dlwin36tf140kerasmaster) Phil@SERVERP e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\Lib\site-packages\keras\tests
$ py.test test_multiprocessing.py
============================= test session starts =============================
platform win32 -- Python 3.6.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\python.exe
cachedir: ..\.cache
rootdir: e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\Lib\site-packages\keras, inifile: pytest.ini
collected 7 items

test_multiprocessing.py::test_multiprocessing_training FAILED
test_multiprocessing.py::test_multiprocessing_training_fromfile FAILED
test_multiprocessing.py::test_multiprocessing_training_fromfile ERROR
test_multiprocessing.py::test_multiprocessing_predicting FAILED
test_multiprocessing.py::test_multiprocessing_evaluating FAILED
test_multiprocessing.py::test_multiprocessing_fit_error FAILED
test_multiprocessing.py::test_multiprocessing_evaluate_error FAILED
test_multiprocessing.py::test_multiprocessing_predict_error FAILED

[...]

================================== FAILURES ===================================
________________________ test_multiprocessing_training ________________________

    @keras_test
    def test_multiprocessing_training():
        arr_data = np.random.randint(0, 256, (50, 2))
        arr_labels = np.random.randint(0, 2, 50)
        arr_weights = np.random.random(50)

        def custom_generator(use_weights=False):
            batch_size = 10
            n_samples = 50

            while True:
                batch_index = np.random.randint(0, n_samples - batch_size)
                start = batch_index
                end = start + batch_size
                X = arr_data[start: end]
                y = arr_labels[start: end]
                if use_weights:
                    w = arr_weights[start: end]
                    yield X, y, w
                else:
                    yield X, y

        # Build a NN
        model = Sequential()
        model.add(Dense(1, input_shape=(2, )))
        model.compile(loss='mse', optimizer='adadelta')

        model.fit_generator(custom_generator(),
                            steps_per_epoch=5,
                            epochs=1,
                            verbose=1,
                            max_queue_size=10,
                            workers=4,
>                           use_multiprocessing=True)

test_multiprocessing.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\legacy\interfaces.py:87: in wrapper
    return func(*args, **kwargs)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\models.py:1227: in fit_generator
    initial_epoch=initial_epoch)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\legacy\interfaces.py:87: in wrapper
    return func(*args, **kwargs)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2104: in fit_generator
    enqueuer.start(workers=workers, max_queue_size=max_queue_size)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\utils\data_utils.py:674: in start
    thread.start()
..\..\..\multiprocessing\process.py:105: in start
    self._popen = self._Popen(self)
..\..\..\multiprocessing\context.py:223: in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
..\..\..\multiprocessing\context.py:322: in _Popen
    return Popen(process_obj)
..\..\..\multiprocessing\popen_spawn_win32.py:65: in __init__
    reduction.dump(process_obj, to_child)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

obj = <Process(Process-2, initial daemon)>, file = <_io.BufferedWriter name=11>
protocol = None

    def dump(obj, file, protocol=None):
        '''Replacement for pickle.dump() using ForkingPickler.'''
>       ForkingPickler(file, protocol).dump(obj)
E       AttributeError: Can't pickle local object 'GeneratorEnqueuer.start.<locals>.data_generator_task'

..\..\..\multiprocessing\reduction.py:60: AttributeError

[...]

Converting the data_generator_task() local function to a GeneratorEnqueuer class method fixes the issue. Fixing the error above, however, doesn't fix a more general problem with multiprocessing on Windows. Indeed, on Windows, multiprocessing cannot marshal objects that contain generators across process boundaries. Attempting to do so will systematically generate a TypeError: can't pickle generator objects error, as shown here:

[...]

Traceback (most recent call last):
  File "E:\repos\toolkits\keras_mpc_bug\tests\test_multiprocessing.py", line 77, in <module>
    test_multiprocessing_training()
  File "E:\repos\toolkits\keras_mpc_bug\tests\test_multiprocessing.py", line 41, in test_multiprocessing_training
    use_multiprocessing=True)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\legacy\interfaces.py", line 87, in wrapper
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\models.py", line 1227, in fit_generator
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\legacy\interfaces.py", line 87, in wrapper
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py", line 2104, in fit_generator
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\utils\data_utils.py", line 663, in start
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle generator objects

[...]
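The pickling constraint is easy to see in isolation: pickle (which multiprocessing's spawn start method relies on) can serialize a bound method of a module-level class by reference, but not a function defined inside another function. A minimal standalone sketch (illustrative names, not the actual Keras code):

```python
import pickle


class Enqueuer:
    """Stand-in for GeneratorEnqueuer: a module-level class whose bound
    methods pickle by reference (module name + qualified name)."""
    def data_generator_task(self):
        return "batch"


def start():
    # Stand-in for the old code path: a function defined inside another
    # function has no importable qualified name, so pickle rejects it.
    def data_generator_task():
        return "batch"
    return data_generator_task


def is_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        # Python 3 raises AttributeError, Python 2 raises PicklingError,
        # matching the two tracebacks shown in this report.
        return False


local_ok = is_picklable(start())                          # the pre-fix shape
method_ok = is_picklable(Enqueuer().data_generator_task)  # the fixed shape
print(local_ok, method_ok)
```

On Linux this goes unnoticed because the default fork start method never pickles the worker target; Windows always spawns, so the pickling path is always exercised.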

Our two-pronged fix in data_utils.py for these issues is the following:

  • On all platforms, convert the data_generator_task() local function to a GeneratorEnqueuer class method.
  • On Windows, raise a ValueError instead if use_multiprocessing is set to True, and suggest alternatives in the error message, such as using multithreading.

However, even multithreading does not work properly with the current code in data_utils.py. Since calls to the generator's next() function are not serialized, execution systematically results in a ValueError: generator already executing (on both Windows and Linux!). See the notes below on test_multiprocessing.py for why this seldom shows up on Linux.
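The race is easy to reproduce outside Keras: any generator shared by several threads without a lock raises this error as soon as one thread calls next() while another is still inside the generator body. A minimal standalone repro (not Keras code):

```python
import threading
import time


def batches():
    while True:
        time.sleep(0.05)  # simulate work inside the generator body
        yield "batch"


gen = batches()
errors = []


def worker():
    try:
        for _ in range(3):
            next(gen)  # unsynchronized, exactly like the buggy code path
    except ValueError as exc:
        errors.append(str(exc))  # "generator already executing"


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(errors)
```

With only a handful of next() calls per test run, the threads may simply never overlap, which is one plausible reason the error appears intermittent in the existing test suite.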

Our data_utils.py fix for this additional issue is the following:

  • On all platforms, serialize calls to generator_output = next(generator) using a threading lock.
  • Initialize the internal queue max size to max_queue_size. Right now, it is not properly initialized and grows indefinitely on all platforms!
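A sketch of both changes together (illustrative names, not the exact PR diff): the lock serializes next() across worker threads, and passing max_queue_size to Queue makes put() block when the queue is full instead of letting it grow without bound.

```python
import threading
from queue import Queue  # Python 2: from Queue import Queue


class LockedEnqueuer:
    """Illustrative sketch of the proposed GeneratorEnqueuer changes."""

    def __init__(self, generator, max_queue_size=10):
        self._generator = generator
        self._lock = threading.Lock()               # serializes next(generator)
        self.queue = Queue(maxsize=max_queue_size)  # bounded, not infinite
        self._threads = []

    def _data_generator_task(self):  # a method, hence also picklable
        while True:
            with self._lock:         # one thread inside the generator at a time
                try:
                    item = next(self._generator)
                except StopIteration:
                    return
            self.queue.put(item)     # blocks while the queue is full

    def start(self, workers=4):
        for _ in range(workers):
            t = threading.Thread(target=self._data_generator_task)
            t.daemon = True
            t.start()
            self._threads.append(t)


# With the lock, four workers drain a shared generator without
# "generator already executing" and without dropping or duplicating items.
enq = LockedEnqueuer(iter(range(100)), max_queue_size=10)
enq.start(workers=4)
results = [enq.queue.get() for _ in range(100)]
print(sorted(results) == list(range(100)))
```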

Yes, we are aware of Python's limited multithreading abilities when it comes to the global interpreter lock (GIL), as discussed here. We are also of the opinion that degraded performance is a better alternative than catastrophic failure in execution (as is the case right now). Without a fix, GeneratorEnqueuer is unusable on Windows.

If the PR for data_utils.py is rejected, please consider using our modified version of test_multiprocessing.py. The current version is woefully inadequate at catching multithreading and multiprocessing bugs or at stressing the GeneratorEnqueuer queueing mechanism. With our updated version, you will see that the same multithreading issues pop up immediately on Linux as well.

Our fixes for test_multiprocessing.py to bring out the threading issues and stress the queue are the following:

  • Exercise each of the seven test scenarios with at least the following worker configurations:
    • 4 workers + one main thread (use_multiprocessing set to True, then False)
    • 1 worker + one main thread (use_multiprocessing set to True, then False)
    • No worker + one main thread (use_multiprocessing set to True, then False)
  • Bump up the number of steps per epoch from 5 to 100:
    • Currently, the number of steps is 5. Since the maximum size of the queue is 10, this is clearly inadequate to really stress the queue in both multiprocessing and multithreading scenarios.
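The expanded matrix can be expressed as a simple parameter grid, sketched below (illustrative, not the actual test file):

```python
from itertools import product

# Worker configurations applied to each of the seven test scenarios
WORKER_COUNTS = (4, 1, 0)        # 4 workers, 1 worker, main thread only
USE_MULTIPROCESSING = (True, False)
STEPS_PER_EPOCH = 100            # bumped from 5 to keep the size-10 queue busy

configs = list(product(WORKER_COUNTS, USE_MULTIPROCESSING))
print(len(configs))  # 6 configurations per scenario
```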

Code to Repro Multiprocessing Bugs and Test Fix

Simply run test_multiprocessing.py. Below, we show execution of this code (with and without the fix) in four different configurations:

  • Python 3.6 on Windows 10 with Tensorflow 1.4
  • Python 2.7 on Windows 10 with CNTK 2.3
  • Python 3.6 on Ubuntu 16.04 with Tensorflow 1.4
  • Python 2.7 on Ubuntu 16.04 with CNTK 2.3

dlwin36tf140kerasmaster (Python 3.6 on Windows 10 with Tensorflow 1.4)

Execution without the fix

(dlwin36tf140kerasmaster) Phil@SERVERP e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\Lib\site-packages\keras\tests
$ py.test test_multiprocessing.py
============================= test session starts =============================
platform win32 -- Python 3.6.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\python.exe
cachedir: ..\.cache
rootdir: e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\Lib\site-packages\keras, inifile: pytest.ini
collected 7 items

test_multiprocessing.py::test_multiprocessing_training FAILED
test_multiprocessing.py::test_multiprocessing_training_fromfile FAILED
test_multiprocessing.py::test_multiprocessing_training_fromfile ERROR
test_multiprocessing.py::test_multiprocessing_predicting FAILED
test_multiprocessing.py::test_multiprocessing_evaluating FAILED
test_multiprocessing.py::test_multiprocessing_fit_error FAILED
test_multiprocessing.py::test_multiprocessing_evaluate_error FAILED
test_multiprocessing.py::test_multiprocessing_predict_error FAILED

========================== slowest 10 test durations ==========================
0.51s call     tests/test_multiprocessing.py::test_multiprocessing_training
0.48s call     tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.47s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.25s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.25s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.24s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.24s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.01s setup    tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_fit_error
=================================== ERRORS ====================================
_________ ERROR at teardown of test_multiprocessing_training_fromfile _________

tmpdir = local('C:\\Users\\Phil\\AppData\\Local\\Temp\\pytest-of-Phil\\pytest-109\\test_multiprocessing_training_0')

    @pytest.fixture
    def in_tmpdir(tmpdir):
        """Runs a function in a temporary directory.

        Checks that the directory is empty afterwards.
        """
        with tmpdir.as_cwd():
            yield None
>       assert not tmpdir.listdir()
E       AssertionError: assert not [local('C:\\Users\\Phil\\AppData\\Local\\Temp\\pytest-of-Phil\\pytest-109\\test_multiprocessing_training_0\\data.npz')]
E        +  where [local('C:\\Users\\Phil\\AppData\\Local\\Temp\\pytest-of-Phil\\pytest-109\\test_multiprocessing_training_0\\data.npz')] = <bound method LocalPath.listdir of local('C:\\Users\\Phil\\AppData\\Local\\Temp\\pytest-of-Phil\\pytest-109\\test_multiprocessing_training_0')>()
E        +    where <bound method LocalPath.listdir of local('C:\\Users\\Phil\\AppData\\Local\\Temp\\pytest-of-Phil\\pytest-109\\test_multiprocessing_training_0')> = local('C:\\Users\\Phil\\AppData\\Local\\Temp\\pytest-of-Phil\\pytest-109\\test_multiprocessing_training_0').listdir

test_multiprocessing.py:18: AssertionError
-------------------------- Captured stderr teardown ---------------------------
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
================================== FAILURES ===================================
________________________ test_multiprocessing_training ________________________

    @keras_test
    def test_multiprocessing_training():
        arr_data = np.random.randint(0, 256, (50, 2))
        arr_labels = np.random.randint(0, 2, 50)
        arr_weights = np.random.random(50)

        def custom_generator(use_weights=False):
            batch_size = 10
            n_samples = 50

            while True:
                batch_index = np.random.randint(0, n_samples - batch_size)
                start = batch_index
                end = start + batch_size
                X = arr_data[start: end]
                y = arr_labels[start: end]
                if use_weights:
                    w = arr_weights[start: end]
                    yield X, y, w
                else:
                    yield X, y

        # Build a NN
        model = Sequential()
        model.add(Dense(1, input_shape=(2, )))
        model.compile(loss='mse', optimizer='adadelta')

        model.fit_generator(custom_generator(),
                            steps_per_epoch=5,
                            epochs=1,
                            verbose=1,
                            max_queue_size=10,
                            workers=4,
>                           use_multiprocessing=True)

test_multiprocessing.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\legacy\interfaces.py:87: in wrapper
    return func(*args, **kwargs)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\models.py:1227: in fit_generator
    initial_epoch=initial_epoch)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\legacy\interfaces.py:87: in wrapper
    return func(*args, **kwargs)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2104: in fit_generator
    enqueuer.start(workers=workers, max_queue_size=max_queue_size)
e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\utils\data_utils.py:674: in start
    thread.start()
..\..\..\multiprocessing\process.py:105: in start
    self._popen = self._Popen(self)
..\..\..\multiprocessing\context.py:223: in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
..\..\..\multiprocessing\context.py:322: in _Popen
    return Popen(process_obj)
..\..\..\multiprocessing\popen_spawn_win32.py:65: in __init__
    reduction.dump(process_obj, to_child)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

obj = <Process(Process-2, initial daemon)>, file = <_io.BufferedWriter name=11>
protocol = None

    def dump(obj, file, protocol=None):
        '''Replacement for pickle.dump() using ForkingPickler.'''
>       ForkingPickler(file, protocol).dump(obj)
E       AttributeError: Can't pickle local object 'GeneratorEnqueuer.start.<locals>.data_generator_task'

..\..\..\multiprocessing\reduction.py:60: AttributeError
...

Execution with the fix

(dlwin36tf140kerasmaster) Phil@SERVERP e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\Lib\site-packages\keras\tests
$ py.test test_multiprocessing.py
============================= test session starts =============================
platform win32 -- Python 3.6.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\python.exe
cachedir: ..\.cache
rootdir: e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\Lib\site-packages\keras, inifile: pytest.ini
collected 7 items

test_multiprocessing.py::test_multiprocessing_training PASSED
test_multiprocessing.py::test_multiprocessing_training_from_file PASSED
test_multiprocessing.py::test_multiprocessing_predicting PASSED
test_multiprocessing.py::test_multiprocessing_evaluating PASSED
test_multiprocessing.py::test_multiprocessing_fit_error PASSED
test_multiprocessing.py::test_multiprocessing_evaluate_error PASSED
test_multiprocessing.py::test_multiprocessing_predict_error PASSED

========================== slowest 10 test durations ==========================
4.19s call     tests/test_multiprocessing.py::test_multiprocessing_training
2.04s call     tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.61s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.57s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.54s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.12s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.11s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.02s setup    tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_from_file
============================== warnings summary ===============================
tests/test_multiprocessing.py::test_multiprocessing_training
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_training_from_file
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_predicting
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluating
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_fit_error
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_predict_error
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin36tf140kerasmaster\lib\site-packages\keras-2.1.2-py3.6.egg\keras\engine\training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

-- Docs: http://doc.pytest.org/en/latest/warnings.html
==================== 7 passed, 7 warnings in 11.44 seconds ====================
2017-12-02 08:50:53.632443: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\platform\cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX AVX2
2017-12-02 08:50:54.016767: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1030] Found device 0 with properties:
name: GeForce GTX TITAN X major: 5 minor: 2 memoryClockRate(GHz): 1.076
pciBusID: 0000:03:00.0
totalMemory: 12.00GiB freeMemory: 10.06GiB
2017-12-02 08:50:54.016804: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)
2017-12-02 08:50:57.848682: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)
2017-12-02 08:50:59.634973: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)
2017-12-02 08:51:00.172749: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)
2017-12-02 08:51:01.090078: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)
2017-12-02 08:51:01.358376: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)
2017-12-02 08:51:01.480222: I C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\common_runtime\gpu\gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: GeForce GTX TITAN X, pci bus id: 0000:03:00.0, compute capability: 5.2)

Steps to recreate test environment

$ conda create --yes -n dlwin36 numpy scipy mkl-service matplotlib pandas pillow scikit-learn jupyter pytest
$ conda create --name dlwin36tf140kerasmaster --clone dlwin36
$ activate dlwin36tf140kerasmaster 
$ pip install tensorflow-gpu==1.4.0 
$ cd "%CONDA_PREFIX%\Lib\site-packages"
$ git clone git://github.com/fchollet/keras.git
$ cd keras
$ python setup.py install

dlwin27cntk23kerasmaster (Python 2.7 on Windows 10 with CNTK 2.3)

Execution without the fix

(dlwin27cntk23kerasmaster) Phil@SERVERP e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\Lib\site-packages\keras\tests
$ py.test test_multiprocessing.py
============================= test session starts =============================
platform win32 -- Python 2.7.13, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\python.exe
cachedir: ..\.cache
rootdir: e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\Lib\site-packages\keras, inifile: pytest.ini
collected 7 items

test_multiprocessing.py::test_multiprocessing_training FAILED
test_multiprocessing.py::test_multiprocessing_training_fromfile FAILED
test_multiprocessing.py::test_multiprocessing_training_fromfile ERROR
test_multiprocessing.py::test_multiprocessing_predicting FAILED
test_multiprocessing.py::test_multiprocessing_evaluating FAILED
test_multiprocessing.py::test_multiprocessing_fit_error FAILED
test_multiprocessing.py::test_multiprocessing_evaluate_error FAILED
test_multiprocessing.py::test_multiprocessing_predict_error FAILED

========================== slowest 10 test durations ==========================
0.63s call     tests/test_multiprocessing.py::test_multiprocessing_training
0.18s call     tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.15s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.14s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.14s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.14s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.14s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.01s setup    tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_predicting
=================================== ERRORS ====================================
_________ ERROR at teardown of test_multiprocessing_training_fromfile _________

tmpdir = local('c:\\users\\phil\\appdata\\local\\temp\\pytest-of-Phil\\pytest-103\\test_multiprocessing_training_0')

    @pytest.fixture
    def in_tmpdir(tmpdir):
        """Runs a function in a temporary directory.

        Checks that the directory is empty afterwards.
        """
        with tmpdir.as_cwd():
            yield None
>       assert not tmpdir.listdir()
E       AssertionError: assert not [local('c:\\users\\phil\\appdata\\local\\temp\\pytest-of-Phil\\pytest-103\\test_multiprocessing_training_0\\data.npz')]
E        +  where [local('c:\\users\\phil\\appdata\\local\\temp\\pytest-of-Phil\\pytest-103\\test_multiprocessing_training_0\\data.npz')] = <bound method LocalPath.listdir of local('c:\\users\\phil\\appdata\\local\\temp\\pytest-of-Phil\\pytest-103\\test_multiprocessing_training_0')>()
E        +    where <bound method LocalPath.listdir of local('c:\\users\\phil\\appdata\\local\\temp\\pytest-of-Phil\\pytest-103\\test_multiprocessing_training_0')> = local('c:\\users\\phil\\appdata\\local\\temp\\pytest-of-Phil\\pytest-103\\test_multiprocessing_training_0').listdir

test_multiprocessing.py:18: AssertionError
-------------------------- Captured stderr teardown ---------------------------
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\pickle.py", line 886, in load_eof
    raise EOFError
EOFError
================================== FAILURES ===================================
________________________ test_multiprocessing_training ________________________

    @keras_test
    def test_multiprocessing_training():
        arr_data = np.random.randint(0, 256, (50, 2))
        arr_labels = np.random.randint(0, 2, 50)
        arr_weights = np.random.random(50)

        def custom_generator(use_weights=False):
            batch_size = 10
            n_samples = 50

            while True:
                batch_index = np.random.randint(0, n_samples - batch_size)
                start = batch_index
                end = start + batch_size
                X = arr_data[start: end]
                y = arr_labels[start: end]
                if use_weights:
                    w = arr_weights[start: end]
                    yield X, y, w
                else:
                    yield X, y

        # Build a NN
        model = Sequential()
        model.add(Dense(1, input_shape=(2, )))
        model.compile(loss='mse', optimizer='adadelta')

        model.fit_generator(custom_generator(),
                            steps_per_epoch=5,
                            epochs=1,
                            verbose=1,
                            max_queue_size=10,
                            workers=4,
>                           use_multiprocessing=True)

test_multiprocessing.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
build\bdist.win-amd64\egg\keras\legacy\interfaces.py:87: in wrapper
    ???
build\bdist.win-amd64\egg\keras\models.py:1227: in fit_generator
    ???
build\bdist.win-amd64\egg\keras\legacy\interfaces.py:87: in wrapper
    ???
build\bdist.win-amd64\egg\keras\engine\training.py:2104: in fit_generator
    ???
build\bdist.win-amd64\egg\keras\utils\data_utils.py:674: in start
    ???
..\..\..\multiprocessing\process.py:130: in start
    self._popen = Popen(self)
..\..\..\multiprocessing\forking.py:277: in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
..\..\..\multiprocessing\forking.py:199: in dump
    ForkingPickler(file, protocol).dump(obj)
..\..\..\pickle.py:224: in dump
    self.save(obj)
..\..\..\pickle.py:331: in save
    self.save_reduce(obj=obj, *rv)
..\..\..\pickle.py:425: in save_reduce
    save(state)
..\..\..\pickle.py:286: in save
    f(self, obj) # Call unbound method with explicit self
..\..\..\pickle.py:655: in save_dict
    self._batch_setitems(obj.iteritems())
..\..\..\pickle.py:687: in _batch_setitems
    save(v)
..\..\..\pickle.py:286: in save
    f(self, obj) # Call unbound method with explicit self
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <multiprocessing.forking.ForkingPickler instance at 0x0000000008817608>
obj = <function data_generator_task at 0x00000000087FE978>
name = 'data_generator_task', pack = <built-in function pack>

    def save_global(self, obj, name=None, pack=struct.pack):
        write = self.write
        memo = self.memo

        if name is None:
            name = obj.__name__

        module = getattr(obj, "__module__", None)
        if module is None:
            module = whichmodule(obj, name)

        try:
            __import__(module)
            mod = sys.modules[module]
            klass = getattr(mod, name)
        except (ImportError, KeyError, AttributeError):
            raise PicklingError(
                "Can't pickle %r: it's not found as %s.%s" %
>               (obj, module, name))
E           PicklingError: Can't pickle <function data_generator_task at 0x00000000087FE978>: it's not found as keras.utils.data_utils.data_generator_task

..\..\..\pickle.py:754: PicklingError
...
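The root cause is that Windows (and any `multiprocessing` start method that spawns rather than forks) must pickle the process target by reference, and a function defined inside `GeneratorEnqueuer.start` has no importable name. A minimal sketch of why hoisting the task from a local function to a bound method makes it picklable — class and method names here are illustrative, not the actual Keras code:

```python
import pickle

class Enqueuer(object):
    def start_broken(self):
        def data_generator_task():  # local function: no importable name
            pass
        return data_generator_task

    def data_generator_task(self):  # bound method: picklable by reference
        pass

enq = Enqueuer()

# Pickling the local function fails, as in the reported traceback.
try:
    pickle.dumps(enq.start_broken())
    local_picklable = True
except (pickle.PicklingError, AttributeError):
    local_picklable = False

# Pickling the bound method succeeds (the instance itself is pickled along).
method_picklable = pickle.dumps(enq.data_generator_task) is not None
```

Moving `data_generator_task` onto the class is exactly the kind of change this PR makes: the target can then be located as `module.Class.method` by the pickler in the child process.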

Execution with the fix

(dlwin27cntk23kerasmaster) Phil@SERVERP e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\Lib\site-packages\keras\tests
$ py.test test_multiprocessing.py
============================= test session starts =============================
platform win32 -- Python 2.7.13, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\python.exe
cachedir: ..\.cache
rootdir: e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\Lib\site-packages\keras, inifile: pytest.ini
collected 7 items

test_multiprocessing.py::test_multiprocessing_training PASSED
test_multiprocessing.py::test_multiprocessing_training_from_file PASSED
test_multiprocessing.py::test_multiprocessing_predicting PASSED
test_multiprocessing.py::test_multiprocessing_evaluating PASSED
test_multiprocessing.py::test_multiprocessing_fit_error PASSED
test_multiprocessing.py::test_multiprocessing_evaluate_error PASSED
test_multiprocessing.py::test_multiprocessing_predict_error PASSED

========================== slowest 10 test durations ==========================
2.40s call     tests/test_multiprocessing.py::test_multiprocessing_training
1.54s call     tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.80s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.51s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.06s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.03s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.03s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.01s setup    tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.00s setup    tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_from_file
============================== warnings summary ===============================
tests/test_multiprocessing.py::test_multiprocessing_training
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\site-packages\keras-2.1.2-py2.7.egg\keras\engine\training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\site-packages\cntk\core.py:361: UserWarning: your data is of type "float64", but your input variable (uid "Input23") expects "<type 'numpy.float32'>". Please convert your data beforehand to speed up training.
    (sample.dtype, var.uid, str(var.dtype)))
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\site-packages\_pytest\warnings.py:88: UnicodeWarning: Warning is using unicode non convertible to ascii, converting to a safe representation:
    e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\site-packages\cntk\core.py:361: UserWarning: your data is of type "float64", but your input variable (uid "Input23") expects "<type 'numpy.float32'>". Please convert your data beforehand to speed up training.
    (sample.dtype, var.uid, str(var.dtype)))

    UnicodeWarning)

tests/test_multiprocessing.py::test_multiprocessing_predicting
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\site-packages\keras-2.1.2-py2.7.egg\keras\engine\training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluating
  e:\toolkits.win\anaconda3-4.4.0\envs\dlwin27cntk23kerasmaster\lib\site-packages\keras-2.1.2-py2.7.egg\keras\engine\training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

-- Docs: http://doc.pytest.org/en/latest/warnings.html
==================== 7 passed, 5 warnings in 6.69 seconds =====================

Steps to recreate test environment

$ conda create --yes -n dlwin27 python=2.7
$ activate dlwin27
$ conda install --yes numpy scipy mkl-service matplotlib pandas pillow scikit-learn jupyter pytest
$ deactivate
$ conda create --name dlwin27cntk23kerasmaster --clone dlwin27
$ activate dlwin27cntk23kerasmaster 
$ pip install https://cntk.ai/PythonWheel/GPU/cntk-2.3-cp27-cp27m-win_amd64.whl
$ cd "%CONDA_PREFIX%\Lib\site-packages"
$ git clone git://github.com/fchollet/keras.git
$ cd keras
$ python setup.py install
$ set KERAS_BACKEND=cntk

dlubu36tf140kerasmaster (Python 3.6 on Ubuntu 16.04 with Tensorflow 1.4)

Execution without the fix

(dlubu36tf140kerasmaster) phil@DESKTOPP:/media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/keras/tests$ py.test test_multiprocessing.py
======================================================= test session starts ========================================================
platform linux -- Python 3.6.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/bin/python
cachedir: ../.cache
rootdir: /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/keras, inifile: pytest.ini
collected 7 items                                                                                                                   

test_multiprocessing.py::test_multiprocessing_training PASSED
test_multiprocessing.py::test_multiprocessing_training_fromfile PASSED
test_multiprocessing.py::test_multiprocessing_predicting PASSED
test_multiprocessing.py::test_multiprocessing_evaluating PASSED
test_multiprocessing.py::test_multiprocessing_fit_error PASSED
test_multiprocessing.py::test_multiprocessing_evaluate_error PASSED
test_multiprocessing.py::test_multiprocessing_predict_error PASSED

==================================================== slowest 10 test durations =====================================================
1.58s call     tests/test_multiprocessing.py::test_multiprocessing_training
0.60s call     tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.37s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.25s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.24s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.20s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.18s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.01s setup    tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_predict_error
========================================================= warnings summary =========================================================
tests/test_multiprocessing.py::test_multiprocessing_training
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_predicting
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluating
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_fit_error
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_predict_error
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

-- Docs: http://doc.pytest.org/en/latest/warnings.html
=============================================== 7 passed, 7 warnings in 8.70 seconds ===============================================

Execution with the fix

(dlubu36tf140kerasmaster) phil@DESKTOPP:/media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/keras/tests$ py.test /media/EDrive/repos/toolkits/keras_mpc_bug/tests/test_multiprocessing.py
======================================================= test session starts ========================================================
platform linux -- Python 3.6.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/bin/python
cachedir: ../.cache
rootdir: /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/keras, inifile: pytest.ini
collected 7 items                                                                                                                   

test_multiprocessing.py::test_multiprocessing_training PASSED
test_multiprocessing.py::test_multiprocessing_training_from_file PASSED
test_multiprocessing.py::test_multiprocessing_predicting PASSED
test_multiprocessing.py::test_multiprocessing_evaluating PASSED
test_multiprocessing.py::test_multiprocessing_fit_error PASSED
test_multiprocessing.py::test_multiprocessing_evaluate_error PASSED
test_multiprocessing.py::test_multiprocessing_predict_error PASSED

==================================================== slowest 10 test durations =====================================================
5.51s call     tests/test_multiprocessing.py::test_multiprocessing_training
3.75s call     tests/test_multiprocessing.py::test_multiprocessing_training_from_file
1.46s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
1.15s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.57s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.28s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.26s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.01s setup    tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.00s setup    tests/test_multiprocessing.py::test_multiprocessing_training
========================================================= warnings summary =========================================================
tests/test_multiprocessing.py::test_multiprocessing_training
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_training_from_file
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_predicting
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluating
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_fit_error
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_predict_error
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu36tf140kerasmaster/lib/python3.6/site-packages/Keras-2.1.2-py3.6.egg/keras/engine/training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

-- Docs: http://doc.pytest.org/en/latest/warnings.html
============================================== 7 passed, 7 warnings in 18.28 seconds ===============================================

dlubu27cntk23kerasmaster (Python 2.7 on Ubuntu 16.04 with CNTK 2.3)

Execution without the fix

(dlubu27cntk23kerasmaster) phil@DESKTOPP:/media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/keras/tests$ py.test test_multiprocessing.py
======================================================= test session starts ========================================================
platform linux2 -- Python 2.7.13, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/bin/python
cachedir: ../.cache
rootdir: /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/keras, inifile: pytest.ini
collected 7 items                                                                                                                   

test_multiprocessing.py::test_multiprocessing_training PASSED
test_multiprocessing.py::test_multiprocessing_training_fromfile PASSED
test_multiprocessing.py::test_multiprocessing_predicting PASSED
test_multiprocessing.py::test_multiprocessing_evaluating PASSED
test_multiprocessing.py::test_multiprocessing_fit_error PASSED
test_multiprocessing.py::test_multiprocessing_evaluate_error PASSED
test_multiprocessing.py::test_multiprocessing_predict_error PASSED

==================================================== slowest 10 test durations =====================================================
0.97s call     tests/test_multiprocessing.py::test_multiprocessing_training
0.19s call     tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.17s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.14s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.12s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.12s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.10s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.00s setup    tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_fromfile
0.00s setup    tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
========================================================= warnings summary =========================================================
tests/test_multiprocessing.py::test_multiprocessing_training
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/Keras-2.1.2-py2.7.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/cntk/core.py:361: UserWarning: your data is of type "float64", but your input variable (uid "Input23") expects "<type 'numpy.float32'>". Please convert your data beforehand to speed up training.
    (sample.dtype, var.uid, str(var.dtype)))
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/_pytest/warnings.py:88: UnicodeWarning: Warning is using unicode non convertible to ascii, converting to a safe representation:
    /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/cntk/core.py:361: UserWarning: your data is of type "float64", but your input variable (uid "Input23") expects "<type 'numpy.float32'>". Please convert your data beforehand to speed up training.
    (sample.dtype, var.uid, str(var.dtype)))
  
    UnicodeWarning)

tests/test_multiprocessing.py::test_multiprocessing_predicting
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/Keras-2.1.2-py2.7.egg/keras/engine/training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluating
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/Keras-2.1.2-py2.7.egg/keras/engine/training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

-- Docs: http://doc.pytest.org/en/latest/warnings.html
=============================================== 7 passed, 5 warnings in 3.74 seconds ===============================================

Execution with the fix

(dlubu27cntk23kerasmaster) phil@DESKTOPP:/media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/keras$ py.test /media/EDrive/repos/toolkits/keras_mpc_bug/tests/test_multiprocessing.py
======================================================= test session starts ========================================================
platform linux2 -- Python 2.7.13, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/bin/python
cachedir: ../.cache
rootdir: /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/keras, inifile: pytest.ini
collected 7 items                                                                                                                   

test_multiprocessing.py::test_multiprocessing_training PASSED
test_multiprocessing.py::test_multiprocessing_training_from_file PASSED
test_multiprocessing.py::test_multiprocessing_predicting PASSED
test_multiprocessing.py::test_multiprocessing_evaluating PASSED
test_multiprocessing.py::test_multiprocessing_fit_error PASSED
test_multiprocessing.py::test_multiprocessing_evaluate_error PASSED
test_multiprocessing.py::test_multiprocessing_predict_error PASSED

==================================================== slowest 10 test durations =====================================================
4.53s call     tests/test_multiprocessing.py::test_multiprocessing_training
2.85s call     tests/test_multiprocessing.py::test_multiprocessing_training_from_file
1.61s call     tests/test_multiprocessing.py::test_multiprocessing_evaluating
0.95s call     tests/test_multiprocessing.py::test_multiprocessing_predicting
0.23s call     tests/test_multiprocessing.py::test_multiprocessing_fit_error
0.18s call     tests/test_multiprocessing.py::test_multiprocessing_evaluate_error
0.17s call     tests/test_multiprocessing.py::test_multiprocessing_predict_error
0.00s setup    tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.00s teardown tests/test_multiprocessing.py::test_multiprocessing_training_from_file
0.00s setup    tests/test_multiprocessing.py::test_multiprocessing_predict_error
========================================================= warnings summary =========================================================
tests/test_multiprocessing.py::test_multiprocessing_training
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/Keras-2.1.2-py2.7.egg/keras/engine/training.py:2023: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/cntk/core.py:361: UserWarning: your data is of type "float64", but your input variable (uid "Input23") expects "<type 'numpy.float32'>". Please convert your data beforehand to speed up training.
    (sample.dtype, var.uid, str(var.dtype)))
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/_pytest/warnings.py:88: UnicodeWarning: Warning is using unicode non convertible to ascii, converting to a safe representation:
    /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/cntk/core.py:361: UserWarning: your data is of type "float64", but your input variable (uid "Input23") expects "<type 'numpy.float32'>". Please convert your data beforehand to speed up training.
    (sample.dtype, var.uid, str(var.dtype)))
  
    UnicodeWarning)

tests/test_multiprocessing.py::test_multiprocessing_predicting
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/Keras-2.1.2-py2.7.egg/keras/engine/training.py:2375: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

tests/test_multiprocessing.py::test_multiprocessing_evaluating
  /media/EDrive/toolkits.ubu/anaconda3-4.4.0/envs/dlubu27cntk23kerasmaster/lib/python2.7/site-packages/Keras-2.1.2-py2.7.egg/keras/engine/training.py:2251: UserWarning: Using a generator with `use_multiprocessing=True` and multiple workers may duplicate your data. Please consider using the`keras.utils.Sequence class.

-- Docs: http://doc.pytest.org/en/latest/warnings.html
============================================== 7 passed, 5 warnings in 12.50 seconds ===============================================

Steps to recreate test environment

$ conda create --yes -n dlubu27 python=2.7
$ source activate dlubu27
$ conda install --yes numpy scipy mkl-service matplotlib pandas pillow scikit-learn jupyter pytest
$ source deactivate
$ conda create --name dlubu27cntk23kerasmaster --clone dlubu27
$ source activate dlubu27cntk23kerasmaster 
$ pip install https://cntk.ai/PythonWheel/GPU/cntk-2.3-cp27-cp27mu-linux_x86_64.whl
$ cd $CONDA_PREFIX/lib/python2.7/site-packages
$ git clone git://github.com/fchollet/keras.git
$ cd keras
$ python setup.py install

@Dref360
Contributor

Dref360 commented Dec 2, 2017

At this point, what's the use case of using multiple threads for a generator?

Is there any speedup doing multithreading, since the GIL would kill any concurrency anyway? Shouldn't we just limit use_multiprocessing=False to just one worker?

I cannot speak for the Windows case since I do not own a Windows machine.

@philferriere
Contributor Author

Good questions, Frédéric.

This is really just a bug fix and is mostly about feature parity (not performance improvements) between Linux and Windows. I don't aim to support new use cases/scenarios either. If you need to know what my specific use case is, I have a personal interest in getting this project to work on Windows.

Perhaps your questions point to a larger interface design issue. I agree with you on the limitations imposed by the GIL. It seems to me that your valid concerns really apply to all platforms and I wasn't attempting to address them with this fix.

As of today, multi-process and multi-threaded generators are simply broken on Windows. And by broken, I do mean code execution crashes (you'll see there are several bugs that have been reported over time). Being a huge fan of Keras, I don't want to be forced to move to a different high-level deep learning library (Gluon?) because crashes on Windows are simply tolerated and bugs don't get fixed on the platform I have to support.

I will leave it to you to come up with what I'm sure will be good answers to the larger Keras API issue.

setattr(e, '__traceback__', None)
elif not hasattr(e, '__traceback__'):
setattr(e, '__traceback__', sys.exc_info()[2])
traceback.print_exc()
Contributor

Why remove the traceback when using threads?

Contributor Author

This is a question better asked to @de-vri-es who made commit 4a58b178073f0ba3b166220f7ebd7d56149bfb20

Contributor

the exception is put in the queue and rethrown in the main thread. Printing the traceback twice is no good. Also, if someone decides to rethrow the exception you don't want the traceback printed at all.

The only reason I kept the print in multiprocessing=True is that the traceback can't be pickled, and can't be put in an inter-process queue.

Unconditionally printing the traceback is not a good idea in my opinion.

Contributor

I meant "if someone decides to catch the exception you don't want the traceback printed at all". I shouldn't do this from my phone, apparently...

Contributor

It looks like you undid my work in a merge conflict here.

Contributor Author

@philferriere philferriere Dec 4, 2017

I don't believe so. The test for multiprocessing=True has been taken out of the while loop for clarity (there is now a code path for multiprocessing and one for multithreading). The diff displayed by Github appears misleading however. @de-vri-es, would you mind looking at the files side by side and tell me if you agree? I would greatly appreciate it.

Contributor

Oh sorry, I didn't see the full diff (I really shouldn't have continued on my phone). I see the if/else is still there but much earlier. Looks good =]

self._stop_event = threading.Event()

for _ in range(workers):
if self._use_multiprocessing:
# Reset random seed else all children processes
# share the same seed
np.random.seed(self.seed)
thread = multiprocessing.Process(target=data_generator_task)
thread = multiprocessing.Process(target=self.data_generator_task, args=())
Contributor

remove unused keyword args

Contributor Author

Fixed in latest commit

import pytest
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense
from keras.utils.test_utils import keras_test

gsteps_per_epoch = 100
Contributor

constants should be upper case

Contributor Author

Fixed in latest commit. Apologies for ignoring the PEP 8 errors and warnings in the previous commit. This has been taken care of as well.

@Dref360
Contributor

Dref360 commented Dec 3, 2017

Your PR is fine; if multiprocessing doesn't work on Windows, we shouldn't try to support it.

@philferriere
Contributor Author

Thanks, Frédéric.

break
except Exception as e:
# Can't pickle tracebacks.
# As a compromise, print the traceback and pickle None instead.
Contributor

This comment was only valid for the multiprocessing branch. It can be removed here.

@de-vri-es
Contributor

The real underlying issue here seems to be that python generators simply aren't suitable for multi-threading or multiprocessing. In the case of multiprocessing you'll always get each worker generating the same sequence, and in the case of multi-threading there can't really be any parallelism.

An interface which implies that it does something impossible, without actually doing it, does more harm than good, I think. How about adding deprecation warnings to using generators with more than 1 worker, and eventually removing support altogether?

If there is a valid use case we should come up with an interface which does actually work. Sadly that probably means no Python generators, nice as they are.
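For concreteness, the Windows failure in the original report comes down to this: the spawn start method (the only one available on Windows) has to pickle whatever it hands to a worker process, and generator objects simply cannot be pickled. A quick check (the generator name is illustrative):

```python
import pickle

def batches():
    # Stand-in for any Python data generator.
    yield from range(10)

try:
    pickle.dumps(batches())  # spawn-based workers receive their work pickled
    raised = False
except TypeError as exc:
    raised = True
    print(exc)  # e.g. "cannot pickle 'generator' object"
```

This is why fork-based platforms could limp along (each forked worker gets its own copy of the generator, replaying the same sequence) while Windows fails outright.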

@Dref360
Contributor

Dref360 commented Dec 4, 2017

There is a warning: here

generator_output = next(self._generator)
self.queue.put((True, generator_output))
else:
time.sleep(self.wait_time)
Contributor

Sleeping with a lock held seems bad. Shouldn't it be sufficient to guard this line with the lock:

generator_output = next(self._generator)

Contributor Author

In my (admittedly limited) experience with generators, the thread is mostly waiting on getting an available slot in the queue rather than on getting samples from the generator. I guess this is all very dependent on your use case.
With the above in mind, I believe it is much better to sleep than to busy-wait on the queue no longer being full (that's essentially what the self.queue.qsize() < self.max_queue_size test is about).
Of course, a much better fix would replace the lock with a condition variable, signalling on both the queue and the generator. I assume you decided not to go that way in the first place because you wanted to make it easy for developers to write generators. Or perhaps, given the limitations of multithreading in Python, you decided there was no point in forcing developers to adhere to a much more sophisticated synchronization mechanism for a feature that is so hobbled by the GIL anyway...
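For reference, the condition-variable alternative mentioned above could look roughly like this (a minimal sketch with hypothetical names, not the Keras implementation): producers wait on "not full", releasing the lock while they wait, and consumers signal after taking an item, so nobody sleeps while holding the lock.

```python
import collections
import threading

class CondEnqueuer:
    """Sketch of a condition-variable enqueuer: producers wait on "not
    full" (releasing the lock during the wait) and consumers notify
    after taking an item, so no thread sleeps while holding the lock."""

    def __init__(self, generator, max_queue_size=10):
        self._generator = generator
        self._buf = collections.deque()
        self._max = max_queue_size
        self._cond = threading.Condition()
        self._done = False

    def _producer(self):
        while True:
            with self._cond:
                while len(self._buf) >= self._max and not self._done:
                    self._cond.wait()  # lock is released during the wait
                if self._done:
                    return
                try:
                    item = next(self._generator)
                except StopIteration:
                    self._done = True
                    self._cond.notify_all()  # wake producers and consumers
                    return
                self._buf.append(item)
                self._cond.notify_all()

    def get(self):
        with self._cond:
            while not self._buf and not self._done:
                self._cond.wait()
            if self._buf:
                item = self._buf.popleft()
                self._cond.notify_all()  # a slot freed up: wake producers
                return item
            raise StopIteration

# Four producers share one plain iterator; each item is produced once.
enq = CondEnqueuer(iter(range(30)))
producers = [threading.Thread(target=enq._producer) for _ in range(4)]
for t in producers:
    t.start()
out = []
while True:
    try:
        out.append(enq.get())
    except StopIteration:
        break
for t in producers:
    t.join()
```

The trade-off the reviewers discuss is visible here: correctness requires noticeably more synchronization machinery than the lock-plus-sleep approach.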

Contributor

Well, it's not really a busy wait, since it's mostly sleeping. However, I have to agree that there isn't much to be gained by holding the lock for a shorter time.

I think it would only matter if putting the object in the queue were a time-consuming action (which it isn't in the threading case).

Contributor

I was also bothered by the sleep inside the lock and thought about correcting it. Even though the code can be refactored to avoid it, we CANNOT lock only the generator_output = next(self._generator) line. That will not work. I thought I'd leave a comment here in case someone tries to do this in the future.

This PR gives the queue a fixed size. If we don't protect both the iterator and the addition to the queue, we can end up with threads stuck trying to add to a full queue; and because the stop() operation joins and waits for the threads to finish, stop() would then never return. To avoid sleeping inside the lock, the function needs to be heavily refactored and, to echo @philferriere, it might not be 100% worth it.
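To make that constraint concrete, here is a stripped-down sketch (a hypothetical class, not the actual Keras code) of the pattern this PR uses for the threading branch: one lock guards both next() on the shared generator and the queue insertion, and a full queue is handled by sleeping rather than by a blocking put(), so stop() can always join the workers.

```python
import queue
import threading
import time

class TinyEnqueuer:
    """Sketch of the threading branch: one lock guards both the call to
    next() on the shared generator and the queue insertion; a full
    queue is handled by sleeping, never by a blocking put()."""

    def __init__(self, generator, max_queue_size=10, wait_time=0.05):
        self._generator = generator
        self.queue = queue.Queue()
        self.max_queue_size = max_queue_size
        self.wait_time = wait_time
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._threads = []

    def _data_generator_task(self):
        while not self._stop_event.is_set():
            with self._lock:
                if self.queue.qsize() < self.max_queue_size:
                    try:
                        output = next(self._generator)
                    except StopIteration:
                        return  # generator exhausted: worker exits cleanly
                    self.queue.put(output)
                else:
                    # Queue full: sleep instead of blocking on put(), so a
                    # later stop() can always join this thread.
                    time.sleep(self.wait_time)

    def start(self, workers=1):
        for _ in range(workers):
            thread = threading.Thread(target=self._data_generator_task)
            thread.daemon = True
            thread.start()
            self._threads.append(thread)

    def stop(self):
        self._stop_event.set()
        for thread in self._threads:
            thread.join()

# Four workers share one plain iterator; each item is produced exactly once.
enq = TinyEnqueuer(iter(range(100)))
enq.start(workers=4)
items = [enq.queue.get() for _ in range(100)]
enq.stop()
```

Because a worker never blocks on a full queue while other threads are excluded by the lock, stop() terminates even if consumers have gone away.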

Contributor

In the future, I'll just remove this use case. I suspect having workers > 1 and use_multiprocessing == False doesn't give any real speedup.

I'll do some profiling and post my findings here.

Also, we should try to mimic the OrderedEnqueuer, so this class will get heavily refactored soon anyway (by the end of August).

Contributor

With use_multiprocessing=False, how do you avoid ValueError : generator already executing without adding a Lock and therefore lose all speedup?
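For context, the error in question is easy to reproduce: CPython raises it whenever a second thread calls next() while the generator frame is still executing. A small deterministic demonstration (the long sleep just guarantees the two calls overlap):

```python
import threading
import time

def slow_batches():
    while True:
        time.sleep(0.5)  # keep the generator frame "executing" for a while
        yield 1

gen = slow_batches()
caught = []

def worker():
    try:
        next(gen)
    except ValueError as exc:  # "generator already executing"
        caught.append(str(exc))

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Exactly one of the two threads enters the generator; the other fails immediately with the ValueError, which is why unsynchronized workers > 1 on a plain generator cannot work.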

Contributor

Let me provide some context and then move to benchmarks. Here are the key Keras releases for this discussion:
2.0.5: All Generator classes for Images used Iterator logic.
2.0.6: Sequences introduced.
2.1.3: Locks introduced in GeneratorEnqueuer

Before the introduction of Sequences (<=2.0.5), all Generator classes in Keras implemented next(). They were also thread-safe, and as a result the GeneratorEnqueuer did not require any locks. On the other hand, Python generator methods did not work well, as they were throwing ValueError : generator already executing. This was fixed by @philferriere in this PR and released in 2.1.3.

When I talk about thread-safe generators, I'm not referring to Python generator methods but to Generator classes like those used by Keras before 2.0.5. These classes used an Iterator and, by using minimal locking, made sure that all the heavy computations are done outside the lock. Since C libs can release the GIL, this gave speed improvements. When Sequences were introduced, many of the classes were rewritten and implemented __getitem__() instead of next(). Sequences are better when concurrency is necessary; nevertheless, not all use cases can implement "random-access" logic, yet they can still be thread-safe. Thus having the ability to build thread-safe generator classes is still useful IMO. The lock introduced in 2.1.3 stops those valid cases from using proper parallelism.
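The minimal-locking pattern described above can be sketched as follows (a hypothetical class; the real pre-2.0.5 Keras Iterators do this with index arrays and image decoding): only the index bookkeeping is serialized, and the heavy per-item work runs outside the lock, where C libraries can release the GIL and let workers overlap.

```python
import threading

class SafeSquares:
    """Thread-safe iterator in the pre-2.0.5 style: lock only the cheap
    index bookkeeping; do the heavy per-item work outside the lock."""

    def __init__(self, n):
        self.n = n
        self.i = 0
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:  # serialized: just grabbing the next index
            if self.i >= self.n:
                raise StopIteration
            i = self.i
            self.i += 1
        # Heavy work (image decode, resize, augmentation...) would go
        # here, outside the lock, where C libraries can release the GIL.
        return i * i

results = []
results_lock = threading.Lock()
it = SafeSquares(50)

def worker():
    for value in it:
        with results_lock:
            results.append(value)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each index is handed out exactly once, so four threads can safely share the iterator; a coarse lock around the whole __next__ plus the heavy work would serialize everything, which is exactly the slowdown measured below.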

Below I provide a snippet that can be run with and without the lock (just comment out line 650). To produce something more than a toy example, I use the ImageDataGenerator class with a realistic configuration to load real images. (Note that I am aware that ImageDataGenerator will return a Sequence, but that has no effect here; for an Iterator-based implementation you can take the older version of the class from Keras 2.0.5 and get the same results. What we need here is a thread-safe "iterator" with heavy computations in the non-locking parts.)

Snippet:

import time
from keras.preprocessing import image
from keras.utils.data_utils import GeneratorEnqueuer

it = image.ImageDataGenerator().flow_from_directory('/path/to/images', target_size=(224, 224), batch_size=512)

reader = GeneratorEnqueuer(it, use_multiprocessing=False)
reader.start(workers=16, max_queue_size=16)
g = reader.get()

start = time.time()
n = 100
for i in range(n):
    x = next(g)

total_time = time.time() - start
print('Total time %f sec; Average time: %f sec' % (total_time, total_time / n))

WITH LOCK: Total time 161.514681 sec; Average time: 1.615147 sec
WITHOUT LOCK: Total time 32.234057 sec; Average time: 0.322341 sec

That's 5 times faster, provided that the underlying iterator is thread-safe. As you can see, use_multiprocessing=False and workers>1 can be quite useful because many C libraries release the GIL. Unfortunately, the lock introduced above, even though it admittedly fixes the case of Python generators, makes it impossible for thread-safe iterators to work fast. Note that this benchmark does not conflict with yours: if I run the same code without the Enqueuer, its time is the same as with the Enqueuer and the lock (exactly what you said earlier).

In my opinion, removing the lock makes sense (or maybe checking the instance type and/or the existence of specific methods to determine whether the lock is necessary?), because if you are using non-thread-safe Python generators you should not use workers>1. If you use thread-safe iterators, though, it absolutely makes sense to use parallelism, especially if you implement them with minimal locking. Finally, I want to mention that this is not a theoretical discussion. I started investigating this problem because we observed GPU starvation and 4-5 times slower execution on real-world applications after Keras 2.1.3. Our problem is solved by removing the lock and making a couple of minor changes to the queue configuration.

If that's interesting to you, I can draft a PR so that you can check it out and decide if it's something you want to use.

Contributor

Please draft a PR. I think we should keep a UX-friendly way of handling Python generators (i.e. a good, clear message stating that they should set workers=1).

Contributor

Sounds good. I'll put something together over the weekend and tag you to get your feedback.

Contributor

In my opinion, removing the lock makes sense (or maybe checking the instance type and/or the existence of specific methods to understand if the lock is necessary?), because if you are using non-thread-safe Python generators you should not use workers>1.

Yeah, I think it's a good idea to add a specific member variable to indicate that a generator is safe to run multi-threaded, so it can be detected at runtime. That could then trigger a warning and enable the locking to serialize the next() calls with workers > 1.

@de-vri-es
Contributor

There is a warning: here

Yeah, but only in the use_multiprocessing=True case. It turns out the other case also doesn't really do what you would expect.

@philferriere
Contributor Author

@Dref360 @de-vri-es, please let me know if you need anything else from me.

If you approve this PR, may I suggest adding a small note to Docs » Models » Model (functional API) for the three generator methods (fit_generator, evaluate_generator, predict_generator)? Something along the lines of:

Using a generator with use_multiprocessing=True and workers>0 is not supported on Windows (no marshalling of generators across process boundaries) and will result in a ValueError exception. Instead, use single thread/process or multithreading.

Thank you both for your help with this!

@de-vri-es
Contributor

Looks good to me. I do think it makes sense to update the API of GeneratorEnqueuer to reflect the fact that parallelism isn't really possible at all, but that is probably out of scope for this PR. Removing race conditions is a good first step.

fchollet
fchollet previously approved these changes Dec 10, 2017
Collaborator

@fchollet left a comment

Thanks for the PR!

except StopIteration:
break
except Exception as e:
# Can't pick tracebacks.
Collaborator

pickle

Contributor Author

Fixed

"""

-    def data_generator_task():
+    def data_generator_task(self):
Collaborator

This should be a private method, I believe

Contributor Author

Fixed

"""

-    def data_generator_task():
+    def __data_generator_task(self):
Collaborator

You only need one leading underscore to make a method private.

Contributor Author

@philferriere Dec 12, 2017

Per Python's class module documentation:

“Private” instance variables that cannot be accessed except from inside an object don’t exist in Python. However, there is a convention that is followed by most Python code: a name prefixed with an underscore (e.g. _spam) should be treated as a non-public part of the API (whether it is a function, a method or a data member). It should be considered an implementation detail and subject to change without notice.

Since there is a valid use-case for class-private members (namely to avoid name clashes of names with names defined by subclasses), there is limited support for such a mechanism, called name mangling. Any identifier of the form __spam (at least two leading underscores, at most one trailing underscore) is textually replaced with _classname__spam, where classname is the current class name with leading underscore(s) stripped. This mangling is done without regard to the syntactic position of the identifier, as long as it occurs within the definition of a class.

The one-underscore approach assumes developers abide by a coding convention and little else. Two underscores use name mangling to enforce some level of privacy (easy to break, sure, but at least it's trying). Is the "by convention" approach the only one you need me to follow?

I just want to make sure. Thanks, @fchollet!

Collaborator

Yes, all private methods in the Keras codebase use a single leading underscore. Thanks!
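The name-mangling behavior quoted from the Python docs above can be seen in a short sketch (class and method names are illustrative): double underscores rename the attribute per class, which prevents subclass clashes but is not real privacy.

```python
class Base:
    def __helper(self):         # mangled to _Base__helper
        return 'base'

    def call(self):
        return self.__helper()  # always resolves to _Base__helper

class Child(Base):
    def __helper(self):         # mangled to _Child__helper: no clash
        return 'child'

print(Base().call(), Child().call())   # both print 'base'
print(hasattr(Base, '_Base__helper'))  # True: "privacy" is just renaming
```

With a single underscore, Child._helper would simply have overridden Base._helper; the single-underscore convention is what Keras opts for here.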

Collaborator

@fchollet left a comment

LGTM, thank you!

@fchollet fchollet merged commit 29ee89a into keras-team:master Dec 12, 2017
@adam-grant-hendry

since the GIL would kill any concurrency anyway

The GIL kills parallelism, not concurrency:

  • multitasking (parallelism) = tasks literally run at the same time
  • multithreading (concurrency) = tasks can start, run, and complete in overlapping time so there is no down time, but only one task runs at a time

Just adding this so people aren't confused.
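A small illustration of the distinction (standard library only): five threads each sleeping 0.2 s finish in roughly 0.2 s total, because time.sleep releases the GIL, so the waits are concurrent; pure-Python bytecode, by contrast, would still run one thread at a time.

```python
import threading
import time

start = time.time()
# Five threads sleeping 0.2 s each. time.sleep releases the GIL, so the
# waits overlap (concurrency) and the total is ~0.2 s, not 1.0 s. CPU-bound
# Python bytecode would still execute one thread at a time (no parallelism).
threads = [threading.Thread(target=time.sleep, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
print('elapsed: %.2f s' % elapsed)
```

The same reasoning is why the thread-safe iterator benchmarks earlier in this thread see a speedup: the heavy work happens in C code that releases the GIL.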

@de-vri-es
Contributor

That's a little nitpicky, but also, the distinction between multitasking and multithreading as you describe it is quite arbitrary.

Nowadays, a "task" tends to refer to a concept from your language runtime or a library, and doesn't necessarily map to parallel execution. Multithreading, on the other hand, is an execution model, possibly for "tasks", that actually runs in parallel in practically any language other than Python, and has since the days of multi-core CPUs (~2005).
