diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..fab67bc
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,13 @@
+# https://editorconfig.org/
+root = true
+
+[*]
+indent_style = space
+indent_size = 4
+insert_final_newline = true
+trim_trailing_whitespace = true
+end_of_line = lf
+charset = utf-8
+
+[*.py]
+max_line_length = 100
\ No newline at end of file
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..2158008
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,4 @@
+[pytest]
+# uncomment to ensure all logs are captured and displayed at debug level, even when successful.
+#log_cli = true
+#log_cli_level = DEBUG
diff --git a/test/functional/accessories.py b/test/functional/accessories.py
index 0079469..adfd700 100644
--- a/test/functional/accessories.py
+++ b/test/functional/accessories.py
@@ -11,27 +11,11 @@ def _multi_which(prog_names):
return None
-def _get_recv_program():
- bin_path = _multi_which(('rb', 'lrb'))
- assert bin_path is not None, (
- "program required: {0!r}. "
- "Try installing lrzsz package.".format(bin_path))
- return bin_path
-
-
-def _get_send_program():
- bin_path = _multi_which(('sb', 'lsb'))
- assert bin_path is not None, (
- "program required: {0!r}. "
- "Try installing lrzsz package.".format(bin_path))
- return bin_path
-
-recv_prog = _get_recv_program()
-send_prog = _get_send_program()
+recv_prog = _multi_which(('rb', 'lrb'))
+send_prog = _multi_which(('sb', 'lsb'))
CHUNKSIZE = 521
-
def fill_binary_data(stream):
for byte in range(0x00, 0xff + 1, 10):
stream.write(bytearray([byte] * CHUNKSIZE))
diff --git a/test/functional/test.py b/test/functional/test.py
deleted file mode 100644
index 4930c1d..0000000
--- a/test/functional/test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-from __future__ import print_function
-import threading
-import time
-import sys
-from xmodem import XMODEM
-try:
- from Queue import Empty, Queue
- from StringIO import StringIO
-except ImportError:
- from queue import Empty, Queue
- from io import StringIO
-
-class FakeIO(object):
- streams = [Queue(), Queue()]
- stdin = []
- stdot = []
- delay = 0.01 # simulate modem delays
-
- def putc(self, data, q=0):
- for char in data:
- self.streams[1-q].put(char)
- print('p%d(0x%x)' % (q, ord(char)), end='')
- sys.stdout.flush()
- return len(data)
-
- def getc(self, size, q=0):
- data = []
- while size:
- try:
- char = self.streams[q].get(timeout=0.5) # Wait at most 1/2 second for data in the queue
- print('r%d(0x%x)' % (q, ord(char)), end='')
- sys.stdout.flush()
- data.append(char)
- size -= 1
- except Empty:
- return None
- return ''.join(data)
-
-class Client(threading.Thread):
- def __init__(self, io, server, filename):
- threading.Thread.__init__(self)
- self.io = io
- self.server = server
- self.stream = open(filename, 'rb')
-
- def getc(self, data, timeout=0):
- return self.io.getc(data, 0)
-
- def putc(self, data, timeout=0):
- return self.io.putc(data, 0)
-
- def run(self):
- self.xmodem = XMODEM(self.getc, self.putc)
- print('%s %s' % ('c.send', self.xmodem.send(self.stream)))
-
-class Server(FakeIO, threading.Thread):
- def __init__(self, io):
- threading.Thread.__init__(self)
- self.io = io
- self.stream = StringIO()
-
- def getc(self, data, timeout=0):
- return self.io.getc(data, 1)
-
- def putc(self, data, timeout=0):
- return self.io.putc(data, 1)
-
- def run(self):
- self.xmodem = XMODEM(self.getc, self.putc)
- print('%s %s' % ('s.recv', file=self.xmodem.recv(self.stream)))
- print('got')
- print(self.stream.getvalue())
-
-if __name__ == '__main__':
- i = FakeIO()
- s = Server(i)
- c = Client(i, s, sys.argv[1])
- s.start()
- c.start()
-
diff --git a/test/functional/test_xmodem.py b/test/functional/test_xmodem_using_lrzsz.py
similarity index 83%
rename from test/functional/test_xmodem.py
rename to test/functional/test_xmodem_using_lrzsz.py
index 7bd8de6..98f0e5d 100644
--- a/test/functional/test_xmodem.py
+++ b/test/functional/test_xmodem_using_lrzsz.py
@@ -7,6 +7,9 @@
import tempfile
import functools
import subprocess
+
+import pytest
+
try:
# python 3
from io import BytesIO
@@ -23,30 +26,28 @@
verify_binary_data,
)
-logging.basicConfig(format='%(levelname)-5s %(message)s',
- level=logging.DEBUG)
-
+MISSING_LRB_MSG = "'rb' or 'lrb' required. Try installing the 'lrzsz' package"
+MISSING_LSB_MSG = "'sb' or 'lsb' required. Try installing the 'lrzsz' package"
def _proc_getc(size, timeout=1, proc=None):
- # our getc function simply pipes to the standard out of the `rb'
- # or `lrb' program -- any data written by such program is returned
+ # our getc function pipes to the standard out of the `rb'
+ # or `lrb' program -- any data written by 'rb' is returned
# by our getc() callback.
assert proc.returncode is None, ("{0} has exited: (returncode={1})"
.format(proc, proc.returncode))
- logging.debug(('get', size))
+ logging.debug('_proc_getc: read (size=%s, timeout=%s)', size, timeout)
ready_read, _, _ = select.select([proc.stdout], [], [], timeout)
if proc.stdout not in ready_read:
assert False, ("Timeout on stdout of {0}.".format(proc))
data = proc.stdout.read(size)
- logging.debug(('got', len(data), data))
+ logging.debug('_proc_getc: read %s bytes: %r', len(data), data)
return data
def _proc_putc(data, timeout=1, proc=None):
- # similarly, our putc function simply writes to the standard of
- # our `rb' or `lrb' program -- any data written by our XMODEM
- # protocol via putc() callback is written to the stdin of such
- # program.
+ # similarly, the putc function pipes to standard in of the 'rb'
+ # or `lrb' program -- any data written by our XMODEM
+ # protocol via putc() callback is written to the stdin 'rb'.
assert proc.returncode is None, ("{0} has exited: (returncode={1})"
.format(proc, proc.returncode))
_, ready_write, _ = select.select([], [proc.stdin], [], timeout)
@@ -59,15 +60,17 @@ def _proc_putc(data, timeout=1, proc=None):
def _send_callback(total_packets, success_count, error_count):
- # this simple callback simply asserts that no errors have occurred, and
+ # this callback asserts that no errors have occurred, and
# prints the given status to stderr. This is captured but displayed in
# py.test output only on error.
assert error_count == 0
assert success_count == total_packets
- print('{0}'.format(total_packets), file=sys.stderr)
+ logging.debug('_send_callback: total_packets=%s, success_count=%s, error_count=%s',
+ total_packets, success_count, error_count)
-def test_xmodem_send():
+@pytest.mark.skipif(recv_prog is None, reason=MISSING_LRB_MSG)
+def test_xmodem_send_with_lrb():
""" Using external program for receive, verify XMODEM.send(). """
# Given,
_, recv_filename = tempfile.mkstemp()
@@ -97,7 +100,8 @@ def test_xmodem_send():
os.unlink(recv_filename)
-def test_xmodem_recv():
+@pytest.mark.skipif(send_prog is None, reason=MISSING_LSB_MSG)
+def test_xmodem_recv_with_lsb():
""" Using external program for send, verify XMODEM.recv(). """
# Given,
_, send_filename = tempfile.mkstemp()
@@ -127,7 +131,8 @@ def test_xmodem_recv():
os.unlink(send_filename)
-def test_xmodem1k_send():
+@pytest.mark.skipif(recv_prog is None, reason=MISSING_LRB_MSG)
+def test_xmodem1k_send_with_lrb():
""" Using external program for receive, verify XMODEM1k.send(). """
# Given,
_, recv_filename = tempfile.mkstemp()
@@ -156,7 +161,8 @@ def test_xmodem1k_send():
os.unlink(recv_filename)
-def test_xmodem1k_recv():
+@pytest.mark.skipif(send_prog is None, reason=MISSING_LSB_MSG)
+def test_xmodem1k_recv_with_lsb():
""" Using external program for send, verify XMODEM1k.recv(). """
# Given,
_, send_filename = tempfile.mkstemp()
@@ -187,7 +193,8 @@ def test_xmodem1k_recv():
os.unlink(send_filename)
-def test_xmodem_send_16bit_crc():
+@pytest.mark.skipif(recv_prog is None, reason=MISSING_LRB_MSG)
+def test_xmodem_send_16bit_crc_with_lrb():
"""
Using external program for receive, verify XMODEM.send() with 16-bit CRC.
"""
@@ -219,7 +226,7 @@ def test_xmodem_send_16bit_crc():
os.unlink(recv_filename)
-def test_xmodem_recv_oldstyle_checksum():
+def test_xmodem_recv_oldstyle_checksum_with_lrb():
"""
Using external program for send, verify XMODEM.recv() with crc_mode 0.
"""
diff --git a/test/unit/test_xmodem.py b/test/unit/test_xmodem.py
index d2ce874..cc681ba 100644
--- a/test/unit/test_xmodem.py
+++ b/test/unit/test_xmodem.py
@@ -8,9 +8,11 @@
except ImportError:
# python 2
from StringIO import StringIO as BytesIO
+import time
+import logging
# local
-from xmodem import NAK, CRC, ACK, XMODEM, STX, SOH, EOT
+from xmodem import NAK, CRC, ACK, XMODEM, STX, SOH, EOT, CAN
# 3rd-party
import pytest
@@ -44,13 +46,12 @@ def test_xmodem_dummy_fails_send(mode):
@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
-@pytest.mark.parametrize('stream_data', [BytesIO(b'dummy-stream ' * 17),
- BytesIO(b'dummy-stream ' * 1000)])
-def test_xmodem_send_exceed_maximum_number_of_resend(mode, stream_data):
- """ Verify send(retry=n) after 'n' transfer failures of single block. """
+def test_xmodem_fails_when_send_exceed_maximum_number_of_resend(mode):
+ """ Verify send(retry=n) fails after 'n' transfer failures of single block. """
# given,
max_resend = 3
+ stream_data = BytesIO(b'dummy-stream')
def getc_generator():
if mode == 'xmodem':
@@ -73,20 +74,122 @@ def getc_generator():
def mock_getc(size, timeout=1):
return next(mock)
+ progress_records = []
+ def callback(total_packets, success_count, error_count):
+ record = {'total_packets': total_packets, 'success_count': success_count,
+ 'error_count': error_count}
+ logging.debug('callback: %r', record)
+ progress_records.append(record)
xmodem = XMODEM(getc=mock_getc, putc=dummy_putc, mode=mode)
# exercise
- result = xmodem.send(stream=stream_data, retry=max_resend)
+ result = xmodem.send(stream=stream_data, retry=max_resend, callback=callback)
# verify
assert not result
+ if mode == 'xmodem':
+ assert progress_records == [
+ {'error_count': 0, 'success_count': 1, 'total_packets': 1},
+ {'error_count': 1, 'success_count': 1, 'total_packets': 2},
+ {'error_count': 2, 'success_count': 1, 'total_packets': 2},
+ {'error_count': 3, 'success_count': 1, 'total_packets': 2},
+ {'error_count': 4, 'success_count': 1, 'total_packets': 2}]
+
+ elif mode == 'xmodem1k':
+ assert progress_records == [
+ {'total_packets': 1, 'success_count': 0, 'error_count': 1},
+ {'total_packets': 1, 'success_count': 0, 'error_count': 2},
+ {'total_packets': 1, 'success_count': 0, 'error_count': 3},
+ {'total_packets': 1, 'success_count': 0, 'error_count': 4}
+ ]
+
+@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
+def test_xmodem_send_cancelled_by_can_can(mode):
+ """ Verify send() is cancelled when CAN CAN is received at start-sequence. """
+
+ # given,
+ def getc_generator():
+ yield CAN
+ yield CAN
+
+ mock = getc_generator()
+
+ def mock_getc(size, timeout=1):
+ return next(mock)
+
+ xmodem = XMODEM(getc=mock_getc, putc=dummy_putc, mode=mode)
+
+ # exercise
+ result = xmodem.send(stream=BytesIO())
+
+ # verify failure to send
+ assert not result
+
+@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
+def test_xmodem_send_cancelled_by_eot(mode):
+ """ Verify send() is cancelled when EOT is received at start-sequence. """
+
+ # given,
+ def getc_generator():
+ yield EOT
+
+ mock = getc_generator()
+
+ def mock_getc(size, timeout=1):
+ return next(mock)
+
+ xmodem = XMODEM(getc=mock_getc, putc=dummy_putc, mode=mode)
+
+ # exercise
+ result = xmodem.send(stream=BytesIO())
+
+ # verify failure to send
+ assert not result
+
+@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
+def test_xmodem_send_fails_by_garbage_start_sequence(mode, monkeypatch, caplog):
+ """ Verify send() fails when garbage bytes are received and number of retries are exceeded. """
+
+ monkeypatch.setattr(time, 'sleep', lambda t: None)
+
+ # given the same number of 'garbage' bytes as retry,
+ retry = 4
+ num_garbage_bytes = retry + 1
+ def getc_generator():
+ for n in range(num_garbage_bytes):
+ yield b'\xde'
+
+ mock = getc_generator()
+
+ def mock_getc(size, timeout=1):
+ return next(mock)
+
+ xmodem = XMODEM(getc=mock_getc, putc=dummy_putc, mode=mode)
+
+ # exercise
+ result_stream = BytesIO(b'123')
+ result = xmodem.send(stream=result_stream, retry=retry)
+
+ # verify failure to send
+ assert not result
+ error_logged_send_error = [rec for rec in caplog.records
+ if rec.message == "send error: expected NAK, CRC, EOT or CAN; got b'\\xde'"
+ and rec.levelno == logging.ERROR]
+ assert len(error_logged_send_error) == retry + 1
+
+ error_logged_aborts = [rec for rec in caplog.records
+ if rec.message == "send error: error_count reached {}, aborting.".format(retry)
+ and rec.levelno == logging.ERROR]
+ assert len(error_logged_aborts) == 1
+ # verify no data is sent, sending stream is never read, position in file does not advance
+ assert result_stream.tell() == 0
@pytest.mark.parametrize('mode', ['xmodem', 'xmodem1k'])
@pytest.mark.parametrize('stream_data', [BytesIO(b'dummy-stream ' * 17),
BytesIO(b'dummy-stream ' * 1000)])
-def test_xmodem_send_fails_once_each_packet(mode, stream_data):
- """ Verify send(retry=n) under 'n' transfer failures of single block. """
+def test_xmodem_send_succeeds_when_timeout_every_other_packet(mode, stream_data):
+ """ Verify send(retry=n) succeeds when every other ACK times out."""
# given,
max_resend = 1
@@ -118,11 +221,19 @@ def mock_getc(size, timeout=1):
assert result
-def test_xmodem1k_receive_fails_after_first_packet():
+def test_xmodem1k_receive_successful_when_timeout_after_first_packet(monkeypatch):
""" Verify recv reaction to timeout directly after first packet """
# given,
max_resend = 1
- mode='xmodem1k'
+ mode = 'xmodem1k'
+ monkeypatch.setattr(time, 'sleep', lambda t: None)
+
+ progress_records = []
+ def callback(total_packets, success_count, error_count, packet_size):
+ record = {'total_packets': total_packets, 'success_count': success_count,
+ 'error_count': error_count, 'packet_size': packet_size}
+ logging.debug('callback: %r', record)
+ progress_records.append(record)
def getc_generator():
yield STX
@@ -155,7 +266,15 @@ def mock_getc(size, timeout=1):
# exercise
destination = BytesIO()
- result = xmodem.recv(stream=destination, retry=max_resend)
+ result = xmodem.recv(stream=destination, retry=max_resend, callback=callback)
# verify
assert result
+
+ assert len(progress_records) == 4
+ assert progress_records == [
+ {'total_packets': 1, 'success_count': 1, 'error_count': 0, 'packet_size': 1024},
+ {'total_packets': 1, 'success_count': 1, 'error_count': 1, 'packet_size': 1024},
+ {'total_packets': 2, 'success_count': 2, 'error_count': 0, 'packet_size': 1024},
+ {'total_packets': 2, 'success_count': 2, 'error_count': 0, 'packet_size': 1024},
+ ]
diff --git a/tox.ini b/tox.ini
index 37ca48f..9d23496 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,6 +3,13 @@ envlist = py26,
py27,
py33,
py34,
+ py35,
+ py36,
+ py37,
+ py38,
+ py39,
+ py310,
+ py311,
pypy
skip_missing_interpreters = true
diff --git a/xmodem/__init__.py b/xmodem/__init__.py
index 4be695f..03d1a44 100644
--- a/xmodem/__init__.py
+++ b/xmodem/__init__.py
@@ -116,6 +116,7 @@
import platform
import logging
+import select
import time
import sys
from functools import partial
@@ -136,7 +137,7 @@ class XMODEM(object):
XMODEM Protocol handler, expects two callables which encapsulate the read
and write operations on the underlying stream.
- Example functions for reading and writing to a serial line:
+ Example functions for reading and writing to a serial line using `pyserial `_::
>>> import serial
>>> from xmodem import XMODEM
@@ -296,7 +297,7 @@ def callback(total_packets, success_count, error_count)
error_count += 1
if error_count > retry:
- self.log.info('send error: error_count reached %d, '
+ self.log.error('send error: error_count reached %d, '
'aborting.', retry)
self.abort(timeout=timeout)
return False
@@ -359,6 +360,8 @@ def callback(total_packets, success_count, error_count)
else:
self.log.error('send error: expected ACK; got %r', char)
error_count += 1
+ if callback:
+ callback(total_packets, success_count, error_count)
if error_count > retry:
self.log.warning('EOT was not ACKd, aborting transfer')
self.abort(timeout=timeout)
@@ -400,7 +403,7 @@ def recv(self, stream, crc_mode=1, retry=16, timeout=60, delay=1, quiet=0, callb
:param stream: The stream object to write data to.
:type stream: stream (file, etc.)
- :param crc_mode: XMODEM CRC mode
+ :param crc_mode: XMODEM CRC mode, 0 is standard checksum, 1 is 16-bit checksum.
:type crc_mode: int
:param retry: The maximum number of times to try to resend a failed
packet before failing.
@@ -413,40 +416,39 @@ def recv(self, stream, crc_mode=1, retry=16, timeout=60, delay=1, quiet=0, callb
:param quiet: If ``True``, write transfer information to stderr.
:type quiet: bool
:param callback: Reference to a callback function that has the
- following signature. This is useful for
- getting status updates while a xmodem
- transfer is underway. Packet size can only be
- determined once the transfer started, so it also
- is delivered in the callback as the fourth parameter.
- Expected callback signature:
- def callback(total_packets, success_count, error_count, packet_size)
- :type callback: callable
+ following signature::
+
+ def callback(total_packets: int, success_count: int, error_count: int, packet_size: int)
+ This is useful for tracking progress state while an xmodem transfer is
+ underway. As packet size may be negotiated, it also included as the final
+ argument of the callback. note that the value of error_count resets to 0
+ after any successful block transfer.
+ :type callback: callable
'''
# initiate protocol
error_count = 0
- char = 0
cancel = 0
empty = 0
while True:
# first try CRC mode, if this fails,
# fall back to checksum mode
if error_count >= retry:
- self.log.info('error_count reached %d, aborting.', retry)
+ self.log.error('error_count reached %d, aborting.', retry)
self.abort(timeout=timeout)
return None
elif crc_mode and error_count < (retry // 2):
if not self.putc(CRC):
- self.log.debug('recv error: putc failed, '
- 'sleeping for %d', delay)
+ self.log.warning('recv error: putc failed, '
+ 'sleeping for %d', delay)
time.sleep(delay)
error_count += 1
else:
crc_mode = 0
if not self.putc(NAK):
- self.log.debug('recv error: putc failed, '
- 'sleeping for %d', delay)
+ self.log.warning('recv error: putc failed, '
+ 'sleeping for %d', delay)
time.sleep(delay)
error_count += 1
@@ -474,16 +476,19 @@ def callback(total_packets, success_count, error_count, packet_size)
self.log.info('transmission canceled: file empty')
self.putc(CAN)
self.putc(CAN)
+ # 'purge' any remaining data on the line
while True:
if self.getc(1, timeout=1) is None:
break
time.sleep(.001) # better cpu usage
return 0
else:
- self.log.info('first eot received ')
+ self.log.debug('first eot received ')
empty = 1
else:
error_count += 1
+ if callable(callback):
+ callback(0, 0, error_count, 128)
# read data
error_count = 0
@@ -530,6 +535,8 @@ def callback(total_packets, success_count, error_count, packet_size)
print(err_msg, file=sys.stderr)
self.log.warning(err_msg)
error_count += 1
+ if callable(callback):
+ callback(total_packets, success_count, error_count, packet_size)
if error_count > retry:
self.log.info('error_count reached %d, aborting.',
retry)
@@ -601,6 +608,8 @@ def callback(total_packets, success_count, error_count, packet_size)
if data is None:
break
error_count += 1
+ if callable(callback):
+ callback(total_packets, success_count, error_count, packet_size)
self.putc(NAK)
# get next start-of-header byte
char = self.getc(1, timeout)
@@ -681,7 +690,7 @@ def _send(mode='xmodem', filename=None, timeout=30):
def _getc(size, timeout=timeout):
read_ready, _, _ = select.select([so], [], [], timeout)
if read_ready:
- data = stream.read(size)
+ data = si.read(size)
else:
data = None
return data
@@ -703,7 +712,6 @@ def _putc(data, timeout=timeout):
def run():
'''Run the main entry point for sending and receiving files.'''
import argparse
- import serial
import sys
platform = sys.platform.lower()
@@ -714,16 +722,16 @@ def run():
default_port = '/dev/ttyS0'
parser = argparse.ArgumentParser()
- parser.add_argument('-p', '--port', default=default_port,
- help='serial port')
- parser.add_argument('-r', '--rate', default=9600, type=int,
- help='baud rate')
- parser.add_argument('-b', '--bytesize', default=serial.EIGHTBITS,
- help='serial port transfer byte size')
- parser.add_argument('-P', '--parity', default=serial.PARITY_NONE,
- help='serial port parity')
- parser.add_argument('-S', '--stopbits', default=serial.STOPBITS_ONE,
- help='serial port stop bits')
+ # parser.add_argument('-p', '--port', default=default_port,
+ # help='serial port')
+ # parser.add_argument('-r', '--rate', default=9600, type=int,
+ # help='baud rate')
+ # parser.add_argument('-b', '--bytesize', default=serial.EIGHTBITS,
+ # help='serial port transfer byte size')
+ # parser.add_argument('-P', '--parity', default=serial.PARITY_NONE,
+ # help='serial port parity')
+ # parser.add_argument('-S', '--stopbits', default=serial.STOPBITS_ONE,
+ # help='serial port stop bits')
parser.add_argument('-m', '--mode', default='xmodem',
help='XMODEM mode (xmodem, xmodem1k)')
parser.add_argument('-t', '--timeout', default=30, type=int,
@@ -741,74 +749,8 @@ def run():
if options.subcommand == 'send':
return _send(options.mode, options.filename, options.timeout)
- elif options.subcommand == 'recv':
- return _recv(options.mode, options.filename, options.timeout)
-
-
-def runx():
- import optparse
- import subprocess
-
- parser = optparse.OptionParser(
- usage='%prog [] filename filename')
- parser.add_option('-m', '--mode', default='xmodem',
- help='XMODEM mode (xmodem, xmodem1k)')
-
- options, args = parser.parse_args()
- if len(args) != 3:
- parser.error('invalid arguments')
- return 1
-
- elif args[0] not in ('send', 'recv'):
- parser.error('invalid mode')
- return 1
-
- def _func(so, si):
- import select
-
- def getc(size, timeout=3):
- read_ready, _, _ = select.select([so], [], [], timeout)
- if read_ready:
- data = so.read(size)
- else:
- data = None
-
- return data
-
- def putc(data, timeout=3):
- _, write_ready, _ = select.select([], [si], [], timeout)
- if write_ready:
- si.write(data)
- si.flush()
- size = len(data)
- else:
- size = None
-
- return size
-
- return getc, putc
-
- def _pipe(*command):
- pipe = subprocess.Popen(command,
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE)
- return pipe.stdout, pipe.stdin
-
- if args[0] == 'recv':
- getc, putc = _func(*_pipe('sz', '--xmodem', args[2]))
- stream = open(args[1], 'wb')
- xmodem = XMODEM(getc, putc, mode=options.mode)
- status = xmodem.recv(stream, retry=8)
- assert status, ('Transfer failed, status is', False)
- stream.close()
-
- elif args[0] == 'send':
- getc, putc = _func(*_pipe('rz', '--xmodem', args[2]))
- stream = open(args[1], 'rb')
- xmodem = XMODEM(getc, putc, mode=options.mode)
- sent = xmodem.send(stream, retry=8)
- assert sent is not None, ('Transfer failed, sent is', sent)
- stream.close()
+ # elif options.subcommand == 'recv':
+ # return _recv(options.mode, options.filename, options.timeout)
if __name__ == '__main__':
sys.exit(run())