diff --git a/config/dpkg/control b/config/dpkg/control index 4c8d4672..6d665b79 100644 --- a/config/dpkg/control +++ b/config/dpkg/control @@ -9,7 +9,7 @@ Homepage: https://github.com/log2timeline/dfvfs Package: python3-dfvfs Architecture: all -Depends: libbde-python3 (>= 20140531), libewf-python3 (>= 20131210), libfsapfs-python3 (>= 20201107), libfsext-python3 (>= 20220112), libfshfs-python3 (>= 20220113), libfsntfs-python3 (>= 20211229), libfsxfs-python3 (>= 20220113), libfvde-python3 (>= 20160719), libfwnt-python3 (>= 20210717), libluksde-python3 (>= 20200101), libmodi-python3 (>= 20210405), libphdi-python3 (>= 20220110), libqcow-python3 (>= 20201213), libsigscan-python3 (>= 20191221), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsgpt-python3 (>= 20211115), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-cffi-backend (>= 1.9.1), python3-cryptography (>= 2.0.2), python3-dfdatetime (>= 20211113), python3-dtfabric (>= 20170524), python3-idna (>= 2.5), python3-pytsk3 (>= 20210419), python3-pyxattr (>= 0.7.2), python3-yaml (>= 3.10), ${misc:Depends} +Depends: libbde-python3 (>= 20140531), libewf-python3 (>= 20131210), libfsapfs-python3 (>= 20201107), libfsext-python3 (>= 20220112), libfshfs-python3 (>= 20220114), libfsntfs-python3 (>= 20211229), libfsxfs-python3 (>= 20220113), libfvde-python3 (>= 20160719), libfwnt-python3 (>= 20210717), libluksde-python3 (>= 20200101), libmodi-python3 (>= 20210405), libphdi-python3 (>= 20220110), libqcow-python3 (>= 20201213), libsigscan-python3 (>= 20191221), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsgpt-python3 (>= 20211115), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-cffi-backend (>= 1.9.1), python3-cryptography (>= 2.0.2), python3-dfdatetime (>= 20211113), python3-dtfabric (>= 20170524), python3-idna (>= 2.5), python3-pytsk3 (>= 20210419), python3-pyxattr (>= 0.7.2), python3-yaml (>= 3.10), ${misc:Depends} Description: Python 3 module of dfVFS dfVFS, or Digital Forensics Virtual File System, provides read-only access to file-system objects from various storage media types and file formats. The goal diff --git a/dependencies.ini b/dependencies.ini index 7334cd8b..14c45848 100644 --- a/dependencies.ini +++ b/dependencies.ini @@ -66,7 +66,7 @@ version_property: get_version() [pyfshfs] dpkg_name: libfshfs-python3 l2tbinaries_name: libfshfs -minimum_version: 20220113 +minimum_version: 20220114 pypi_name: libfshfs-python rpm_name: libfshfs-python3 version_property: get_version() diff --git a/dfvfs/vfs/hfs_data_stream.py b/dfvfs/vfs/hfs_data_stream.py new file mode 100644 index 00000000..37add57c --- /dev/null +++ b/dfvfs/vfs/hfs_data_stream.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +"""The HFS data stream implementation.""" + +from dfvfs.vfs import data_stream + + +class HFSDataStream(data_stream.DataStream): + """File system data stream that uses pyfshfs.""" + + def __init__(self, fshfs_data_stream): + """Initializes the data stream. + + Args: + fshfs_data_stream (pyfshfs.data_stream): HFS data stream. + """ + super(HFSDataStream, self).__init__() + self._fshfs_data_stream = fshfs_data_stream + self._name = '' + + if fshfs_data_stream: + self._name = 'rsrc' + + @property + def name(self): + """str: name.""" + return self._name + + def IsDefault(self): + """Determines if the data stream is the default (data fork) data stream. + + Returns: + bool: True if the data stream is the default (data fork) data stream. + """ + return not self._fshfs_data_stream diff --git a/dfvfs/vfs/hfs_file_entry.py b/dfvfs/vfs/hfs_file_entry.py index f47eec83..c4b635ff 100644 --- a/dfvfs/vfs/hfs_file_entry.py +++ b/dfvfs/vfs/hfs_file_entry.py @@ -11,6 +11,7 @@ from dfvfs.vfs import extent from dfvfs.vfs import file_entry from dfvfs.vfs import hfs_attribute +from dfvfs.vfs import hfs_data_stream from dfvfs.vfs import hfs_directory @@ -84,6 +85,26 @@ def _GetAttributes(self): return self._attributes + def _GetDataStreams(self): + """Retrieves the data streams. + + Returns: + list[HFSDataStream]: data streams. + """ + if self._data_streams is None: + self._data_streams = [] + + if self.entry_type == definitions.FILE_ENTRY_TYPE_FILE: + data_stream = hfs_data_stream.HFSDataStream(None) + self._data_streams.append(data_stream) + + fshfs_data_stream = self._fshfs_file_entry.get_resource_fork() + if fshfs_data_stream: + data_stream = hfs_data_stream.HFSDataStream(fshfs_data_stream) + self._data_streams.append(data_stream) + + return self._data_streams + def _GetDirectory(self): """Retrieves a directory. diff --git a/dfvfs/vfs/ntfs_data_stream.py b/dfvfs/vfs/ntfs_data_stream.py index c0131bb2..4ba6f801 100644 --- a/dfvfs/vfs/ntfs_data_stream.py +++ b/dfvfs/vfs/ntfs_data_stream.py @@ -8,18 +8,18 @@ class NTFSDataStream(data_stream.DataStream): """File system data stream that uses pyfsntfs.""" def __init__(self, fsntfs_data_stream): - """Initializes the data stream object. + """Initializes the data stream. Args: fsntfs_data_stream (pyfsntfs.data_stream): NTFS data stream. """ super(NTFSDataStream, self).__init__() - self._fsntfs_data_stream = fsntfs_data_stream + self._name = getattr(fsntfs_data_stream, 'name', None) or '' @property def name(self): """str: name.""" - return getattr(self._fsntfs_data_stream, 'name', '') + return self._name def IsDefault(self): """Determines if the data stream is the default data stream. @@ -27,4 +27,4 @@ def IsDefault(self): Returns: bool: True if the data stream is the default data stream. """ - return not self._fsntfs_data_stream + return not bool(self._name) diff --git a/dfvfs/vfs/tsk_data_stream.py b/dfvfs/vfs/tsk_data_stream.py index 4d36eb6b..feba20a6 100644 --- a/dfvfs/vfs/tsk_data_stream.py +++ b/dfvfs/vfs/tsk_data_stream.py @@ -9,32 +9,34 @@ class TSKDataStream(data_stream.DataStream): """File system data stream that uses pytsk3.""" - def __init__(self, file_system, pytsk_attribute): + def __init__(self, pytsk_attribute): """Initializes a data stream. Args: - file_system (TSKFileSystem): file system. pytsk_attribute (pytsk3.Attribute): TSK attribute. """ super(TSKDataStream, self).__init__() - self._file_system = file_system - self._tsk_attribute = pytsk_attribute + self._name = '' - @property - def name(self): - """str: name.""" - if self._tsk_attribute: + if pytsk_attribute: # The value of the attribute name will be None for the default # data stream. - attribute_name = getattr(self._tsk_attribute.info, 'name', None) - if attribute_name: + attribute_name = getattr(pytsk_attribute.info, 'name', None) + attribute_type = getattr(pytsk_attribute.info, 'type', None) + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC: + self._name = 'rsrc' + + elif attribute_name: try: # pytsk3 returns an UTF-8 encoded byte string. - return attribute_name.decode('utf8') + self._name = attribute_name.decode('utf8') except UnicodeError: pass - return '' + @property + def name(self): + """str: name.""" + return self._name def IsDefault(self): """Determines if the data stream is the default data stream. @@ -42,15 +44,4 @@ def IsDefault(self): Returns: bool: True if the data stream is the default data stream, false if not. """ - if not self._tsk_attribute or not self._file_system: - return True - - if self._file_system.IsHFS(): - attribute_type = getattr(self._tsk_attribute.info, 'type', None) - return attribute_type in ( - pytsk3.TSK_FS_ATTR_TYPE_HFS_DEFAULT, pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA) - - if self._file_system.IsNTFS(): - return not bool(self.name) - - return True + return not bool(self._name) diff --git a/dfvfs/vfs/tsk_file_entry.py b/dfvfs/vfs/tsk_file_entry.py index 69571fda..16a58c16 100644 --- a/dfvfs/vfs/tsk_file_entry.py +++ b/dfvfs/vfs/tsk_file_entry.py @@ -365,12 +365,13 @@ def _GetDataStreams(self): """ if self._data_streams is None: if self._file_system.IsHFS(): - known_data_attribute_types = [ + known_data_attribute_types = ( pytsk3.TSK_FS_ATTR_TYPE_HFS_DEFAULT, - pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA] + pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA, + pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC) elif self._file_system.IsNTFS(): - known_data_attribute_types = [pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA] + known_data_attribute_types = (pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA, ) else: known_data_attribute_types = None @@ -382,7 +383,7 @@ def _GetDataStreams(self): if not known_data_attribute_types: if tsk_fs_meta_type == pytsk3.TSK_FS_META_TYPE_REG: - data_stream = tsk_data_stream.TSKDataStream(self._file_system, None) + data_stream = tsk_data_stream.TSKDataStream(None) self._data_streams.append(data_stream) else: @@ -397,8 +398,7 @@ def _GetDataStreams(self): attribute_type = getattr(pytsk_attribute.info, 'type', None) if attribute_type in known_data_attribute_types: - data_stream = tsk_data_stream.TSKDataStream( - self._file_system, pytsk_attribute) + data_stream = tsk_data_stream.TSKDataStream(pytsk_attribute) self._data_streams.append(data_stream) return self._data_streams diff --git a/requirements.txt b/requirements.txt index b6bef401..8a950eae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ libbde-python >= 20140531 libewf-python >= 20131210 libfsapfs-python >= 20201107 libfsext-python >= 20220112 -libfshfs-python >= 20220113 +libfshfs-python >= 20220114 libfsntfs-python >= 20211229 libfsxfs-python >= 20220113 libfvde-python >= 20160719 diff --git a/setup.cfg b/setup.cfg index a7fcd686..63d87665 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,7 +21,7 @@ requires = libbde-python3 >= 20140531 libewf-python3 >= 20131210 libfsapfs-python3 >= 20201107 libfsext-python3 >= 20220112 - libfshfs-python3 >= 20220113 + libfshfs-python3 >= 20220114 libfsntfs-python3 >= 20211229 libfsxfs-python3 >= 20220113 libfvde-python3 >= 20160719 diff --git a/tests/vfs/hfs_data_stream.py b/tests/vfs/hfs_data_stream.py new file mode 100644 index 00000000..5e752476 --- /dev/null +++ b/tests/vfs/hfs_data_stream.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the data stream implementation using pyfshfs.""" + +import unittest + +from dfvfs.vfs import hfs_data_stream + +from tests import test_lib as shared_test_lib + + +class HFSDataStreamTest(shared_test_lib.BaseTestCase): + """Tests the HFS data stream.""" + + def testName(self): + """Test the name property.""" + test_data_stream = hfs_data_stream.HFSDataStream(None) + self.assertEqual(test_data_stream.name, '') + + def testIsDefault(self): + """Test the IsDefault function.""" + test_data_stream = hfs_data_stream.HFSDataStream(None) + result = test_data_stream.IsDefault() + self.assertTrue(result) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vfs/hfs_file_entry.py b/tests/vfs/hfs_file_entry.py index d2047b0a..dc8fb27f 100644 --- a/tests/vfs/hfs_file_entry.py +++ b/tests/vfs/hfs_file_entry.py @@ -146,8 +146,8 @@ def testGetAttributes(self): test_attribute_value_data = test_attribute.read() self.assertEqual(test_attribute_value_data, b'My extended attribute') - def testGetStat(self): - """Tests the _GetStat function.""" + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_ANOTHER_FILE, @@ -156,49 +156,18 @@ def testGetStat(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - stat_object = file_entry._GetStat() - - self.assertIsNotNone(stat_object) - self.assertEqual(stat_object.type, stat_object.TYPE_FILE) - self.assertEqual(stat_object.size, 22) - - self.assertEqual(stat_object.mode, 0o644) - self.assertEqual(stat_object.uid, 501) - self.assertEqual(stat_object.gid, 20) - - self.assertEqual(stat_object.atime, 1642144782) - self.assertFalse(hasattr(stat_object, 'atime_nano')) - - self.assertEqual(stat_object.ctime, 1642144782) - self.assertFalse(hasattr(stat_object, 'ctime_nano')) - - self.assertEqual(stat_object.crtime, 1642144782) - self.assertFalse(hasattr(stat_object, 'crtime_nano')) - - self.assertEqual(stat_object.mtime, 1642144782) - self.assertFalse(hasattr(stat_object, 'mtime_nano')) + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) - def testGetStatAttribute(self): - """Tests the _GetStatAttribute function.""" path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_HFS, - identifier=self._IDENTIFIER_ANOTHER_FILE, - location='/a_directory/another_file', + identifier=25, location='/a_directory/a_resourcefork', parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - stat_attribute = file_entry._GetStatAttribute() - - self.assertIsNotNone(stat_attribute) - self.assertEqual(stat_attribute.group_identifier, 20) - self.assertEqual(stat_attribute.inode_number, 21) - self.assertEqual(stat_attribute.mode, 0o100644) - # TODO: implement number of hard links support in pyfshfs - # self.assertEqual(stat_attribute.number_of_links, 1) - self.assertEqual(stat_attribute.owner_identifier, 501) - self.assertEqual(stat_attribute.size, 22) - self.assertEqual(stat_attribute.type, stat_attribute.TYPE_FILE) + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) def testGetExtents(self): """Tests the GetExtents function.""" @@ -265,6 +234,60 @@ def testGetParentFileEntry(self): self.assertEqual(parent_file_entry.name, 'a_directory') + def testGetStat(self): + """Tests the _GetStat function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_object = file_entry._GetStat() + + self.assertIsNotNone(stat_object) + self.assertEqual(stat_object.type, stat_object.TYPE_FILE) + self.assertEqual(stat_object.size, 22) + + self.assertEqual(stat_object.mode, 0o644) + self.assertEqual(stat_object.uid, 501) + self.assertEqual(stat_object.gid, 20) + + self.assertEqual(stat_object.atime, 1642144782) + self.assertFalse(hasattr(stat_object, 'atime_nano')) + + self.assertEqual(stat_object.ctime, 1642144782) + self.assertFalse(hasattr(stat_object, 'ctime_nano')) + + self.assertEqual(stat_object.crtime, 1642144782) + self.assertFalse(hasattr(stat_object, 'crtime_nano')) + + self.assertEqual(stat_object.mtime, 1642144782) + self.assertFalse(hasattr(stat_object, 'mtime_nano')) + + def testGetStatAttribute(self): + """Tests the _GetStatAttribute function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_attribute = file_entry._GetStatAttribute() + + self.assertIsNotNone(stat_attribute) + self.assertEqual(stat_attribute.group_identifier, 20) + self.assertEqual(stat_attribute.inode_number, 21) + self.assertEqual(stat_attribute.mode, 0o100644) + # TODO: implement number of hard links support in pyfshfs + # self.assertEqual(stat_attribute.number_of_links, 1) + self.assertEqual(stat_attribute.owner_identifier, 501) + self.assertEqual(stat_attribute.size, 22) + self.assertEqual(stat_attribute.type, stat_attribute.TYPE_FILE) + def testIsFunctions(self): """Tests the Is? functions.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -388,6 +411,21 @@ def testDataStreams(self): self.assertEqual(data_stream_names, []) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=25, location='/a_directory/a_resourcefork', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 2) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, ['', 'rsrc']) + def testGetDataStream(self): """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -398,8 +436,17 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') + self.assertIsNotNone(data_stream) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=25, location='/a_directory/a_resourcefork', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_stream = file_entry.GetDataStream('rsrc') self.assertIsNotNone(data_stream) diff --git a/tests/vfs/ntfs_data_stream.py b/tests/vfs/ntfs_data_stream.py index b08d14ba..2d4bfb9a 100644 --- a/tests/vfs/ntfs_data_stream.py +++ b/tests/vfs/ntfs_data_stream.py @@ -20,7 +20,8 @@ def testName(self): def testIsDefault(self): """Test the IsDefault function.""" test_data_stream = ntfs_data_stream.NTFSDataStream(None) - self.assertTrue(test_data_stream.IsDefault()) + result = test_data_stream.IsDefault() + self.assertTrue(result) if __name__ == '__main__': diff --git a/tests/vfs/ntfs_file_entry.py b/tests/vfs/ntfs_file_entry.py index 7a43751b..45a9187f 100644 --- a/tests/vfs/ntfs_file_entry.py +++ b/tests/vfs/ntfs_file_entry.py @@ -20,6 +20,7 @@ class NTFSFileEntryTest(shared_test_lib.BaseTestCase): _MFT_ENTRY_A_DIRECTORY = 64 _MFT_ENTRY_A_FILE = 65 + _MFT_ENTRY_ANOTHER_FILE = 67 _MFT_ENTRY_PASSWORDS_TXT = 66 def setUp(self): @@ -52,7 +53,27 @@ def testIntialize(self): self.assertIsNotNone(file_entry) # TODO: add tests for _GetAttributes - # TODO: add tests for _GetDataStreams + + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\another_file', + mft_entry=self._MFT_ENTRY_ANOTHER_FILE, parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -646,15 +667,14 @@ def testDataStream(self): def testGetDataStream(self): """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\a_file', - mft_entry=self._MFT_ENTRY_A_FILE, parent=self._raw_path_spec) + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\another_file', + mft_entry=self._MFT_ENTRY_ANOTHER_FILE, parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '') data_stream = file_entry.GetDataStream('bogus') self.assertIsNone(data_stream) @@ -665,10 +685,9 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '$Info' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('$Info') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '$Info') def testGetSecurityDescriptor(self): """Tests the GetSecurityDescriptor function.""" diff --git a/tests/vfs/tsk_data_stream.py b/tests/vfs/tsk_data_stream.py index 0f95e911..2e5bcc90 100644 --- a/tests/vfs/tsk_data_stream.py +++ b/tests/vfs/tsk_data_stream.py @@ -4,11 +4,7 @@ import unittest -from dfvfs.lib import definitions -from dfvfs.path import factory as path_spec_factory -from dfvfs.resolver import context from dfvfs.vfs import tsk_data_stream -from dfvfs.vfs import tsk_file_system from tests import test_lib as shared_test_lib @@ -16,38 +12,14 @@ class TSKDataStreamTest(shared_test_lib.BaseTestCase): """Tests the SleuthKit (TSK) data stream.""" - def setUp(self): - """Sets up the needed objects used throughout the test.""" - self._resolver_context = context.Context() - test_path = self._GetTestFilePath(['ext2.raw']) - self._SkipIfPathNotExists(test_path) - - test_os_path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_OS, location=test_path) - self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) - self._tsk_path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_TSK, location='/', - parent=self._raw_path_spec) - - self._file_system = tsk_file_system.TSKFileSystem( - self._resolver_context, self._tsk_path_spec) - self._file_system.Open() - - def tearDown(self): - """Cleans up the needed objects used throughout the test.""" - self._resolver_context.Empty() - def testName(self): """Test the name property.""" - test_data_stream = tsk_data_stream.TSKDataStream( - self._file_system, None) + test_data_stream = tsk_data_stream.TSKDataStream(None) self.assertEqual(test_data_stream.name, '') def testIsDefault(self): """Test the IsDefault function.""" - test_data_stream = tsk_data_stream.TSKDataStream( - self._file_system, None) + test_data_stream = tsk_data_stream.TSKDataStream(None) self.assertTrue(test_data_stream.IsDefault()) diff --git a/tests/vfs/tsk_file_entry.py b/tests/vfs/tsk_file_entry.py index 763d59a6..72a4afdd 100644 --- a/tests/vfs/tsk_file_entry.py +++ b/tests/vfs/tsk_file_entry.py @@ -158,7 +158,17 @@ def testGetAttributes(self): # No extended attributes are returned. # Also see: https://github.com/py4n6/pytsk/issues/79. - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -512,8 +522,7 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) @@ -569,7 +578,17 @@ def testGetAttributes(self): self.assertIsNotNone(file_entry._attributes) self.assertEqual(len(file_entry._attributes), 0) - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -891,8 +910,7 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) @@ -956,7 +974,26 @@ def testGetAttributes(self): test_attribute_value_data = test_attribute.read() self.assertEqual(test_attribute_value_data, b'My extended attribute') - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -1305,6 +1342,20 @@ def testDataStreams(self): self.assertEqual(data_stream_names, []) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 2) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, ['', 'rsrc']) + def testGetDataStream(self): """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -1313,8 +1364,16 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') + self.assertIsNotNone(data_stream) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_stream = file_entry.GetDataStream('rsrc') self.assertIsNotNone(data_stream) @@ -1368,7 +1427,26 @@ def testGetAttributes(self): self.assertEqual( test_attribute.attribute_type, pytsk3.TSK_FS_ATTR_TYPE_NTFS_SI) - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=10, location='/$UpCase', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -1560,15 +1638,14 @@ def testDataStream(self): def testGetDataStream(self): """Tests the retrieve data stream functionality.""" path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_A_FILE, - location='/a_directory/a_file', parent=self._raw_path_spec) + definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '') data_stream = file_entry.GetDataStream('bogus') self.assertIsNone(data_stream) @@ -1579,10 +1656,9 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '$Info' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('$Info') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '$Info') def testGetExtents(self): """Tests the GetExtents function."""