Skip to content

Commit

Permalink
Fix issue where StopIteration exception was being raised when using n…
Browse files Browse the repository at this point in the history
…ext instead of readline.

This only manifested itself when using a real stdin talking to Storm. I vaguely
recall running into this problem when using next() instead of readline with
stdin in the past, even though it works fine with other files.  Anyway, since
this was really just for testing, I've just made it so we use readline and the
tests now use io.StringIO objects for stdin instead of lists.
  • Loading branch information
dan-blanchard committed Apr 27, 2015
1 parent 9392417 commit 78a5cd9
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 42 deletions.
16 changes: 6 additions & 10 deletions streamparse/storm/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import sys
from collections import deque, namedtuple
from logging.handlers import RotatingFileHandler
from threading import RLock
from traceback import format_exc

Expand Down Expand Up @@ -120,8 +121,8 @@ class Component(object):
logging messages back to the Storm worker process.
:ivar input_stream: The ``iterable`` to use to retrieve commands from Storm.
Defaults to ``sys.stdin``.
:ivar input_stream: The ``file``-like object to use to retrieve commands
from Storm. Defaults to ``sys.stdin``.
:ivar output_stream: The ``file``-like object to send messages to Storm with.
Defaults to ``sys.stdout``.
:ivar topology_name: The name of the topology sent by Storm in the initial
Expand Down Expand Up @@ -195,7 +196,6 @@ def _setup_component(self, storm_conf, context):
self.context = context
self.logger = logging.getLogger('.'.join((__name__,
self.component_name)))

# Set up logging
log_path = self.storm_conf.get('streamparse.log.path')
if log_path:
Expand All @@ -211,9 +211,8 @@ def _setup_component(self, storm_conf, context):
component_name=self.component_name,
task_id=self.task_id,
pid=self.pid))
handler = logging.handlers.RotatingFileHandler(log_file,
maxBytes=max_bytes,
backupCount=backup_count)
handler = RotatingFileHandler(log_file, maxBytes=max_bytes,
backupCount=backup_count)
formatter = logging.Formatter('%(asctime)s - %(name)s - '
'%(levelname)s - %(message)s')
handler.setFormatter(formatter)
Expand All @@ -230,7 +229,6 @@ def _setup_component(self, storm_conf, context):
'msg': ('WARNING: streamparse logging is not '
'configured. Please set streamparse.log.'
'path in your config.json.')})

# Redirect stdout to ensure that print statements/functions
# won't disrupt the multilang protocol
sys.stdout = LogStream(logging.getLogger('streamparse.stdout'))
Expand Down Expand Up @@ -265,9 +263,7 @@ def read_message(self):
# readline will return trailing \n so that output is unambiguous, we
# should only have line == '' if we're at EOF
with self._reader_lock:
# Use next instead of readline to support reading from lists
line = next(self.input_stream)

line = self.input_stream.readline()
if line == 'end\n':
break
elif line == '':
Expand Down
51 changes: 34 additions & 17 deletions test/streamparse/test_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging
import time
import unittest
from io import BytesIO
from io import BytesIO, StringIO

