
Is there a way to convert a custom keras.utils.Sequence custom class to a tf.Data pipeline? #39523

Closed
Abhishaike opened this issue May 13, 2020 · 16 comments

Comments

@Abhishaike

When I was building up my data pipeline, the Tensorflow docs were very insistent that generators are unsafe for multiprocessing, and that the best way to build up a multiprocessing streaming pipeline is to extend tensorflow.keras.utils.Sequence into your own custom class. This is written here: https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence

So I did that, but now Tensorflow is telling me that Sequence extensions are ALSO not ideal for multiprocessing, via the warning message "multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended." So now the recommendation is to use tf.data. And, as it were, I now keep running into deadlocks ~4 epochs into training.

Is there no converter between an existing sequence class and a tf.Data pipeline? It seems bizarre that the EXACT thing the Sequence extension class is recommended for seems to no longer work, and now only a brand new type of data pipeline will do the multiprocessing job. At the very least, this should be updated in the Sequence docs.

@gowthamkpr

Can you please explain more about your custom class? To understand the difference between keras.utils.Sequence and a tf.data pipeline, you can take a look at the following question.

@google-ml-butler

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you.

@gowthamkpr

Closing this issue as it has been inactive for 2 weeks. Please add additional comments for us to open this issue again. Thanks!

@MaxSchambach

MaxSchambach commented Jun 22, 2020

I was facing exactly the same question, in particular when upgrading certain routines such as data generation from TF1 to TF2. The mentioned statement in the documentation seems confusing. I am also facing deadlocks irregularly and in a non-reproducible manner. When using generators derived from keras.utils.Sequence and passed e.g. to tf.keras.Model.fit (as fit_generator is deprecated), is that internally the same as using tf.data.Dataset.from_generator?

If so, would that be a recommended way to switch over to tf.data without having to rewrite custom data processing? If not, what would be the recommended way?

I too think the documentation is very confusing at that point...

@Abhishaike
Author

Abhishaike commented Jun 24, 2020

Hey @MaxSchambach, glad to hear that I'm not alone in this issue! Unfortunately, there doesn't seem to be a good way to move over to tf.Data without significant rewriting. I still do think this should be updated in the docs, so I'd like to reopen the issue.

However, if it's any consolation, there ARE ways to get around deadlocking within the current Sequence framework. It's very hacky and customized to our problem, but it works. I didn't write that particular fix, but the general overview is that you need a pretty good understanding of the multiprocessing library. You need to use the multiprocessing Lock() functionality to continually .acquire() or .release() whatever your data is (Spark files in our case) to ensure that the underlying Tensorflow threads don't try to grab onto multiple files at the same time, all while calling the garbage collector to immediately collect any stray data and prevent memory leaks. I don't think I'd really be able to share a code snippet of how it works, purely due to how specific to our problem it is.
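
To illustrate the general pattern only (this is not the actual fix; load_file() is a placeholder, and fork-based multiprocessing is assumed so that workers inherit the lock from the parent process):

import gc
import multiprocessing

import numpy as np
import tensorflow as tf

# One lock shared by all workers; forked child processes inherit it,
# so it can serialize access to the underlying files.
_file_lock = multiprocessing.Lock()

def load_file(path):
    # Placeholder for whatever reads one shard of your data
    # (Spark output, Parquet, .npy, ...); here it fabricates a batch.
    return (np.random.rand(32, 10).astype("float32"),
            np.random.rand(32, 1).astype("float32"))

class LockedSequence(tf.keras.utils.Sequence):
    def __init__(self, paths):
        self.paths = paths

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index):
        # Only one worker touches the underlying files at a time.
        with _file_lock:  # same effect as explicit .acquire()/.release()
            x, y = load_file(self.paths[index])
        gc.collect()  # immediately collect any stray references to the raw data
        return x, y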

Personally, I'd just move to Pytorch if your data loading setup is complex enough to have to use stuff like Sequences or tf.Data. Tensorflow doesn't seem well designed for unusual training schemes.

@hm2092

hm2092 commented Jul 6, 2020

Hi @Abhishaike, @MaxSchambach. Thank you for your points and suggestions regarding the issue with the Sequence dataloader. I came across this issue trying to overcome the same situation.

Using a tf.keras.utils.Sequence dataloader with tf.keras.utils.OrderedEnqueuer seems to work for me in TF 2.0 while using a custom training loop (not the tf.keras.Model.fit_generator method) with multiprocessing. According to the answer in this thread, the OrderedEnqueuer ensures the order of data loading with multiple workers without having to dig much into multiprocessing. The tf.keras.Model.fit_generator method in TF 2.0 uses OrderedEnqueuer (as far as I understood from the source code), there was no deadlock issue while training, and playing around with the number of workers and max_queue_size resulted in better performance. With the tf.keras.Model.fit method in TF 2.0, I also ran into deadlocks. In TensorFlow 2.2 this seems to be fixed, and it works efficiently with tf.keras.Model.fit.

In all these cases the dataloader was a Sequence, not tf.data, as my data preprocessing involved some Python-specific dependencies. Still, I guess it is better to use the tf.data pipeline if the data preprocessing can be written in TensorFlow.
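
A minimal sketch of that setup, assuming TF 2.x (the toy Sequence, model, and worker counts below are placeholders, not the actual pipeline):

import numpy as np
import tensorflow as tf

class ToySequence(tf.keras.utils.Sequence):
    # Stand-in for a real Sequence-based dataloader.
    def __init__(self, n=256, batch_size=32):
        self.x = np.random.rand(n, 10).astype("float32")
        self.y = np.random.rand(n, 1).astype("float32")
        self.batch_size = batch_size

    def __len__(self):
        return len(self.x) // self.batch_size

    def __getitem__(self, idx):
        s = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        return self.x[s], self.y[s]

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.MeanSquaredError()

sequence = ToySequence()
enqueuer = tf.keras.utils.OrderedEnqueuer(sequence, use_multiprocessing=True)
enqueuer.start(workers=4, max_queue_size=8)
batches = enqueuer.get()  # generator that yields batches in order

for epoch in range(3):
    for _ in range(len(sequence)):  # one pass over the Sequence per epoch
        x, y = next(batches)
        with tf.GradientTape() as tape:
            loss = loss_fn(y, model(x, training=True))
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

enqueuer.stop()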

@innat

innat commented Mar 6, 2021

Training just stopped at the very first epoch with this message:

TensorFlow 2.4.1

WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing 
nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.

It's really disappointing to run into such an issue. Rewriting a tf.keras.utils.Sequence as a tf.data.Dataset is not a smooth transition. The approach that used to be the recommended way is now not safe to use! I wonder what will happen to tf.data next. This issue should be reopened and resolved. It's clear why people move to PyTorch; no offense, but it makes total sense.

Personally, I'd just move to Pytorch if your data loading setup is complex enough to have to use stuff like Sequences or tf.Data. Tensorflow doesn't seem well designed for unusual training schemes.

@Wazaki-Ou

I am currently facing the same issue. I read somewhere that tf.keras.utils.Sequence is the proper way to implement a custom data generator, but when I try using multiprocessing and workers, I get that warning and training does not start. It's been almost a year since this issue was opened. Has there been any progress?

@bzamecnik
Contributor

bzamecnik commented Jun 30, 2021

It seems that TF Keras is sensitive to Sequence implementations not being thread-safe or process-safe. I've been having horrible problems migrating my data pipelines using generators/sequences to TF 2. But there are some observations and possible bugs in TF Keras. Assuming we use a Sequence with multiprocessing (and TF 2.3):

  • Sequence.__getitem__ is not accessed only from the subprocesses but also from the main process!
    • __getitem__(0) is called from tensorflow.python.keras.engine.data_adapter.KerasSequenceAdapter._peek_and_restore(): return x[0], x
    • it peeks at the first batch to sniff the shape
    • it's called prior to forking/spawning the subprocess
  • Sequence.on_epoch_end() is also called from the main process

Thus even if we lazily initialize some thread-unsafe state within the Sequence instance, it gets initialized in the main process and then copied to the subprocess! Also, modifying the state in the on_epoch_end() hook modifies only the instance in the main process, not in the subprocess.

I took the basic Keras MNIST convnet example, wrapped the arrays with a Sequence, and traced the process IDs: https://gist.github.com/bzamecnik/dcc1d1a39f3e4fa7ac5733d80b79fa2d (code + logs)

In general, Keras supports Sequences and multiprocessing (at least in TF 2.3), but if there's anything thread-unsafe in the Sequence, it fails.
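
One defensive pattern that follows from these observations (an illustrative sketch, not the code from the gist above; open_resource() is a hypothetical stand-in for the real per-process handle): lazily (re)initialize any thread-unsafe state whenever the current PID differs from the one that created it, so the copy made during the main-process peek is never reused inside a worker.

import os

import numpy as np
import tensorflow as tf

def open_resource():
    # Hypothetical stand-in for a per-process handle
    # (HDF5 file, DB connection, Spark reader, ...).
    return object()

class PidGuardedSequence(tf.keras.utils.Sequence):
    def __init__(self, n_batches=100, batch_size=16):
        self.n_batches = n_batches
        self.batch_size = batch_size
        self._pid = None        # PID that owns the current resource
        self._resource = None   # created lazily, never shared across processes

    def _ensure_resource(self):
        # Re-open the resource if this is a different process from the one
        # that created it (covers both the main-process peek and forking).
        if self._resource is None or self._pid != os.getpid():
            self._pid = os.getpid()
            self._resource = open_resource()
        return self._resource

    def __len__(self):
        return self.n_batches

    def __getitem__(self, index):
        self._ensure_resource()
        x = np.random.rand(self.batch_size, 10).astype("float32")
        y = np.random.rand(self.batch_size, 1).astype("float32")
        return x, y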

@msokur

msokur commented Sep 7, 2021

I have a hack.
(I'm not at all sure this is the perfect solution, but it works, and it works without a lot of changes.)

For example, you have a class inherited from keras.utils.Sequence:

class DataGenerator(keras.utils.Sequence):
    def __init__(self):
        # some initialization
        pass

    def __len__(self):
        return number_of_steps_in_epoch   # number of batches

    def __getitem__(self, index):
        data = load(index)   # load() is your own batch-loading function
        X, y = data["X"], data["y"]
        return X, y

and code that uses it, for example:

data_generator = DataGenerator()

model.fit(x=data_generator)

---------------A hack---------------
As we can see, __getitem__ is a special (dunder) method. Let's add a plain public wrapper for it, and an attribute for the length, in the __init__ method. It would all look like this:

class DataGenerator(keras.utils.Sequence):
    def __init__(self):
        # some initialization
        self.len = self.__len__()    # an attribute for the length

    def __len__(self):
        return number_of_steps_in_epoch

    def __getitem__(self, index):
        data = load(index)
        X, y = data['X'], data['y']
        return X, y

    def getitem(self, index):
        return self.__getitem__(index)

And now the most interesting part:

  1. we create a simple generator inside which we call the public getitem method of the keras.utils.Sequence instance
  2. from this simple generator we create a tf.data.Dataset object

It would look like this:

data_generator = DataGenerator()

def gen_data_generator():
    for i in range(data_generator.len):
        yield data_generator.getitem(i)    # edited regarding @Inigo-13's comment below

data_dataset = tf.data.Dataset.from_generator(
    gen_data_generator,
    output_signature=(
        tf.TensorSpec(shape=(None, your_shapes), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.float32)))  # per the tf.data.Dataset.from_generator documentation, output_signature has to be specified

model.fit(x=data_dataset)

And one more advantage: it's easy to return to the previous implementation.

@Inigo-13

Inigo-13 commented Sep 17, 2021

(Quoting @msokur's hack above, in which the generator originally called data_generator.getitem() without an index.)

Hi!
A question related to the solution proposed here: what about the "index" parameter when calling the custom "getitem" method from the Keras Sequence?

@msokur

msokur commented Sep 20, 2021

(Quoting @Inigo-13's question above about the "index" parameter.)

You are right, thank you, I forgot to copy the indexing!
It would look like this:

def gen_data_generator():
    for i in range(data_generator.len):
        yield data_generator.getitem(i)

Was: yield data_generator.getitem()
Has to be: yield data_generator.getitem(i)

I've also edited my first answer.

@pangyuteng

pangyuteng commented Jul 9, 2022

Thank you @msokur @Inigo-13 !

--

For those who couldn't get the above implementation to work: I was getting the error below:
"tensorflow/core/kernels/data/generator_dataset_op.cc:108] Error occurred when finalizing GeneratorDataset iterator: FAILED_PRECONDITION: Python interpreter state is not initialized. The process may be terminated."

The error was resolved by initializing the Sequence instance inside the generator function:

def gen_data_generator():
    data_generator = DataGenerator()
    for idx in range(len(data_generator)):
        yield data_generator[idx]

I was really hoping to see GPU utilization increase, but mine still fluctuates from 0 to 100% during training, likely due to the use of Python methods as mentioned in this SO post, or just pending further tweaking of the tf.data options ... regardless, this is one elegant implementation!

--
edit: GPU utilization increased with the one-liner optimization below :)

data_dataset = data_dataset.cache().batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE).unbatch()
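
For context, a self-contained sketch of how that one-liner slots into the generator pipeline from the comments above (the toy DataGenerator, shapes, and BATCH_SIZE_PREFETCH below are placeholders): the generator already yields whole batches, so batch() only groups several of them for prefetch() to stage ahead of the training step, and unbatch() restores the original structure before fit() sees the data.

import numpy as np
import tensorflow as tf

BATCH_SIZE_PREFETCH = 4  # how many already-batched elements to stage ahead

class DataGenerator(tf.keras.utils.Sequence):
    # Toy Sequence: each __getitem__ returns a whole batch of 32 examples.
    def __len__(self):
        return 8

    def __getitem__(self, index):
        return (np.random.rand(32, 10).astype("float32"),
                np.random.rand(32, 1).astype("float32"))

def gen_data_generator():
    data_generator = DataGenerator()  # created inside, i.e. in the consuming process
    for idx in range(len(data_generator)):
        yield data_generator[idx]

data_dataset = tf.data.Dataset.from_generator(
    gen_data_generator,
    output_signature=(
        tf.TensorSpec(shape=(None, 10), dtype=tf.float32),
        tf.TensorSpec(shape=(None, 1), dtype=tf.float32)))

# Cache the decoded batches, group several of them so prefetch can stage
# them ahead of the training step, then restore the original structure.
data_dataset = (data_dataset
                .cache()
                .batch(BATCH_SIZE_PREFETCH)
                .prefetch(tf.data.AUTOTUNE)
                .unbatch())

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(optimizer="adam", loss="mse")
model.fit(x=data_dataset, epochs=2)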

@zoythum

zoythum commented Oct 24, 2022

(Quoting @msokur's hack above.)

My current implementation of __getitem__ returns a batch of elements, like this:

    def __data_generation(self, ids):

        x = np.empty((self.batch_size, *self.input_size, self.num_channels), dtype=int)
        y = np.empty((self.batch_size, *self.input_size, 1), dtype=int)

        # Generate data
        for i, ID in enumerate(ids):
            # Store sample
            # x[i, ] = np.array(self.__true_color_img(ID))

            if self.mode == 'train':
                image_dir = TRAIN_SOURCES + TRAIN_SOURCE_NAME + ID + '/image.npy'
            else:
                image_dir = TEST_SOURCES + TEST_SOURCE_NAME + ID + '/image.npy'
            curr = np.load(image_dir)
            x[i,] = curr.astype(int)
            # Store class
            if self.mode == 'train':
                y[i] = np.expand_dims(np.asarray(Image.open(TRAIN_LABELS + TRAIN_LABEL_NAME + ID + '/labels.tif')), -1)
            else:
                y[i] = np.expand_dims(np.asarray(Image.open(TEST_LABELS + TEST_LABEL_NAME + ID + '/labels.tif')), -1)

        return x, y

    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        list_ids_current_batch = [self.list_ids[k] for k in indexes]

        # Generate data
        x, y = self.__data_generation(list_ids_current_batch)

        return x, y

Unfortunately, this raises the following error: TypeError: 'generator' yielded an element of shape (128, 512, 512, 3) where an element of shape (512, 512, 3) was expected, where 128 is the batch_size specified in the generator. Does anyone know how to fix this behaviour?
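
(For reference, a likely fix, sketched under the assumption that __getitem__ returns whole batches as above: the output_signature passed to from_generator has to declare the leading batch dimension as well, and the dtype has to match what the generator actually yields; NumPy's default int is int64 on most platforms.)

import tensorflow as tf

data_dataset = tf.data.Dataset.from_generator(
    gen_data_generator,  # the wrapper generator from the comments above
    output_signature=(
        tf.TensorSpec(shape=(None, 512, 512, 3), dtype=tf.int64),   # batched images
        tf.TensorSpec(shape=(None, 512, 512, 1), dtype=tf.int64)))  # batched labels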

@gowthamkpr

gowthamkpr commented Oct 24, 2022

@everyone If this is still an issue, can you please create it in keras-team/keras, as the development of Keras has moved to a different repo and the Keras team is inactive here. I'm sorry for the premature closure, but if someone creates an issue there, I'll be happy to look into it. Thanks!!

@QuantHao

QuantHao commented Dec 6, 2024

What a great post, and very inspiring to me.

Thanks a lot for all the suggestions made above.
