Skip to content

Commit cc6aaf8

Browse files
committed
Merge pull request #5 from minrk/limiter
use session.deserialize to unpack message for rate limiting
2 parents 9e2c95d + c280b77 commit cc6aaf8

File tree

2 files changed

+17
-9
lines changed

2 files changed

+17
-9
lines changed

notebook/base/zmqhandlers.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,16 +218,23 @@ def send_error(self, *args, **kwargs):
218218
self.stream.close()
219219

220220

221-
def _reserialize_reply(self, msg_list, channel=None):
221+
def _reserialize_reply(self, msg_or_list, channel=None):
222222
"""Reserialize a reply message using JSON.
223223
224-
This takes the msg list from the ZMQ socket, deserializes it using
225-
self.session and then serializes the result using JSON. This method
226-
should be used by self._on_zmq_reply to build messages that can
224+
msg_or_list can be an already-deserialized msg dict or the zmq buffer list.
225+
If it is the zmq list, it will be deserialized with self.session.
226+
227+
This takes the msg list from the ZMQ socket and serializes the result for the websocket.
228+
This method should be used by self._on_zmq_reply to build messages that can
227229
be sent back to the browser.
230+
228231
"""
229-
idents, msg_list = self.session.feed_identities(msg_list)
230-
msg = self.session.deserialize(msg_list)
232+
if isinstance(msg_or_list, dict):
233+
# already unpacked
234+
msg = msg_or_list
235+
else:
236+
idents, msg_list = self.session.feed_identities(msg_or_list)
237+
msg = self.session.deserialize(msg_list)
231238
if channel:
232239
msg['channel'] = channel
233240
if msg['buffers']:

notebook/services/kernels/handlers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,10 @@ def on_message(self, msg):
269269

270270
def _on_zmq_reply(self, stream, msg_list):
271271
idents, fed_msg_list = self.session.feed_identities(msg_list)
272+
msg = self.session.deserialize(fed_msg_list)
273+
parent = msg['parent_header']
272274
def write_stderr(error_message):
273275
self.log.warn(error_message)
274-
parent = json.loads(fed_msg_list[2])
275276
msg = self.session.msg("stream",
276277
content={"text": error_message, "name": "stderr"},
277278
parent=parent
@@ -280,7 +281,7 @@ def write_stderr(error_message):
280281
self.write_message(json.dumps(msg, default=date_default))
281282

282283
channel = getattr(stream, 'channel', None)
283-
msg_type = json.loads(fed_msg_list[1])['msg_type']
284+
msg_type = msg['header']['msg_type']
284285
if channel == 'iopub' and msg_type not in {'status', 'comm_open', 'execute_input'}:
285286

286287
# Remove the counts queued for removal.
@@ -345,7 +346,7 @@ def write_stderr(error_message):
345346
# If either of the limit flags are set, do not send the message.
346347
if self._iopub_msgs_exceeded or self._iopub_data_exceeded:
347348
return
348-
super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg_list)
349+
super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg)
349350

350351

351352
def on_close(self):

0 commit comments

Comments
 (0)