Add serializer for keras models #878

Merged · 4 commits · Feb 22, 2017

Conversation

@bnaul (Contributor) commented Feb 17, 2017

Fixes #841. Turned out to be easier than expected: model._updated_config() is what's used inside model.save() to extract the metadata, and get_weights() returns a list of numpy arrays of weight parameters.

The only weird bit I see is the repeated registrations: depending on how the model is instantiated, it could have any of those types. They all inherit from keras.engine.training.Model, but it didn't seem to work unless I registered all three; maybe there's a better way, though?
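
(For illustration, those two hooks behave roughly like this; the toy model and the assertion below are just an example against the Keras API of that era, not code from this PR:)

import numpy as np
from keras.models import Sequential
from keras.layers import Dense

model = Sequential()
model.add(Dense(4, input_shape=(8,)))

config = model._updated_config()   # the same metadata dict that model.save() writes out
weights = model.get_weights()      # list of numpy arrays (kernels and biases)
assert all(isinstance(w, np.ndarray) for w in weights)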

@mrocklin (Member):

Cool find. The only other thing to do here is to convert the numpy arrays into bytes-like objects. You can probably do this with something like the following:

# `header` is the metadata dict already being built for the model;
# `serialize` is dask's distributed.protocol.serialize
weights = model.get_weights()
headers, frames = list(zip(*map(serialize, weights)))  # one (header, frames) pair per array
header['headers'] = headers
header['lengths'] = [len(L) for L in frames]            # how many frames each array produced
frames = [frame for L in frames for frame in L]         # flatten into a single list of frames

On the deserialize side:

# Walk the flat frame list, giving each weight array back its own frames
n = 0
weights = []
for head, length in zip(header['headers'], header['lengths']):
    x = deserialize(head, frames[n: n + length])
    weights.append(x)
    n += length
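
(Putting the two halves together, a rough end-to-end sketch of what the registered pair could look like. The function names, the 'weight-lengths' key, which avoids the 'lengths' conflict that comes up further down the thread, the use of model_from_config, and the class list are illustrative assumptions, not the code that was actually merged:)

from distributed.protocol.serialize import (serialize, deserialize,
                                            register_serialization)
import keras


def serialize_keras_model(model):
    header = {'model-config': model._updated_config()}
    # Turn each weight array into bytes-like frames with dask's own serializer
    weight_headers, weight_frames = zip(*map(serialize, model.get_weights()))
    header['headers'] = list(weight_headers)
    header['weight-lengths'] = [len(fs) for fs in weight_frames]  # frames per array
    frames = [frame for fs in weight_frames for frame in fs]      # flatten
    return header, frames


def deserialize_keras_model(header, frames):
    model = keras.models.model_from_config(header['model-config'])
    weights, n = [], 0
    for head, length in zip(header['headers'], header['weight-lengths']):
        weights.append(deserialize(head, frames[n:n + length]))
        n += length
    model.set_weights(weights)
    return model


# Register the same pair for each concrete class a model instance may have
for cls in (keras.models.Model, keras.models.Sequential,
            keras.engine.training.Model):
    register_serialization(cls, serialize_keras_model, deserialize_keras_model)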

@mrocklin (Member):

The full chain test would probably be something like the following:

data = {'x': to_serialize(model)}
frames = dumps(data)
result = loads(frames)
... test against data['x'] and result['x']

Adding this test would reveal the issue referred to above.
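
(Spelled out, the comparison could look roughly like the following; the toy model and the specific assertions are my own illustration of "test against", not the test that was merged:)

import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from distributed.protocol import dumps, loads, to_serialize

model = Sequential()
model.add(Dense(4, input_shape=(8,)))

data = {'x': to_serialize(model)}
frames = dumps(data)
result = loads(frames)

# Architecture and weights should survive the round trip
assert result['x']._updated_config() == model._updated_config()
for w_in, w_out in zip(model.get_weights(), result['x'].get_weights()):
    np.testing.assert_allclose(w_in, w_out)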

@bnaul bnaul force-pushed the master branch 2 times, most recently from c187a50 to bf50a99 Compare February 17, 2017 22:40
A review comment on the new keras serializer code:

header['headers'] = headers
header['lengths'] = [len(L) for L in frames]
frames = [frame for L in frames for frame in L]
return header, frames

@mrocklin (Member) commented:

OK, the failure here is my fault. Dask is already using the term 'lengths' and so there is a conflict. Naming this something else should work. However I've also identified another internal issue. Resolving that now.
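
(That is, something along the lines of the following, where the exact key name is arbitrary as long as it doesn't collide with dask's own 'lengths':)

header['weight-lengths'] = [len(L) for L in frames]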

@bnaul bnaul force-pushed the master branch 2 times, most recently from 957d096 to 26159b3 Compare February 21, 2017 19:24
bnaul and others added 4 commits February 21, 2017 11:29
We reverse frame order when merging for efficiency (popping from the end
of a list is cheaper) however we don't want to do this if we take the
fast path.  Now we only flip after we've passed the fast path.
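
(The pattern being described, as a generic standalone sketch rather than the actual dask.distributed internals:)

def merge_frames(lengths, frames):
    """Re-chunk `frames` so that output chunk i holds exactly lengths[i] bytes.

    Assumes sum(lengths) equals the total number of bytes across `frames`.
    """
    if lengths == [len(f) for f in frames]:
        return frames                 # fast path: boundaries already match, keep order
    frames = frames[::-1]             # reverse only now, so pops come cheaply off the end
    out = []
    for length in lengths:
        chunk = []
        while length > 0:
            frame = frames.pop()
            if len(frame) <= length:
                chunk.append(frame)
                length -= len(frame)
            else:                      # frame spans a boundary: split it
                chunk.append(frame[:length])
                frames.append(frame[length:])
                length = 0
        out.append(b''.join(chunk))
    return out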
@mrocklin (Member):

This is in. Thanks @bnaul !

@anthonylobko:

I was just wondering what the right way to use keras with dask.distributed is. I'm getting errors no matter what I try. What I've tried includes:

result = client.map(model.predict, data)
result = client.submit(lambda x, y: x.predict(y), model, data)

but no matter what I try, I get Theano errors that don't occur when I run the same code without dask.distributed, and they seem to be related to the model not being set up properly. Could it have something to do with the way it is serialized?

My model is:
model = Sequential()
model.add(Conv2D(filters=filters, kernel_size=(1, 5), padding='same', activation=None, input_shape=input_shape))
model.add(GaussianNoise(0.1))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(1, 2)))
model.add(Dropout(0.2))
model.add(Conv2D(filters=filters, kernel_size=(4, 1), padding='same', activation='relu'))
model.add(MaxPooling2D(pool_size=(4, 1)))
model.add(Reshape((-1, len_seq)))
model.add(Dropout(0.2))
model.add(Bidirectional(LSTM(n_lstm, return_sequences=True)))
model.add(Bidirectional(LSTM(n_lstm)))
model.add(Dropout(0.2))
model.add(Dense(2, activation='sigmoid'))

and the error I get is:

theano.gof.fg.MissingInputError: Input 0 of the graph (indices start from 0), used to compute if{}(keras_learning_phase, Elemwise{add,no_inplace}.0, Elemwise{add,no_inplace}.0), was not provided and not given a value. Use the Theano flag exception_verbosity='high', for more information on this error.

This seems to happen only with dask.distributed; it never occurs when running natively on the CPU.

@mrocklin @bnaul I know this might not be the right place to ask the question, but you two are the only link between Keras and Dask.distributed I've found on the entire Internet.

Thanks!

@mrocklin (Member) commented Apr 20, 2017 via email

@bnaul (Contributor, Author) commented Apr 21, 2017

Hi @amelius15,

It seems that there are still issues with keras and multiple threads, at least with the tensorflow backend (see e.g. keras-team/keras#5896 and keras-team/keras#5640). Someone in the latter thread mentioned that they were able to do inference in parallel by persisting the model structure to disk and re-loading it, but I haven't confirmed that this works. In my own work I worked around the problem by using a process pool instead of a thread pool; I'm not sure how you're launching the workers, but you could try --nprocs <n> to see if that helps. If not, I'm afraid there isn't quite enough info here to debug; maybe you could post a self-contained gist that recreates your error?

I've been meaning to spend some time exploring the different possible options here; curious to hear what else you find!
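
(For instance, the process-based equivalent of that --nprocs suggestion when starting a local cluster could look like the following; the worker counts are arbitrary and this is standard dask.distributed API rather than anything from this PR:)

from dask.distributed import Client, LocalCluster

# One single-threaded worker process per slot, so each worker keeps its own
# Theano/TensorFlow state instead of sharing it across threads
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)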