try:
from unittest import mock
Expand Down Expand Up @@ -38,7 +38,7 @@ def setUp(self):
self.tup = Tuple(self.tup_dict['id'], self.tup_dict['comp'],
self.tup_dict['stream'], self.tup_dict['task'],
self.tup_dict['tuple'],)
self.bolt = Bolt(input_stream=itertools.cycle(tup_json.splitlines(True)),
self.bolt = Bolt(input_stream=StringIO(tup_json),
output_stream=BytesIO())
self.bolt.initialize({}, {})

Expand Down Expand Up @@ -102,13 +102,14 @@ def test_run(self, ack_mock, process_mock):

@patch.object(Bolt, 'process', autospec=True)
@patch.object(Bolt, 'ack', autospec=True)
def test_auto_ack(self, ack_mock, process_mock):
def test_auto_ack_on(self, ack_mock, process_mock):
# test auto-ack on (the default)
self.bolt._run()
ack_mock.assert_called_with(self.bolt, self.tup)
ack_mock.reset_mock()

# test auto-ack off
@patch.object(Bolt, 'process', autospec=True)
@patch.object(Bolt, 'ack', autospec=True)
def test_auto_ack_off(self, ack_mock, process_mock):
self.bolt.auto_ack = False
self.bolt._run()
# Assert that this wasn't called, and print out what it was called with
Expand Down Expand Up @@ -146,7 +147,7 @@ def test_auto_anchor(self, send_message_mock):
@patch.object(Bolt, 'raise_exception', new=lambda *a: None)
@patch.object(Bolt, 'fail', autospec=True)
@patch.object(Bolt, '_run', autospec=True)
def test_auto_fail(self, _run_mock, fail_mock):
def test_auto_fail_on(self, _run_mock, fail_mock):
self.bolt._current_tups = [self.tup]
# Make sure _run raises an exception
def raiser(): # lambdas can't raise
Expand All @@ -156,7 +157,18 @@ def raiser(): # lambdas can't raise
# test auto-fail on (the default)
self.bolt.run()
fail_mock.assert_called_with(self.bolt, self.tup)
fail_mock.reset_mock()

@patch('sys.exit', new=lambda r: r)
@patch.object(Bolt, 'read_handshake', new=lambda x: ({}, {}))
@patch.object(Bolt, 'raise_exception', new=lambda *a: None)
@patch.object(Bolt, 'fail', autospec=True)
@patch.object(Bolt, '_run', autospec=True)
def test_auto_fail_off(self, _run_mock, fail_mock):
self.bolt._current_tups = [self.tup]
# Make sure _run raises an exception
def raiser(): # lambdas can't raise
raise Exception('borkt')
_run_mock.side_effect = raiser

# test auto-fail off
self.bolt.auto_fail = False
Expand Down Expand Up @@ -214,9 +226,8 @@ def setUp(self):
self.tups = [Tuple(tup_dict['id'], tup_dict['comp'], tup_dict['stream'],
tup_dict['task'], tup_dict['tuple']) for tup_dict in
self.tup_dicts]
self.bolt = BatchingBolt(
input_stream=itertools.cycle(tups_json.splitlines(True)),
output_stream=BytesIO())
self.bolt = BatchingBolt(input_stream=StringIO(tups_json),
output_stream=BytesIO())
self.bolt.initialize({}, {})

def tearDown(self):
Expand Down Expand Up @@ -259,7 +270,7 @@ def test_exception_handling(self):

@patch.object(BatchingBolt, 'ack', autospec=True)
@patch.object(BatchingBolt, 'process_batch', new=lambda *args: None)
def test_auto_ack(self, ack_mock):
def test_auto_ack_on(self, ack_mock):
# Test auto-ack on (the default)
for __ in range(3):
self.bolt._run()
Expand All @@ -268,8 +279,10 @@ def test_auto_ack(self, ack_mock):
mock.call(self.bolt, self.tups[1]),
mock.call(self.bolt, self.tups[2])],
any_order=True)
ack_mock.reset_mock()

@patch.object(BatchingBolt, 'ack', autospec=True)
@patch.object(BatchingBolt, 'process_batch', new=lambda *args: None)
def test_auto_ack_off(self, ack_mock):
# Test auto-ack off
self.bolt.auto_ack = False
for __ in range(3):
Expand All @@ -279,9 +292,10 @@ def test_auto_ack(self, ack_mock):
# otherwise.
self.assertListEqual(ack_mock.call_args_list, [])


@patch.object(BatchingBolt, '_handle_worker_exception', autospec=True)
@patch.object(BatchingBolt, 'fail', autospec=True)
def test_auto_fail(self, fail_mock, worker_exception_mock):
def test_auto_fail_on(self, fail_mock, worker_exception_mock):
# Need to re-register signal handler with mocked version, because
# mock gets created after handler was originally registered.
self.setUp()
Expand All @@ -296,18 +310,21 @@ def test_auto_fail(self, fail_mock, worker_exception_mock):
mock.call(self.bolt, self.tups[2])],
any_order=True)
self.assertEqual(worker_exception_mock.call_count, 1)
fail_mock.reset_mock()
worker_exception_mock.reset_mock()

# Test auto-fail off
@patch.object(BatchingBolt, '_handle_worker_exception', autospec=True)
@patch.object(BatchingBolt, 'fail', autospec=True)
def test_auto_fail_off(self, fail_mock, worker_exception_mock):
# Need to re-register signal handler with mocked version, because
# mock gets created after handler was originally registered.
self.setUp()
self.bolt.auto_fail = False
for __ in range(3):
self.bolt._run()
time.sleep(0.5)
# Assert that this wasn't called, and print out what it was called with
# otherwise.
self.assertListEqual(fail_mock.call_args_list, [])
self.assertListEqual(worker_exception_mock.call_args_list, [])
self.assertEqual(worker_exception_mock.call_count, 1)

@patch.object(BatchingBolt, '_handle_worker_exception', autospec=True)
@patch.object(BatchingBolt, 'process_batch', autospec=True)
Expand Down
67 changes: 53 additions & 14 deletions test/streamparse/test_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

from __future__ import absolute_import, print_function, unicode_literals

import itertools
import json
import logging
import os
import unittest
from io import BytesIO
from io import BytesIO, StringIO

