Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into edge
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasbardino committed May 29, 2024
2 parents ca10f18 + 82ea540 commit f525f9d
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 50 deletions.
89 changes: 50 additions & 39 deletions mig/shared/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --- BEGIN_HEADER ---
#
# fileio - wrappers to keep file I/O in a single replaceable module
# Copyright (C) 2003-2023 The MiG Project lead by Brian Vinter
# Copyright (C) 2003-2024 The MiG Project by the Science HPC Center at UCPH
#
# This file is part of MiG.
#
Expand Down Expand Up @@ -70,77 +70,88 @@
exit(1)


def write_chunk(path, chunk, offset, logger, mode='r+b'):
"""Wrapper to handle writing of chunks with offset to path.
Creates file first if it doesn't already exist.
def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
make_parent=True, create_file=True):
"""Internal helper to wrap writing of chunks with offset to path.
The optional make_parent and create_file are used to decide if the parent
directory and the file should be created if it doesn't already exist.
"""
if not logger:
logger = null_logger("dummy")
# logger.debug("writing chunk to %r at offset %d" % (path, offset))

if offset < 0:
logger.error("cannot write to negative offset %d in %r" %
(offset, path))
return False

# create dir and file if it does not exists

(head, _) = os.path.split(path)
if not os.path.isdir(head):
if not os.path.isdir(head) and make_parent:
try:
os.mkdir(head)
except Exception as err:
logger.error("could not create parent dir %r: %s" % (head, err))
if not os.path.isfile(path):
return False

# ensure a file is present

if not os.path.isfile(path) and create_file:
try:
open(path, "w").close()
except Exception as err:
logger.error("could not create file %r: %s" % (path, err))
return False

try:
filehandle = open(path, mode)
# Make sure we can write at requested position, filling if needed
try:
filehandle.seek(offset)
except:
filehandle.seek(0, 2)
file_size = filehandle.tell()
for _ in xrange(offset - file_size):
filehandle.write('\0')
# logger.debug("write %r chunk of size %d at position %d" %
# (path, len(chunk), filehandle.tell()))
filehandle.write(chunk)
filehandle.close()
# logger.debug("file %r chunk written at %d" % (path, offset))
return True
with open(path, mode) as filehandle:
if offset > 0:
# Make sure we can write at requested position, filling if needed
try:
filehandle.seek(offset)
except:
filehandle.seek(0, 2)
file_size = filehandle.tell()
for _ in xrange(offset - file_size):
filehandle.write('\0')
# logger.debug("write %r chunk of size %d at position %d" %
# (path, len(chunk), filehandle.tell()))
filehandle.write(chunk)
# logger.debug("file %r chunk written at %d" % (path, offset))
return True
except Exception as err:
logger.error("could not write %r chunk at %d: %s" %
(path, offset, err))
return False


def write_chunk(path, chunk, offset, logger, mode='r+b'):
"""Wrapper to handle writing of chunks with offset to path.
Creates file first if it doesn't already exist.
"""
return _write_chunk(path, chunk, offset, logger, mode)


def write_file(content, path, logger, mode='w', make_parent=True, umask=None):
"""Wrapper to handle writing of contents to path"""
if not logger:
logger = null_logger("dummy")
# logger.debug("writing %r file" % path)

# create dir if it does not exists

(head, _) = os.path.split(path)
if umask is not None:
old_umask = os.umask(umask)
if not os.path.isdir(head) and make_parent:
try:
# logger.debug("making parent directory %r" % head)
os.mkdir(head)
except Exception as err:
logger.error("could not create parent dir %r: %s" % (head, err))
try:
filehandle = open(path, mode)
filehandle.write(content)
filehandle.close()
# logger.debug("file %r written" % path)
retval = True
except Exception as err:
logger.error("could not write file %r: %s" % (path, err))
retval = False

# NOTE: detect byte writes and handle explicitly in a portable way
if isinstance(content, bytes) and 'b' not in mode:
mode = "%sb" % mode # appended to avoid mode ordering error on PY2

retval = _write_chunk(path, content, offset=0, logger=logger, mode=mode,
make_parent=make_parent, create_file=False)

if umask is not None:
os.umask(old_umask)

return retval


Expand Down
1 change: 1 addition & 0 deletions tests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def cleanpath(relative_path, test_case):
assert isinstance(test_case, MigTestCase)
tmp_path = os.path.join(TEST_OUTPUT_DIR, relative_path)
test_case._cleanup_paths.add(tmp_path)
return tmp_path


def temppath(relative_path, test_case, skip_clean=False):
Expand Down
74 changes: 63 additions & 11 deletions tests/test_mig_shared_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,99 @@

import mig.shared.fileio as fileio

DUMMY_BYTES = binascii.unhexlify('DEADBEEF') # 4 bytes
DUMMY_BYTES = binascii.unhexlify('DEADBEEF') # 4 bytes
DUMMY_BYTES_LENGTH = 4
DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk'
DUMMY_FILE_WRITEFILE = 'fileio/write_file'

assert isinstance(DUMMY_BYTES, bytes)

class TestFileioWriteChunk(MigTestCase):

class MigSharedFileio__write_chunk(MigTestCase):
def setUp(self):
super(TestFileioWriteChunk, self).setUp()
super(MigSharedFileio__write_chunk, self).setUp()
self.tmp_path = temppath(DUMMY_FILE_WRITECHUNK, self, skip_clean=True)
cleanpath(os.path.dirname(DUMMY_FILE_WRITECHUNK), self)

# TODO: enable next test again once the bug is fixed (see PR48)
#def test_write_chunk_error_on_invalid_data(self):
# did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger)
# self.assertFalse(did_succeed)
def test_return_false_on_invalid_data(self):
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_offset(self):
did_succeed = fileio.write_chunk(self.tmp_path, DUMMY_BYTES, -42,
self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_dir(self):
os.makedirs(self.tmp_path)

def test_write_chunk_creates_directory(self):
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger)
self.assertFalse(did_succeed)

def test_creates_directory(self):
fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger)

path_kind = self.assertPathExists(DUMMY_FILE_WRITECHUNK)
self.assertEqual(path_kind, "file")

def test_write_chunk_store_bytes(self):
def test_store_bytes(self):
fileio.write_chunk(self.tmp_path, DUMMY_BYTES, 0, self.logger)

with open(self.tmp_path, 'rb') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)

def test_write_chunk_store_bytes_at_offset(self):
def test_store_bytes_at_offset(self):
offset = 3

fileio.write_chunk(self.tmp_path, DUMMY_BYTES, offset, self.logger)

with open(self.tmp_path, 'rb') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_BYTES_LENGTH + offset)
self.assertEqual(content[0:3], bytearray([0, 0, 0]), "expected a hole was left")
self.assertEqual(content[0:3], bytearray([0, 0, 0]),
"expected a hole was left")
self.assertEqual(content[3:], DUMMY_BYTES)


class MigSharedFileio__write_file(MigTestCase):
def setUp(self):
super(MigSharedFileio__write_file, self).setUp()
self.tmp_path = temppath(DUMMY_FILE_WRITEFILE, self, skip_clean=True)
cleanpath(os.path.dirname(DUMMY_FILE_WRITEFILE), self)

def test_return_false_on_invalid_data(self):
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_invalid_dir(self):
os.makedirs(self.tmp_path)

did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
self.assertFalse(did_succeed)

def test_return_false_on_missing_dir(self):
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger,
make_parent=False)
self.assertFalse(did_succeed)

def test_creates_directory(self):
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
self.assertTrue(did_succeed)

path_kind = self.assertPathExists(DUMMY_FILE_WRITEFILE)
self.assertEqual(path_kind, "file")

def test_store_bytes(self):
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
self.assertTrue(did_succeed)

with open(self.tmp_path, 'rb') as file:
content = file.read(1024)
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
self.assertEqual(content[:], DUMMY_BYTES)


if __name__ == '__main__':
testmain()

0 comments on commit f525f9d

Please sign in to comment.