From e437da8e3ff7f758942d30bd616e1044b8b7da51 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sun, 19 Nov 2023 10:12:10 +0100 Subject: [PATCH] Applied updates and worked on Python bindings --- .github/workflows/build.yml | 18 - .github/workflows/build_freebsd.yml | 21 + appveyor.yml | 2 +- configure.ac | 2 +- fsntfstools/info_handle.c | 6 +- include/libfsntfs.h.in | 9 + libfsntfs.nuspec | 4 +- manuals/libfsntfs.3 | 4 +- pyfsntfs/pyfsntfs_attribute.c | 130 ++++ pyfsntfs/pyfsntfs_attribute.h | 8 + setup.cfg.in | 1 + tests/Makefile.am | 2 + tests/pyfsntfs_test_attribute.py | 314 +++++++++ tests/pyfsntfs_test_file_entry.py | 955 ++++++++++++++++++++++++++++ tests/pyfsntfs_test_volume.py | 195 ++++-- tests/runtests.py | 5 +- tests/test_python_module.sh | 2 +- 17 files changed, 1580 insertions(+), 98 deletions(-) create mode 100644 .github/workflows/build_freebsd.yml create mode 100644 tests/pyfsntfs_test_attribute.py create mode 100644 tests/pyfsntfs_test_file_entry.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b3a296cd..1dc024be 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -3,24 +3,6 @@ name: build on: [push, pull_request] permissions: read-all jobs: - build_freebsd: - # FreeBSD support is provided via virtualization on MacOS 12 - # See https://github.com/vmactions/freebsd-vm#under-the-hood. - runs-on: macos-12 - steps: - - uses: actions/checkout@v3 - - name: Building from source - id: build_freebsd - uses: vmactions/freebsd-vm@v0 - with: - usesh: true - mem: 4096 - # Note that the test scripts require bash - prepare: | - pkg install -y autoconf automake bash fusefs-libs gettext git libtool openssl pkgconf - run: | - tests/build.sh - tests/runtests.sh build_ubuntu: runs-on: ubuntu-22.04 strategy: diff --git a/.github/workflows/build_freebsd.yml b/.github/workflows/build_freebsd.yml new file mode 100644 index 00000000..d23d500e --- /dev/null +++ b/.github/workflows/build_freebsd.yml @@ -0,0 +1,21 @@ +# Build from source on FreeBSD. +name: build_freebsd +on: [push] +permissions: read-all +jobs: + build_freebsd: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + - name: Building from source + id: build_freebsd + uses: vmactions/freebsd-vm@v1 + with: + usesh: true + mem: 4096 + # Note that the test scripts require bash + prepare: | + pkg install -y autoconf automake bash fusefs-libs gettext git libtool openssl pkgconf + run: | + tests/build.sh + tests/runtests.sh diff --git a/appveyor.yml b/appveyor.yml index c6c552be..dbd69905 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -252,7 +252,7 @@ install: .\builddokan.ps1 -Configuration ${Configuration} -Platform "x64"; } } - sh: if test ${BUILD_ENVIRONMENT} = "python-tox" || test ${BUILD_ENVIRONMENT} = "xcode"; then brew update-reset && brew update -q; fi -- sh: if test ${BUILD_ENVIRONMENT} = "python-tox" || test ${BUILD_ENVIRONMENT} = "xcode"; then brew install -q autoconf automake gettext gnu-sed libtool openssl osxfuse pkg-config || true; fi +- sh: if test ${BUILD_ENVIRONMENT} = "python-tox" || test ${BUILD_ENVIRONMENT} = "xcode"; then brew install -q autoconf automake gettext gnu-sed libtool macfuse openssl pkg-config || true; fi - sh: if test ${BUILD_ENVIRONMENT} = "python-tox"; then brew install -q python@${PYTHON_VERSION} tox twine-pypi || true; fi - cmd: if [%BUILD_ENVIRONMENT%]==[python] ( "%PYTHON%" -m pip install -U pip setuptools twine wheel ) diff --git a/configure.ac b/configure.ac index 1536c0e7..117fd224 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ AC_PREREQ([2.71]) AC_INIT( [libfsntfs], - [20231104], + [20231119], [joachim.metz@gmail.com]) AC_CONFIG_SRCDIR( diff --git a/fsntfstools/info_handle.c b/fsntfstools/info_handle.c index f9b75736..80479f0f 100644 --- a/fsntfstools/info_handle.c +++ b/fsntfstools/info_handle.c @@ -2082,7 +2082,7 @@ int info_handle_attribute_fprint( } fprintf( info_handle->notify_stream, - "\tType\t\t\t\t: %s (0x%08" PRIx32 ")\n", + "\tAttribute type\t\t\t: %s (0x%08" PRIx32 ")\n", info_handle_get_attribute_type_description( attribute_type ), attribute_type ); @@ -2160,7 +2160,7 @@ int info_handle_attribute_fprint( } fprintf( info_handle->notify_stream, - "\tName\t\t\t: " ); + "\tAttribute name\t\t: " ); if( info_handle_name_value_fprint( info_handle, @@ -2172,7 +2172,7 @@ int info_handle_attribute_fprint( error, LIBCERROR_ERROR_DOMAIN_RUNTIME, LIBCERROR_RUNTIME_ERROR_PRINT_FAILED, - "%s: unable to print name string.", + "%s: unable to print attribute name string.", function ); goto on_error; diff --git a/include/libfsntfs.h.in b/include/libfsntfs.h.in index 5db0fdc5..ffaa563b 100644 --- a/include/libfsntfs.h.in +++ b/include/libfsntfs.h.in @@ -1138,6 +1138,15 @@ int libfsntfs_attribute_get_data_size( size64_t *data_size, libfsntfs_error_t **error ); +/* Retrieves the valid data size + * Returns 1 if successful or -1 on error + */ +LIBFSNTFS_EXTERN \ +int libfsntfs_attribute_get_valid_data_size( + libfsntfs_attribute_t *attribute, + size64_t *valid_data_size, + libfsntfs_error_t **error ); + /* ------------------------------------------------------------------------- * $ATTRIBUTE_LIST attribute functions * ------------------------------------------------------------------------- */ diff --git a/libfsntfs.nuspec b/libfsntfs.nuspec index 189d176c..2a267b91 100644 --- a/libfsntfs.nuspec +++ b/libfsntfs.nuspec @@ -2,7 +2,7 @@ libfsntfs - 20231104 + 20231119 Joachim Metz joachimmetz LGPL-3.0-or-later @@ -10,7 +10,7 @@ false libfsntfs Library to access the Windows New Technology File System (NTFS) format - Release of libfsntfs 20231104 + Release of libfsntfs 20231119 Copyright (C) 2010-2023 native diff --git a/manuals/libfsntfs.3 b/manuals/libfsntfs.3 index 9931e8d2..687c8fc8 100644 --- a/manuals/libfsntfs.3 +++ b/manuals/libfsntfs.3 @@ -1,4 +1,4 @@ -.Dd July 28, 2022 +.Dd November 19, 2023 .Dt libfsntfs 3 .Os libfsntfs .Sh NAME @@ -244,6 +244,8 @@ Attribute functions .Fn libfsntfs_attribute_get_data_vcn_range "libfsntfs_attribute_t *attribute" "uint64_t *data_first_vcn" "uint64_t *data_last_vcn" "libfsntfs_error_t **error" .Ft int .Fn libfsntfs_attribute_get_data_size "libfsntfs_attribute_t *attribute" "size64_t *data_size" "libfsntfs_error_t **error" +.Ft int +.Fn libfsntfs_attribute_get_valid_data_size "libfsntfs_attribute_t *attribute" "size64_t *valid_data_size" "libfsntfs_error_t **error" .Pp $ATTRIBUTE_LIST attribute functions .Ft int diff --git a/pyfsntfs/pyfsntfs_attribute.c b/pyfsntfs/pyfsntfs_attribute.c index 71897f33..b52b5216 100644 --- a/pyfsntfs/pyfsntfs_attribute.c +++ b/pyfsntfs/pyfsntfs_attribute.c @@ -52,6 +52,20 @@ PyMethodDef pyfsntfs_attribute_object_methods[] = { "\n" "Returns the name of the attribute." }, + { "get_data_size", + (PyCFunction) pyfsntfs_attribute_get_data_size, + METH_NOARGS, + "get_data_size() -> Integer\n" + "\n" + "Returns the size of the attribute data." }, + + { "get_valid_data_size", + (PyCFunction) pyfsntfs_attribute_get_valid_data_size, + METH_NOARGS, + "get_valid_data_size() -> Integer\n" + "\n" + "Returns the size of the attribute data that is used (considered valid)." }, + /* Sentinel */ { NULL, NULL, 0, NULL } }; @@ -70,6 +84,18 @@ PyGetSetDef pyfsntfs_attribute_object_get_set_definitions[] = { "The name of the attribute.", NULL }, + { "data_size", + (getter) pyfsntfs_attribute_get_data_size, + (setter) 0, + "The size of the attribute data.", + NULL }, + + { "valid_data_size", + (getter) pyfsntfs_attribute_get_valid_data_size, + (setter) 0, + "The size of the attribute data that is used (considered valid).", + NULL }, + /* Sentinel */ { NULL, NULL, NULL, NULL, NULL } }; @@ -488,3 +514,107 @@ PyObject *pyfsntfs_attribute_get_name( return( NULL ); } +/* Retrieves the data size + * Returns a Python object if successful or NULL on error + */ +PyObject *pyfsntfs_attribute_get_data_size( + pyfsntfs_attribute_t *pyfsntfs_attribute, + PyObject *arguments PYFSNTFS_ATTRIBUTE_UNUSED ) +{ + libcerror_error_t *error = NULL; + PyObject *integer_object = NULL; + static char *function = "pyfsntfs_attribute_get_data_size"; + size64_t data_size = 0; + int result = 0; + + PYFSNTFS_UNREFERENCED_PARAMETER( arguments ) + + if( pyfsntfs_attribute == NULL ) + { + PyErr_Format( + PyExc_ValueError, + "%s: invalid attribute.", + function ); + + return( NULL ); + } + Py_BEGIN_ALLOW_THREADS + + result = libfsntfs_attribute_get_data_size( + pyfsntfs_attribute->attribute, + &data_size, + &error ); + + Py_END_ALLOW_THREADS + + if( result != 1 ) + { + pyfsntfs_error_raise( + error, + PyExc_IOError, + "%s: unable to retrieve data size.", + function ); + + libcerror_error_free( + &error ); + + return( NULL ); + } + integer_object = pyfsntfs_integer_unsigned_new_from_64bit( + (uint64_t) data_size ); + + return( integer_object ); +} + +/* Retrieves the valid data size + * Returns a Python object if successful or NULL on error + */ +PyObject *pyfsntfs_attribute_get_valid_data_size( + pyfsntfs_attribute_t *pyfsntfs_attribute, + PyObject *arguments PYFSNTFS_ATTRIBUTE_UNUSED ) +{ + libcerror_error_t *error = NULL; + PyObject *integer_object = NULL; + static char *function = "pyfsntfs_attribute_get_valid_data_size"; + size64_t valid_data_size = 0; + int result = 0; + + PYFSNTFS_UNREFERENCED_PARAMETER( arguments ) + + if( pyfsntfs_attribute == NULL ) + { + PyErr_Format( + PyExc_ValueError, + "%s: invalid attribute.", + function ); + + return( NULL ); + } + Py_BEGIN_ALLOW_THREADS + + result = libfsntfs_attribute_get_valid_data_size( + pyfsntfs_attribute->attribute, + &valid_data_size, + &error ); + + Py_END_ALLOW_THREADS + + if( result != 1 ) + { + pyfsntfs_error_raise( + error, + PyExc_IOError, + "%s: unable to retrieve valid data size.", + function ); + + libcerror_error_free( + &error ); + + return( NULL ); + } + integer_object = pyfsntfs_integer_unsigned_new_from_64bit( + (uint64_t) valid_data_size ); + + return( integer_object ); +} + diff --git a/pyfsntfs/pyfsntfs_attribute.h b/pyfsntfs/pyfsntfs_attribute.h index 68b669ff..5e474820 100644 --- a/pyfsntfs/pyfsntfs_attribute.h +++ b/pyfsntfs/pyfsntfs_attribute.h @@ -72,6 +72,14 @@ PyObject *pyfsntfs_attribute_get_name( pyfsntfs_attribute_t *pyfsntfs_attribute, PyObject *arguments ); +PyObject *pyfsntfs_attribute_get_data_size( + pyfsntfs_attribute_t *pyfsntfs_attribute, + PyObject *arguments ); + +PyObject *pyfsntfs_attribute_get_valid_data_size( + pyfsntfs_attribute_t *pyfsntfs_attribute, + PyObject *arguments ); + #if defined( __cplusplus ) } #endif diff --git a/setup.cfg.in b/setup.cfg.in index bffe5cd8..a5e413b2 100644 --- a/setup.cfg.in +++ b/setup.cfg.in @@ -3,6 +3,7 @@ name = libfsntfs-python version = @VERSION@ description = Python bindings module for libfsntfs long_description = Python bindings module for libfsntfs +long_description_content_type = text/plain author = Joachim Metz author_email = joachim.metz@gmail.com license = GNU Lesser General Public License v3 or later (LGPLv3+) diff --git a/tests/Makefile.am b/tests/Makefile.am index a9026e02..273a3c2a 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -37,6 +37,8 @@ TESTS = \ $(TESTS_PYFSNTFS) check_SCRIPTS = \ + pyfsntfs_test_attribute.py \ + pyfsntfs_test_file_entry.py \ pyfsntfs_test_support.py \ pyfsntfs_test_volume.py \ test_fsntfsinfo.sh \ diff --git a/tests/pyfsntfs_test_attribute.py b/tests/pyfsntfs_test_attribute.py new file mode 100644 index 00000000..92dfa04d --- /dev/null +++ b/tests/pyfsntfs_test_attribute.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python +# +# Python-bindings attribute type test script +# +# Copyright (C) 2010-2023, Joachim Metz +# +# Refer to AUTHORS for acknowledgements. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +import argparse +import os +import sys +import unittest + +import pyfsntfs + + +class DataRangeFileObject(object): + """File-like object that maps an in-file data range.""" + + def __init__(self, path, range_offset, range_size): + """Initializes a file-like object. + + Args: + path (str): path of the file that contains the data range. + range_offset (int): offset where the data range starts. + range_size (int): size of the data range starts, or None to indicate + the range should continue to the end of the parent file-like object. + """ + if range_size is None: + stat_object = os.stat(path) + range_size = stat_object.st_size + + super(DataRangeFileObject, self).__init__() + self._current_offset = 0 + self._file_object = open(path, "rb") + self._range_offset = range_offset + self._range_size = range_size + + def __enter__(self): + """Enters a with statement.""" + return self + + def __exit__(self, unused_type, unused_value, unused_traceback): + """Exits a with statement.""" + return + + def close(self): + """Closes the file-like object.""" + if self._file_object: + self._file_object.close() + self._file_object = None + + def get_offset(self): + """Retrieves the current offset into the file-like object. + + Returns: + int: current offset in the data range. + """ + return self._current_offset + + def get_size(self): + """Retrieves the size of the file-like object. + + Returns: + int: size of the data range. + """ + return self._range_size + + def read(self, size=None): + """Reads a byte string from the file-like object at the current offset. + + The function will read a byte string of the specified size or + all of the remaining data if no size was specified. + + Args: + size (Optional[int]): number of bytes to read, where None is all + remaining data. + + Returns: + bytes: data read. + + Raises: + IOError: if the read failed. + """ + if (self._range_offset < 0 or + (self._range_size is not None and self._range_size < 0)): + raise IOError("Invalid data range.") + + if self._current_offset < 0: + raise IOError( + "Invalid current offset: {0:d} value less than zero.".format( + self._current_offset)) + + if (self._range_size is not None and + self._current_offset >= self._range_size): + return b"" + + if size is None: + size = self._range_size + if self._range_size is not None and self._current_offset + size > self._range_size: + size = self._range_size - self._current_offset + + self._file_object.seek( + self._range_offset + self._current_offset, os.SEEK_SET) + + data = self._file_object.read(size) + + self._current_offset += len(data) + + return data + + def seek(self, offset, whence=os.SEEK_SET): + """Seeks to an offset within the file-like object. + + Args: + offset (int): offset to seek to. + whence (Optional(int)): value that indicates whether offset is an absolute + or relative position within the file. + + Raises: + IOError: if the seek failed. + """ + if self._current_offset < 0: + raise IOError( + "Invalid current offset: {0:d} value less than zero.".format( + self._current_offset)) + + if whence == os.SEEK_CUR: + offset += self._current_offset + elif whence == os.SEEK_END: + offset += self._range_size + elif whence != os.SEEK_SET: + raise IOError("Unsupported whence.") + if offset < 0: + raise IOError("Invalid offset value less than zero.") + + self._current_offset = offset + + +class AttributeTypeTests(unittest.TestCase): + """Tests the attribute type.""" + + def test_get_attribute_type(self): + """Tests the get_attribute_type function and attribute_type property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + if not fsntfs_file_entry.number_of_attributes: + raise unittest.SkipTest('missing attributes') + + fsntfs_attribute = fsntfs_file_entry.get_attribute(0) + self.assertIsNotNone(fsntfs_attribute) + + attribute_type = fsntfs_attribute.get_attribute_type() + self.assertIsNotNone(attribute_type) + + self.assertIsNotNone(fsntfs_attribute.attribute_type) + + finally: + fsntfs_volume.close() + + def test_get_attribute_name(self): + """Tests the get_attribute_name function and attribute_name property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + if not fsntfs_file_entry.number_of_attributes: + raise unittest.SkipTest('missing attributes') + + fsntfs_attribute = fsntfs_file_entry.get_attribute(0) + self.assertIsNotNone(fsntfs_attribute) + + attribute_name = fsntfs_attribute.get_attribute_name() + self.assertIsNone(attribute_name) + + self.assertIsNone(fsntfs_attribute.attribute_name) + + finally: + fsntfs_volume.close() + + def test_get_data_size(self): + """Tests the get_data_size function and data_size property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + if not fsntfs_file_entry.number_of_attributes: + raise unittest.SkipTest('missing attributes') + + fsntfs_attribute = fsntfs_file_entry.get_attribute(0) + self.assertIsNotNone(fsntfs_attribute) + + data_size = fsntfs_attribute.get_data_size() + self.assertIsNotNone(data_size) + + self.assertIsNotNone(fsntfs_attribute.data_size) + + finally: + fsntfs_volume.close() + + def test_get_valid_data_size(self): + """Tests the get_valid_data_size function and valid_data_size property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + if not fsntfs_file_entry.number_of_attributes: + raise unittest.SkipTest('missing attributes') + + fsntfs_attribute = fsntfs_file_entry.get_attribute(0) + self.assertIsNotNone(fsntfs_attribute) + + valid_data_size = fsntfs_attribute.get_valid_data_size() + self.assertIsNotNone(valid_data_size) + + self.assertIsNotNone(fsntfs_attribute.valid_data_size) + + finally: + fsntfs_volume.close() + + +if __name__ == "__main__": + argument_parser = argparse.ArgumentParser() + + argument_parser.add_argument( + "-o", "--offset", dest="offset", action="store", default=None, + type=int, help="offset of the source file.") + + argument_parser.add_argument( + "source", nargs="?", action="store", metavar="PATH", + default=None, help="path of the source file.") + + options, unknown_options = argument_parser.parse_known_args() + unknown_options.insert(0, sys.argv[0]) + + setattr(unittest, "offset", options.offset) + setattr(unittest, "source", options.source) + + unittest.main(argv=unknown_options, verbosity=2) diff --git a/tests/pyfsntfs_test_file_entry.py b/tests/pyfsntfs_test_file_entry.py new file mode 100644 index 00000000..173cb44d --- /dev/null +++ b/tests/pyfsntfs_test_file_entry.py @@ -0,0 +1,955 @@ +#!/usr/bin/env python +# +# Python-bindings file_entry type test script +# +# Copyright (C) 2010-2023, Joachim Metz +# +# Refer to AUTHORS for acknowledgements. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +import argparse +import os +import random +import sys +import unittest + +import pyfsntfs + + +class DataRangeFileObject(object): + """File-like object that maps an in-file data range.""" + + def __init__(self, path, range_offset, range_size): + """Initializes a file-like object. + + Args: + path (str): path of the file that contains the data range. + range_offset (int): offset where the data range starts. + range_size (int): size of the data range starts, or None to indicate + the range should continue to the end of the parent file-like object. + """ + if range_size is None: + stat_object = os.stat(path) + range_size = stat_object.st_size + + super(DataRangeFileObject, self).__init__() + self._current_offset = 0 + self._file_object = open(path, "rb") + self._range_offset = range_offset + self._range_size = range_size + + def __enter__(self): + """Enters a with statement.""" + return self + + def __exit__(self, unused_type, unused_value, unused_traceback): + """Exits a with statement.""" + return + + def close(self): + """Closes the file-like object.""" + if self._file_object: + self._file_object.close() + self._file_object = None + + def get_offset(self): + """Retrieves the current offset into the file-like object. + + Returns: + int: current offset in the data range. + """ + return self._current_offset + + def get_size(self): + """Retrieves the size of the file-like object. + + Returns: + int: size of the data range. + """ + return self._range_size + + def read(self, size=None): + """Reads a byte string from the file-like object at the current offset. + + The function will read a byte string of the specified size or + all of the remaining data if no size was specified. + + Args: + size (Optional[int]): number of bytes to read, where None is all + remaining data. + + Returns: + bytes: data read. + + Raises: + IOError: if the read failed. + """ + if (self._range_offset < 0 or + (self._range_size is not None and self._range_size < 0)): + raise IOError("Invalid data range.") + + if self._current_offset < 0: + raise IOError( + "Invalid current offset: {0:d} value less than zero.".format( + self._current_offset)) + + if (self._range_size is not None and + self._current_offset >= self._range_size): + return b"" + + if size is None: + size = self._range_size + if self._range_size is not None and self._current_offset + size > self._range_size: + size = self._range_size - self._current_offset + + self._file_object.seek( + self._range_offset + self._current_offset, os.SEEK_SET) + + data = self._file_object.read(size) + + self._current_offset += len(data) + + return data + + def seek(self, offset, whence=os.SEEK_SET): + """Seeks to an offset within the file-like object. + + Args: + offset (int): offset to seek to. + whence (Optional(int)): value that indicates whether offset is an absolute + or relative position within the file. + + Raises: + IOError: if the seek failed. + """ + if self._current_offset < 0: + raise IOError( + "Invalid current offset: {0:d} value less than zero.".format( + self._current_offset)) + + if whence == os.SEEK_CUR: + offset += self._current_offset + elif whence == os.SEEK_END: + offset += self._range_size + elif whence != os.SEEK_SET: + raise IOError("Unsupported whence.") + if offset < 0: + raise IOError("Invalid offset value less than zero.") + + self._current_offset = offset + + +class FileEntryTypeTests(unittest.TestCase): + """Tests the file_entry type.""" + + def test_get_size(self): + """Tests the get_size function and size property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + size = fsntfs_file_entry.get_size() + self.assertIsNotNone(size) + + self.assertIsNotNone(fsntfs_file_entry.size) + + finally: + fsntfs_volume.close() + + def test_get_number_of_extents(self): + """Tests the get_number_of_extents function and number_of_extents property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + number_of_extents = fsntfs_file_entry.get_number_of_extents() + self.assertIsNotNone(number_of_extents) + + self.assertIsNotNone(fsntfs_file_entry.number_of_extents) + + finally: + fsntfs_volume.close() + + def test_get_extent(self): + """Tests the get_extent function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + if not fsntfs_file_entry.number_of_extents: + raise unittest.SkipTest('missing extents') + + extent = fsntfs_file_entry.get_extent(0) + self.assertIsNotNone(extent) + + finally: + fsntfs_volume.close() + + def test_is_empty(self): + """Tests the is_empty function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + is_empty = fsntfs_file_entry.is_empty() + self.assertFalse(is_empty) + + finally: + fsntfs_volume.close() + + def test_is_allocated(self): + """Tests the is_allocated function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + is_allocated = fsntfs_file_entry.is_allocated() + self.assertTrue(is_allocated) + + finally: + fsntfs_volume.close() + + def test_has_directory_entries_index(self): + """Tests the has_directory_entries_index function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + has_directory_entries_index = fsntfs_file_entry.has_directory_entries_index() + self.assertFalse(has_directory_entries_index) + + finally: + fsntfs_volume.close() + + def test_has_default_data_stream(self): + """Tests the has_default_data_stream function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + has_default_data_stream = fsntfs_file_entry.has_default_data_stream() + self.assertTrue(has_default_data_stream) + + finally: + fsntfs_volume.close() + + def test_get_file_reference(self): + """Tests the get_file_reference function and file_reference property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + file_reference = fsntfs_file_entry.get_file_reference() + self.assertIsNotNone(file_reference) + + self.assertIsNotNone(fsntfs_file_entry.file_reference) + + finally: + fsntfs_volume.close() + + def test_get_base_record_file_reference(self): + """Tests the get_base_record_file_reference function and base_record_file_reference property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + base_record_file_reference = fsntfs_file_entry.get_base_record_file_reference() + self.assertIsNotNone(base_record_file_reference) + + self.assertIsNotNone(fsntfs_file_entry.base_record_file_reference) + + finally: + fsntfs_volume.close() + + # TODO: add tests for parent_file_reference + # TODO: add tests for get_parent_file_reference_by_attribute_index + + def test_get_journal_sequence_number(self): + """Tests the get_journal_sequence_number function and journal_sequence_number property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + journal_sequence_number = fsntfs_file_entry.get_journal_sequence_number() + self.assertIsNotNone(journal_sequence_number) + + self.assertIsNotNone(fsntfs_file_entry.journal_sequence_number) + + finally: + fsntfs_volume.close() + + def test_get_creation_time(self): + """Tests the get_creation_time function and creation_time property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + creation_time = fsntfs_file_entry.get_creation_time() + self.assertIsNotNone(creation_time) + + self.assertIsNotNone(fsntfs_file_entry.creation_time) + + finally: + fsntfs_volume.close() + + def test_get_creation_time_as_integer(self): + """Tests the get_creation_time_as_integer function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + creation_time = fsntfs_file_entry.get_creation_time_as_integer() + self.assertIsNotNone(creation_time) + + finally: + fsntfs_volume.close() + + def test_get_modification_time(self): + """Tests the get_modification_time function and modification_time property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + modification_time = fsntfs_file_entry.get_modification_time() + self.assertIsNotNone(modification_time) + + self.assertIsNotNone(fsntfs_file_entry.modification_time) + + finally: + fsntfs_volume.close() + + def test_get_modification_time_as_integer(self): + """Tests the get_modification_time_as_integer function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + modification_time = fsntfs_file_entry.get_modification_time_as_integer() + self.assertIsNotNone(modification_time) + + finally: + fsntfs_volume.close() + + def test_get_access_time(self): + """Tests the get_access_time function and access_time property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + access_time = fsntfs_file_entry.get_access_time() + self.assertIsNotNone(access_time) + + self.assertIsNotNone(fsntfs_file_entry.access_time) + + finally: + fsntfs_volume.close() + + def test_get_access_time_as_integer(self): + """Tests the get_access_time_as_integer function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + access_time = fsntfs_file_entry.get_access_time_as_integer() + self.assertIsNotNone(access_time) + + finally: + fsntfs_volume.close() + + def test_get_entry_modification_time(self): + """Tests the get_entry_modification_time function and entry_modification_time property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + entry_modification_time = fsntfs_file_entry.get_entry_modification_time() + self.assertIsNotNone(entry_modification_time) + + self.assertIsNotNone(fsntfs_file_entry.entry_modification_time) + + finally: + fsntfs_volume.close() + + def test_get_entry_modification_time_as_integer(self): + """Tests the get_entry_modification_time_as_integer function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + entry_modification_time = fsntfs_file_entry.get_entry_modification_time_as_integer() + self.assertIsNotNone(entry_modification_time) + + finally: + fsntfs_volume.close() + + def test_get_name(self): + """Tests the get_name function and name property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + name = fsntfs_file_entry.get_name() + self.assertIsNotNone(name) + + self.assertIsNotNone(fsntfs_file_entry.name) + + finally: + fsntfs_volume.close() + + def test_get_name_attribute_index(self): + """Tests the get_name_attribute_index function and name_attribute_index property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + name_attribute_index = fsntfs_file_entry.get_name_attribute_index() + self.assertIsNotNone(name_attribute_index) + + self.assertIsNotNone(fsntfs_file_entry.name_attribute_index) + + finally: + fsntfs_volume.close() + + def test_get_file_attribute_flags(self): + """Tests the get_file_attribute_flags function and file_attribute_flags property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + file_attribute_flags = fsntfs_file_entry.get_file_attribute_flags() + self.assertIsNotNone(file_attribute_flags) + + self.assertIsNotNone(fsntfs_file_entry.file_attribute_flags) + + finally: + fsntfs_volume.close() + + # TODO: add tests for path_hint + + def test_get_symbolic_link_target(self): + """Tests the get_symbolic_link_target function and symbolic_link_target property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + symbolic_link_target = fsntfs_file_entry.get_symbolic_link_target() + self.assertIsNone(symbolic_link_target) + + self.assertIsNone(fsntfs_file_entry.symbolic_link_target) + + finally: + fsntfs_volume.close() + + def test_get_security_descriptor_data(self): + """Tests the get_security_descriptor_data function and security_descriptor_data property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_root_directory() + self.assertIsNotNone(fsntfs_file_entry) + + security_descriptor_data = fsntfs_file_entry.get_security_descriptor_data() + self.assertIsNotNone(security_descriptor_data) + + self.assertIsNotNone(fsntfs_file_entry.security_descriptor_data) + + finally: + fsntfs_volume.close() + + def test_get_number_of_attributes(self): + """Tests the get_number_of_attributes function and number_of_attributes property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + number_of_attributes = fsntfs_file_entry.get_number_of_attributes() + self.assertIsNotNone(number_of_attributes) + + self.assertIsNotNone(fsntfs_file_entry.number_of_attributes) + + finally: + fsntfs_volume.close() + + def test_get_attribute(self): + """Tests the get_attribute function.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + if not fsntfs_file_entry.number_of_attributes: + raise unittest.SkipTest('missing attributes') + + attribute = fsntfs_file_entry.get_attribute(0) + self.assertIsNotNone(attribute) + + finally: + fsntfs_volume.close() + + def test_get_number_of_alternate_data_streams(self): + """Tests the get_number_of_alternate_data_streams function and number_of_alternate_data_streams property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + number_of_alternate_data_streams = fsntfs_file_entry.get_number_of_alternate_data_streams() + self.assertIsNotNone(number_of_alternate_data_streams) + + self.assertIsNotNone(fsntfs_file_entry.number_of_alternate_data_streams) + + finally: + fsntfs_volume.close() + + # TODO: add tests for get_alternate_data_stream + # TODO: add tests for has_alternate_data_stream_by_name + # TODO: add tests for get_alternate_data_stream_by_name + + def test_get_number_of_sub_file_entries(self): + """Tests the get_number_of_sub_file_entries function and number_of_sub_file_entries property.""" + test_source = getattr(unittest, "source", None) + if not test_source: + raise unittest.SkipTest('missing source') + + test_offset = getattr(unittest, "offset", None) + + with DataRangeFileObject( + test_source, test_offset or 0, None) as file_object: + + fsntfs_volume = pyfsntfs.volume() + fsntfs_volume.open_file_object(file_object) + + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') + + try: + fsntfs_file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(fsntfs_file_entry) + + number_of_sub_file_entries = fsntfs_file_entry.get_number_of_sub_file_entries() + self.assertIsNotNone(number_of_sub_file_entries) + + self.assertIsNotNone(fsntfs_file_entry.number_of_sub_file_entries) + + finally: + fsntfs_volume.close() + + # TODO: add tests for get_sub_file_entry + + +if __name__ == "__main__": + argument_parser = argparse.ArgumentParser() + + argument_parser.add_argument( + "-o", "--offset", dest="offset", action="store", default=None, + type=int, help="offset of the source file.") + + argument_parser.add_argument( + "source", nargs="?", action="store", metavar="PATH", + default=None, help="path of the source file.") + + options, unknown_options = argument_parser.parse_known_args() + unknown_options.insert(0, sys.argv[0]) + + setattr(unittest, "offset", options.offset) + setattr(unittest, "source", options.source) + + unittest.main(argv=unknown_options, verbosity=2) diff --git a/tests/pyfsntfs_test_volume.py b/tests/pyfsntfs_test_volume.py index 87f93c65..4793eac2 100644 --- a/tests/pyfsntfs_test_volume.py +++ b/tests/pyfsntfs_test_volume.py @@ -39,6 +39,10 @@ def __init__(self, path, range_offset, range_size): range_size (int): size of the data range starts, or None to indicate the range should continue to the end of the parent file-like object. """ + if range_size is None: + stat_object = os.stat(path) + range_size = stat_object.st_size + super(DataRangeFileObject, self).__init__() self._current_offset = 0 self._file_object = open(path, "rb") @@ -157,12 +161,13 @@ def test_signal_abort(self): def test_open(self): """Tests the open function.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") - if unittest.offset: - raise unittest.SkipTest("source defines offset") + test_offset = getattr(unittest, "offset", None) + if test_offset: + raise unittest.SkipTest("unsupported source with offset") fsntfs_volume = pyfsntfs.volume() @@ -181,7 +186,7 @@ def test_open(self): def test_open_file_object(self): """Tests the open_file_object function.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") @@ -190,8 +195,10 @@ def test_open_file_object(self): fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume.open_file_object(file_object) @@ -208,7 +215,7 @@ def test_open_file_object(self): def test_close(self): """Tests the close function.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") @@ -219,12 +226,13 @@ def test_close(self): def test_open_close(self): """Tests the open and close functions.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: return - if unittest.offset: - raise unittest.SkipTest("source defines offset") + test_offset = getattr(unittest, "offset", None) + if test_offset: + raise unittest.SkipTest("unsupported source with offset") fsntfs_volume = pyfsntfs.volume() @@ -254,221 +262,268 @@ def test_open_close(self): def test_get_bytes_per_sector(self): """Tests the get_bytes_per_sector function and bytes_per_sector property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - bytes_per_sector = fsntfs_volume.get_bytes_per_sector() - self.assertIsNotNone(bytes_per_sector) + try: + bytes_per_sector = fsntfs_volume.get_bytes_per_sector() + self.assertIsNotNone(bytes_per_sector) - self.assertIsNotNone(fsntfs_volume.bytes_per_sector) + self.assertIsNotNone(fsntfs_volume.bytes_per_sector) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_cluster_block_size(self): """Tests the get_cluster_block_size function and cluster_block_size property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - cluster_block_size = fsntfs_volume.get_cluster_block_size() - self.assertIsNotNone(cluster_block_size) + try: + cluster_block_size = fsntfs_volume.get_cluster_block_size() + self.assertIsNotNone(cluster_block_size) - self.assertIsNotNone(fsntfs_volume.cluster_block_size) + self.assertIsNotNone(fsntfs_volume.cluster_block_size) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_mft_entry_size(self): """Tests the get_mft_entry_size function and mft_entry_size property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - mft_entry_size = fsntfs_volume.get_mft_entry_size() - self.assertIsNotNone(mft_entry_size) + try: + mft_entry_size = fsntfs_volume.get_mft_entry_size() + self.assertIsNotNone(mft_entry_size) - self.assertIsNotNone(fsntfs_volume.mft_entry_size) + self.assertIsNotNone(fsntfs_volume.mft_entry_size) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_index_entry_size(self): """Tests the get_index_entry_size function and index_entry_size property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - index_entry_size = fsntfs_volume.get_index_entry_size() - self.assertIsNotNone(index_entry_size) + try: + index_entry_size = fsntfs_volume.get_index_entry_size() + self.assertIsNotNone(index_entry_size) - self.assertIsNotNone(fsntfs_volume.index_entry_size) + self.assertIsNotNone(fsntfs_volume.index_entry_size) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_name(self): """Tests the get_name function and name property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - name = fsntfs_volume.get_name() - self.assertIsNotNone(name) + try: + name = fsntfs_volume.get_name() + self.assertIsNotNone(name) - self.assertIsNotNone(fsntfs_volume.name) + self.assertIsNotNone(fsntfs_volume.name) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_serial_number(self): """Tests the get_serial_number function and serial_number property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - serial_number = fsntfs_volume.get_serial_number() - self.assertIsNotNone(serial_number) + try: + serial_number = fsntfs_volume.get_serial_number() + self.assertIsNotNone(serial_number) - self.assertIsNotNone(fsntfs_volume.serial_number) + self.assertIsNotNone(fsntfs_volume.serial_number) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_number_of_file_entries(self): """Tests the get_number_of_file_entries function and number_of_file_entries property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - number_of_file_entries = fsntfs_volume.get_number_of_file_entries() - self.assertIsNotNone(number_of_file_entries) + try: + number_of_file_entries = fsntfs_volume.get_number_of_file_entries() + self.assertIsNotNone(number_of_file_entries) - self.assertIsNotNone(fsntfs_volume.number_of_file_entries) + self.assertIsNotNone(fsntfs_volume.number_of_file_entries) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_file_entry(self): """Tests the get_file_entry function.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest('missing source') + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - file_entry = fsntfs_volume.get_file_entry(0) - self.assertIsNotNone(file_entry) + if not fsntfs_volume.number_of_file_entries: + raise unittest.SkipTest('missing file entries') - fsntfs_volume.close() + try: + file_entry = fsntfs_volume.get_file_entry(0) + self.assertIsNotNone(file_entry) + + finally: + fsntfs_volume.close() def test_get_file_entry_by_path(self): """Tests the get_file_entry_by_path function.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest('missing source') + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') - self.assertIsNotNone(file_entry) + try: + file_entry = fsntfs_volume.get_file_entry_by_path('\\$MFT') + self.assertIsNotNone(file_entry) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_root_directory(self): """Tests the get_root_directory function and root_directory property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - root_directory = fsntfs_volume.get_root_directory() - self.assertIsNotNone(root_directory) + try: + root_directory = fsntfs_volume.get_root_directory() + self.assertIsNotNone(root_directory) - fsntfs_volume.close() + finally: + fsntfs_volume.close() def test_get_usn_change_journal(self): """Tests the get_usn_change_journal function and usn_change_journal property.""" - test_source = unittest.source + test_source = getattr(unittest, "source", None) if not test_source: raise unittest.SkipTest("missing source") fsntfs_volume = pyfsntfs.volume() + test_offset = getattr(unittest, "offset", None) + with DataRangeFileObject( - test_source, unittest.offset or 0, None) as file_object: + test_source, test_offset or 0, None) as file_object: fsntfs_volume = pyfsntfs.volume() fsntfs_volume.open_file_object(file_object) - _ = fsntfs_volume.get_usn_change_journal() + try: + _ = fsntfs_volume.get_usn_change_journal() - fsntfs_volume.close() + finally: + fsntfs_volume.close() if __name__ == "__main__": diff --git a/tests/runtests.py b/tests/runtests.py index 716ad906..39a521be 100755 --- a/tests/runtests.py +++ b/tests/runtests.py @@ -2,7 +2,7 @@ # # Script to run Python test scripts. # -# Version: 20231009 +# Version: 20231024 import glob import os @@ -70,6 +70,9 @@ def ReadIgnoreList(test_profile): if lines[0] == "# libyal test data options": for line in lines[1:]: key, value = line.split("=", maxsplit=1) + if key == 'offset': + value = int(value) + setattr(unittest, key, value) test_results = test_runner.run(test_scripts) diff --git a/tests/test_python_module.sh b/tests/test_python_module.sh index 57191833..003933fd 100755 --- a/tests/test_python_module.sh +++ b/tests/test_python_module.sh @@ -8,7 +8,7 @@ EXIT_FAILURE=1; EXIT_IGNORE=77; TEST_FUNCTIONS="support"; -TEST_FUNCTIONS_WITH_INPUT="volume"; +TEST_FUNCTIONS_WITH_INPUT="attribute file_entry volume"; OPTION_SETS=("offset"); TEST_TOOL_DIRECTORY=".";