try:
from unittest import mock
Expand Down Expand Up @@ -70,7 +69,7 @@ def test_read_handshake(self):
expected_context = handshake_dict['context']
inputs = ["{}\n".format(json.dumps(handshake_dict)),
"end\n"]
component = Component(input_stream=iter(inputs),
component = Component(input_stream=StringIO(''.join(inputs)),
output_stream=BytesIO())
given_conf, given_context = component.read_handshake()
pid_path = os.path.join(pid_dir, str(component.pid))
Expand All @@ -81,8 +80,50 @@ def test_read_handshake(self):
self.assertEqual("{}\nend\n".format(json.dumps({"pid": component.pid})).encode('utf-8'),
component.output_stream.getvalue())

@patch('sys.exit', autospec=True)
def test_read_message(self, exit_mock):
def test_setup_component(self):
    """Verify that ``_setup_component`` copies the handshake config and
    context values onto the matching ``Component`` attributes."""
    conf = {
        "topology.message.timeout.secs": 3,
        "topology.tick.tuple.freq.secs": 1,
        "topology.debug": True,
        "topology.name": "foo",
    }
    context = {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt1",
            "4": "example-bolt2",
        },
        "taskid": 3,
        # Everything below this line is only available in Storm 0.11.0+
        "componentid": "example-bolt",
        "stream->target->grouping": {
            "default": {
                "example-bolt2": {
                    "type": "SHUFFLE",
                },
            },
        },
        "streams": ["default"],
        "stream->outputfields": {"default": ["word"]},
        "source->stream->grouping": {
            "example-spout": {
                "default": {
                    "type": "FIELDS",
                    "fields": ["word"],
                },
            },
        },
    }
    # Streams are unused here; empty in-memory buffers keep the test hermetic.
    comp = Component(input_stream=BytesIO(), output_stream=BytesIO())
    comp._setup_component(conf, context)
    self.assertEqual(comp.topology_name, conf['topology.name'])
    self.assertEqual(comp.task_id, context['taskid'])
    self.assertEqual(comp.component_name,
                     context['task->component'][str(context['taskid'])])
    self.assertEqual(comp.storm_conf, conf)
    self.assertEqual(comp.context, context)

def test_read_message(self):
inputs = [# Task IDs
'[12, 22, 24]\n', 'end\n',
# Incoming tuple for bolt
Expand All @@ -92,21 +133,19 @@ def test_read_message(self, exit_mock):
# next command for spout
'{"command": "next"}\n', 'end\n',
# empty message, which should trigger sys.exit (end ignored)
'', 'end\n']
'', '']
outputs = [json.loads(msg) for msg in inputs[::2] if msg]
outputs.append('')
component = Component(input_stream=iter(inputs),
component = Component(input_stream=StringIO(''.join(inputs)),
output_stream=BytesIO())
for output in outputs:
log.info('Checking msg for %s', output)
if output:
msg = component.read_message()
self.assertEqual(output, msg)
else:
with self.assertRaises(ValueError):
with self.assertRaises(SystemExit):
component.read_message()
self.assertEqual(exit_mock.call_count, 1)
exit_mock.reset_mock()

def test_read_split_message(self):
# Make sure we can read something that's broken up into many "lines"
Expand All @@ -118,7 +157,7 @@ def test_read_split_message(self):
'end\n']
output = json.loads(''.join(inputs[:-1]))

component = Component(input_stream=iter(inputs),
component = Component(input_stream=StringIO(''.join(inputs)),
output_stream=BytesIO())
msg = component.read_message()
self.assertEqual(output, msg)
Expand All @@ -134,7 +173,7 @@ def test_read_command(self):
# next command for spout
'{"command": "next"}\n', 'end\n']
outputs = [json.loads(msg) for msg in inputs[::2]]
component = Component(input_stream=iter(inputs),
component = Component(input_stream=StringIO(''.join(inputs)),
output_stream=BytesIO())

# Skip first output, because it's a task ID, and won't be returned by
Expand All @@ -158,7 +197,7 @@ def test_read_task_ids(self):
# Task IDs
'[16, 23, 42]\n', 'end\n']
outputs = [json.loads(msg) for msg in inputs[::2]]
component = Component(input_stream=iter(inputs),
component = Component(input_stream=StringIO(''.join(inputs)),
output_stream=BytesIO())

# Skip middle outputs, because they're commands and won't be returned by
Expand Down Expand Up @@ -193,7 +232,7 @@ def test_read_tuple(self):
del output['tuple']
outputs.append(Tuple(**output))

component = Component(input_stream=iter(inputs),
component = Component(input_stream=StringIO(''.join(inputs)),
output_stream=BytesIO())

for output in outputs:
Expand Down
2 changes: 1 addition & 1 deletion test/streamparse/test_spout.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def setUp(self):
self.tup = Tuple(self.tup_dict['id'], self.tup_dict['comp'],
self.tup_dict['stream'], self.tup_dict['task'],
self.tup_dict['tuple'],)
self.spout = Spout(input_stream=itertools.cycle(["next\nend\n"]),
self.spout = Spout(input_stream=BytesIO(),
output_stream=BytesIO())
self.spout.initialize({}, {})
self.spout.logger = log
Expand Down

0 comments on commit 78a5cd9

Please sign in to comment.