Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: send() method; 'retry' parameter didn't work as specify #21

Merged
merged 2 commits into from
Apr 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
/xmodem.egg-info
/.coverage
/.tox
/.cache
/.idea
*.py.swp
98 changes: 86 additions & 12 deletions test/unit/test_xmodem.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
from io import BytesIO
except ImportError:
# python 2
import StringIO.StringIO as BytesIO
from StringIO import StringIO as BytesIO

# local
import xmodem
from xmodem import NAK
from xmodem import CRC
from xmodem import ACK
from xmodem import XMODEM

# 3rd-party
import pytest
Expand All @@ -24,24 +27,95 @@ def dummy_putc(data, timeout=1):
return 0


def test_xmodem_bad_mode():
# given,
mode = 'XXX'
modem = XMODEM(getc=dummy_getc, putc=dummy_putc, mode=mode)
# exercise
with pytest.raises(ValueError):
status = modem.send(BytesIO(b'dummy-stream'))


@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
def test_xmodem_dummy_fails_send(mode):
# given,
modem = xmodem.XMODEM(getc=dummy_getc,
putc=dummy_putc,
mode=mode)
modem = XMODEM(getc=dummy_getc, putc=dummy_putc, mode=mode)
# exercise
status = modem.send(BytesIO(b'dummy-stream'))
# verify
assert not status, ("Expected value of status `False'")


def test_xmodem_bad_mode():
@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
@pytest.mark.parametrize('stream_data', [BytesIO(b'dummy-stream ' * 17),
BytesIO(b'dummy-stream ' * 1000)])
def test_xmodem_send_exceed_maximum_number_of_resend(mode, stream_data):
""" XMODEM has retry parameter this function ensure that xmodem.send() will
return False dude to the fact that resend exceeded """
# given,
mode = 'XXX'
modem = xmodem.XMODEM(getc=dummy_getc,
putc=dummy_putc,
mode=mode)
max_resend = 16

def generator():
if mode == 'xmodem':
yield NAK
else:
yield CRC

if mode == 'xmodem':
yield ACK

for i in range(max_resend + 1):
yield None

while True:
yield ACK

mock = generator()

def mock_getc(size, timeout=1):
try:
# python 2
x = mock.next()
except AttributeError:
# python 3
x = next(mock)
return x

xmodem = XMODEM(getc=mock_getc, putc=dummy_putc, mode=mode)
# exercise
with pytest.raises(ValueError):
status = modem.send(BytesIO(b'dummy-stream'))
assert not xmodem.send(stream=stream_data, retry=max_resend)


@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
@pytest.mark.parametrize('stream_data', [BytesIO(b'dummy-stream ' * 17),
BytesIO(b'dummy-stream ' * 1000)])
def test_xmodem_send_fails_once_each_packet(mode, stream_data):
""" XMODEM has retry parameter this test ensure that the number of retry for
the whole stream_data will be higher the max_resend pro packet """
# given,
max_resend = 16

def generator():
if mode == 'xmodem':
yield NAK
else:
yield CRC

while True:
yield None
yield ACK

mock = generator()

def mock_getc(size, timeout=1):
try:
# python 2
x = mock.next()
except AttributeError:
# python 3
x = next(mock)
return x

xmodem = XMODEM(getc=mock_getc, putc=dummy_putc, mode=mode)
# exercise
assert xmodem.send(stream=stream_data, retry=max_resend)
5 changes: 3 additions & 2 deletions xmodem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def send(self, stream, retry=16, timeout=60, quiet=False, callback=None):
'''
Send a stream via the XMODEM protocol.

>>> stream = file('/etc/issue', 'rb')
>>> stream = open('/etc/issue', 'rb')
>>> print(modem.send(stream))
True

Expand Down Expand Up @@ -308,6 +308,7 @@ def callback(total_packets, success_count, error_count)
success_count += 1
if callable(callback):
callback(total_packets, success_count, error_count)
error_count = 0
break

self.log.error('send error: expected ACK; got %r for block %d',
Expand Down Expand Up @@ -370,7 +371,7 @@ def recv(self, stream, crc_mode=1, retry=16, timeout=60, delay=1, quiet=0):
'''
Receive a stream via the XMODEM protocol.

>>> stream = file('/etc/issue', 'wb')
>>> stream = open('/etc/issue', 'wb')
>>> print(modem.recv(stream))
2342

Expand Down