diff --git a/docker_explorer/de.py b/docker_explorer/de.py index a830d0c..c3fa40a 100644 --- a/docker_explorer/de.py +++ b/docker_explorer/de.py @@ -20,18 +20,13 @@ from __future__ import print_function, unicode_literals import argparse -import codecs import os -import sys from docker_explorer import errors +from docker_explorer.lib import container from docker_explorer.lib import aufs from docker_explorer.lib import overlay - -# This is to fix UnicodeEncodeError issues when python -# suddenly changes the output encoding when sys.stdout is -# piped into something else. -sys.stdout = codecs.getwriter('utf8')(sys.stdout) +from docker_explorer.lib import utils class DockerExplorer(object): @@ -47,19 +42,22 @@ class DockerExplorer(object): def __init__(self): """Initializes the ContainerInfo class.""" self._argument_parser = None - self.docker_directory = '/var/lib/docker' + self.container_config_filename = 'config.v2.json' + self.containers_directory = None + self.docker_directory = None + self.docker_version = 2 self.storage_object = None - def DetectStorage(self): - """Detects the storage backend. + def _SetDockerDirectory(self, docker_path): + """Sets the Docker main directory. - More info : - https://docs.docker.com/engine/userguide/storagedriver/ - http://jpetazzo.github.io/assets/2015-06-04-deep-dive-into-docker-storage-drivers.html#60 + Args: + docker_path(str): the absolute path to the docker directory. Raises: errors.BadStorageException: If the storage backend couldn't be detected. """ + self.docker_directory = docker_path if not os.path.isdir(self.docker_directory): err_message = ( '{0:s} is not a Docker directory\n' @@ -67,9 +65,13 @@ def DetectStorage(self): 'hint: de.py -r /var/lib/docker').format(self.docker_directory) raise errors.BadStorageException(err_message) + self.containers_directory = os.path.join( + self.docker_directory, 'containers') + if os.path.isfile( os.path.join(self.docker_directory, 'repositories-aufs')): - # Handles Docker engine storage versions 1.9 and below. + # TODO: check this agains other storages in version 1.9 and below + self.docker_version = 1 self.storage_object = aufs.AufsStorage( docker_directory=self.docker_directory, docker_version=1) elif os.path.isdir(os.path.join(self.docker_directory, 'overlay2')): @@ -105,7 +107,7 @@ def AddBasicOptions(self, argument_parser): def AddMountCommand(self, args): """Adds the mount command to the argument_parser. - args: + Args: args (argument_parser): the argument parser to add the command to. """ mount_parser = args.add_parser( @@ -164,13 +166,50 @@ def ParseArguments(self): return opts def ParseOptions(self, options): - """Parses the command line options. + """Parses the command line options.""" + self.docker_directory = os.path.abspath(options.docker_directory) + + def GetContainer(self, container_id): + """Returns a Container object given a container_id. + + Args: + container_id (str): the ID of the container. Returns: - Namespace: the populated namespace. + container.Container: the container object. """ + return container.Container( + self.docker_directory, container_id, docker_version=self.docker_version) - self.docker_directory = os.path.abspath(options.docker_directory) + def GetAllContainers(self): + """Gets a list containing information about all containers. + + Returns: + list(Container): the list of Container objects. + """ + container_ids_list = os.listdir(self.containers_directory) + if not container_ids_list: + print('Couldn\'t find any container configuration file (\'{0:s}\'). ' + 'Make sure the docker repository ({1:s}) is correct. ' + 'If it is correct, you might want to run this script' + ' with higher privileges.').format( + self.container_config_filename, self.docker_directory) + return [self.GetContainer(cid) for cid in container_ids_list] + + def GetContainersList(self, only_running=False): + """Returns a list of Container objects, sorted by start time. + + Args: + only_running (bool): Whether we return only running Containers. + + Returns: + list(Container): list of Containers information objects. + """ + containers_list = sorted( + self.GetAllContainers(), key=lambda x: x.start_timestamp) + if only_running: + containers_list = [x for x in containers_list if x.running] + return containers_list def Mount(self, container_id, mountpoint): """Mounts the specified container's filesystem. @@ -179,9 +218,40 @@ def Mount(self, container_id, mountpoint): container_id (str): the ID of the container. mountpoint (str): the path to the destination mount point. """ - if self.storage_object is None: - self.DetectStorage() - self.storage_object.Mount(container_id, mountpoint) + container_object = self.GetContainer(container_id) + self.storage_object.Mount(container_object, mountpoint) + + def GetContainersString(self, only_running=False): + """Returns a string describing the running containers. + + Args: + only_running (bool): Whether we display only running Containers. + + Returns: + str: the string displaying information about running containers. + """ + result_string = '' + for container_object in self.GetContainersList(only_running=only_running): + image_id = container_object.image_id + if self.docker_version == 2: + image_id = image_id.split(':')[1] + + if container_object.config_labels: + labels_list = ['{0:s}: {1:s}'.format(k, v) for (k, v) in + container_object.config_labels.items()] + labels_str = ', '.join(labels_list) + result_string += 'Container id: {0:s} / Labels : {1:s}\n'.format( + container_object.container_id, labels_str) + else: + result_string += 'Container id: {0:s} / No Label\n'.format( + container_object.container_id) + result_string += '\tStart date: {0:s}\n'.format( + utils.FormatDatetime(container_object.start_timestamp)) + result_string += '\tImage ID: {0:s}\n'.format(image_id) + result_string += '\tImage Name: {0:s}\n'.format( + container_object.config_image_name) + + return result_string def ShowContainers(self, only_running=False): """Displays the running containers. @@ -189,14 +259,10 @@ def ShowContainers(self, only_running=False): Args: only_running (bool): Whether we display only running Containers. """ - if self.storage_object is None: - self.DetectStorage() - print(self.storage_object.ShowContainers(only_running=only_running)) + print(self.GetContainersString(only_running=only_running)) def ShowRepositories(self): """Displays information about the images in the Docker repository.""" - if self.storage_object is None: - self.DetectStorage() print(self.storage_object.ShowRepositories()) def ShowHistory(self, container_id, show_empty_layers=False): @@ -206,8 +272,6 @@ def ShowHistory(self, container_id, show_empty_layers=False): container_id (str): the ID of the container. show_empty_layers (bool): whether to display empty layers. """ - if self.storage_object is None: - self.DetectStorage() print(self.storage_object.GetHistory(container_id, show_empty_layers)) def Main(self): @@ -221,6 +285,8 @@ def Main(self): options = self.ParseArguments() self.ParseOptions(options) + self._SetDockerDirectory(self.docker_directory) + if options.command == 'mount': self.Mount(options.container_id, options.mountpoint) diff --git a/docker_explorer/lib/aufs.py b/docker_explorer/lib/aufs.py index c213108..bbc6975 100644 --- a/docker_explorer/lib/aufs.py +++ b/docker_explorer/lib/aufs.py @@ -98,11 +98,11 @@ def GetImageInfo(self, image_id): return '{0:s}:{1:s}'.format(name, version) return 'not found' - def MakeMountCommands(self, container_id, mount_dir): + def MakeMountCommands(self, container_object, mount_dir): """Generates the required shell commands to mount a container's ID. Args: - container_id (str): the container ID to mount. + container_object (Container): the container object to mount. mount_dir (str): the path to the target mount point. Returns: @@ -113,7 +113,6 @@ def MakeMountCommands(self, container_id, mount_dir): print('Could not find /sbin/mount.aufs. Please install the aufs-tools ' 'package.') - container_object = self.GetContainer(container_id) mount_id = container_object.mount_id container_layers_filepath = os.path.join( diff --git a/docker_explorer/lib/container.py b/docker_explorer/lib/container.py index 885230d..fab26f7 100644 --- a/docker_explorer/lib/container.py +++ b/docker_explorer/lib/container.py @@ -17,6 +17,9 @@ from __future__ import print_function, unicode_literals import json +import os + +from docker_explorer import errors class Container(object): @@ -38,32 +41,60 @@ class Container(object): container. (Docker storage backend v1). """ - def __init__(self, container_id, container_info_json_path): + def __init__(self, docker_directory, container_id, docker_version=2): """Initializes the Container class. Args: + docker_directory (str): the absolute path to the Docker directory. container_id (str): the container ID. - container_info_json_path (str): the path to the JSON file containing the - container's information. + docker_version (int): (Optional) the version of the Docker storage module. + + Raises: + errors.BadContainerException: if there was an error when parsing + container_info_json_path """ - self.container_id = container_id + self.container_config_filename = 'config.v2.json' + if docker_version == 1: + self.container_config_filename = 'config.json' + + self.docker_directory = docker_directory + container_info_json_path = os.path.join( + self.docker_directory, 'containers', container_id, + self.container_config_filename) with open(container_info_json_path) as container_info_json_file: container_info_dict = json.load(container_info_json_file) + if container_info_dict is None: + raise errors.BadContainerException( + 'Could not load container configuration file {0}'.format( + container_info_json_path) + ) + + self.container_id = container_info_dict.get('ID', None) json_config = container_info_dict.get('Config', None) if json_config: self.config_image_name = json_config.get('Image', None) self.config_labels = json_config.get('Labels', None) self.creation_timestamp = container_info_dict.get('Created', None) self.image_id = container_info_dict.get('Image', None) + self.mount_id = None self.mount_points = container_info_dict.get('MountPoints', None) self.name = container_info_dict.get('Name', '') json_state = container_info_dict.get('State', None) if json_state: self.running = json_state.get('Running', False) self.start_timestamp = json_state.get('StartedAt', False) - self.storage_driver = json_config.get('Driver', None) + self.storage_driver = container_info_dict.get('Driver', None) + if self.storage_driver is None: + raise errors.BadContainerException( + '{0} container config file lacks Driver key'.format( + container_info_json_path)) self.volumes = container_info_dict.get('Volumes', None) - self.mount_id = None + if docker_version == 2: + c_path = os.path.join( + self.docker_directory, 'image', self.storage_driver, 'layerdb', + 'mounts', container_id) + with open(os.path.join(c_path, 'mount-id')) as mount_id_file: + self.mount_id = mount_id_file.read() diff --git a/docker_explorer/lib/overlay.py b/docker_explorer/lib/overlay.py index b2594c5..1433667 100644 --- a/docker_explorer/lib/overlay.py +++ b/docker_explorer/lib/overlay.py @@ -40,18 +40,17 @@ def _BuildLowerLayers(self, lower): return os.path.join( self.docker_directory, self.STORAGE_METHOD, lower.strip(), 'root') - def MakeMountCommands(self, container_id, mount_dir): + def MakeMountCommands(self, container_object, mount_dir): """Generates the required shell commands to mount a container's ID. Args: - container_id (str): the container ID to mount. + container_object (Container): the container object. mount_dir (str): the path to the target mount point. Returns: list: a list commands that needs to be run to mount the container's view of the file system. """ - container_object = self.GetContainer(container_id) mount_id_path = os.path.join( self.docker_directory, self.STORAGE_METHOD, container_object.mount_id) diff --git a/docker_explorer/lib/storage.py b/docker_explorer/lib/storage.py index 24af324..658e7c2 100644 --- a/docker_explorer/lib/storage.py +++ b/docker_explorer/lib/storage.py @@ -21,7 +21,6 @@ import subprocess import sys -from docker_explorer.lib import container from docker_explorer.lib import utils @@ -53,42 +52,20 @@ def __init__(self, docker_directory='/var/lib/docker', docker_version=2): if self.docker_version == 1: self.container_config_filename = 'config.json' - def GetAllContainers(self): - """Gets a list containing information about all containers. - - Returns: - list(Container): the list of Container objects. - """ - container_ids_list = os.listdir(self.containers_directory) - if not container_ids_list: - print('Couldn\'t find any container configuration file (\'{0:s}\'). ' - 'Make sure the docker repository ({1:s}) is correct. ' - 'If it is correct, you might want to run this script' - ' with higher privileges.').format( - self.container_config_filename, self.docker_directory) - return [self.GetContainer(x) for x in container_ids_list] - - def GetOrderedLayers(self, container_obj): + def GetOrderedLayers(self, container_object): """Returns an array of the sorted image ID for a container. Args: - container_obj(Container): the container object. + container_object(Container): the container object. Returns: list(str): a list of layer IDs (hashes). """ layer_list = [] - current_layer = container_obj.container_id + current_layer = container_object.container_id layer_path = os.path.join(self.docker_directory, 'graph', current_layer) if not os.path.isdir(layer_path): - config_file_path = os.path.join( - self.containers_directory, current_layer, - self.container_config_filename) - if not os.path.isfile(config_file_path): - return [] - with open(config_file_path) as config_file: - json_dict = json.load(config_file) - current_layer = json_dict.get('Image', None) + current_layer = container_object.image_id while current_layer is not None: layer_list.append(current_layer) @@ -110,44 +87,6 @@ def GetOrderedLayers(self, container_obj): return layer_list - def GetContainer(self, container_id): - """Returns a Container object given a container_id. - - Args: - container_id (str): the ID of the container. - - Returns: - Container: the container's info. - """ - container_info_json_path = os.path.join( - self.containers_directory, container_id, self.container_config_filename) - if os.path.isfile(container_info_json_path): - container_obj = container.Container( - container_id, container_info_json_path) - - if self.docker_version == 2: - c_path = os.path.join( - self.docker_directory, 'image', self.STORAGE_METHOD, 'layerdb', - 'mounts', container_id) - with open(os.path.join(c_path, 'mount-id')) as mount_id_file: - container_obj.mount_id = mount_id_file.read() - - return container_obj - - def GetContainersList(self, only_running=False): - """Returns a list of container ids which were running. - - Args: - only_running (bool): Whether we return only running Containers. - Returns: - list(Container): list of Containers information objects. - """ - containers_info_list = sorted( - self.GetAllContainers(), key=lambda x: x.start_timestamp) - if only_running: - containers_info_list = [x for x in containers_info_list if x.running] - return containers_info_list - def ShowRepositories(self): """Returns information about the images in the Docker repository. @@ -166,36 +105,6 @@ def ShowRepositories(self): repositories_string = rf.read() return result_string + utils.PrettyPrintJSON(repositories_string) - def ShowContainers(self, only_running=False): - """Returns a string describing the running containers. - - Args: - only_running (bool): Whether we display only running Containers. - Returns: - str: the string displaying information about running containers. - """ - result_string = '' - for container_obj in self.GetContainersList(only_running=only_running): - image_id = container_obj.image_id - if self.docker_version == 2: - image_id = image_id.split(':')[1] - - if container_obj.config_labels: - labels_list = ['{0:s}: {1:s}'.format(k, v) for (k, v) in - container_obj.config_labels.items()] - labels_str = ', '.join(labels_list) - result_string += 'Container id: {0:s} / Labels : {1:s}\n'.format( - container_obj.container_id, labels_str) - else: - result_string += 'Container id: {0:s} / No Label\n'.format( - container_obj.container_id) - result_string += '\tStart date: {0:s}\n'.format( - utils.FormatDatetime(container_obj.start_timestamp)) - result_string += '\tImage ID: {0:s}\n'.format(image_id) - result_string += '\tImage Name: {0:s}\n'.format( - container_obj.config_image_name) - - return result_string def GetLayerSize(self, container_id): """Returns the size of the layer. @@ -238,11 +147,11 @@ def GetLayerInfo(self, container_id): return layer_info return None - def _MakeExtraVolumeCommands(self, container_obj, mount_dir): + def _MakeExtraVolumeCommands(self, container_object, mount_dir): """Generates the shell command to mount external Volumes if present. Args: - container_obj (Container): the container object. + container_object (Container): the container object. mount_dir (str): the destination mount_point. Returns: @@ -252,7 +161,7 @@ def _MakeExtraVolumeCommands(self, container_obj, mount_dir): extra_commands = [] if self.docker_version == 1: # 'Volumes' - container_volumes = container_obj.volumes + container_volumes = container_object.volumes if container_volumes: for mountpoint, storage in container_volumes.items(): mountpoint_ihp = mountpoint.lstrip(os.path.sep) @@ -263,7 +172,7 @@ def _MakeExtraVolumeCommands(self, container_obj, mount_dir): storage_path, volume_mountpoint)) elif self.docker_version == 2: # 'MountPoints' - container_mount_points = container_obj.mount_points + container_mount_points = container_object.mount_points if container_mount_points: for _, storage_info in container_mount_points.items(): src_mount_ihp = storage_info['Source'] @@ -280,37 +189,37 @@ def _MakeExtraVolumeCommands(self, container_obj, mount_dir): return extra_commands - def Mount(self, container_id, mount_dir): + def Mount(self, container_object, mount_dir): """Mounts the specified container's filesystem. Args: - container_id (str): the ID of the container. + container_object (Container): the container. mount_dir (str): the path to the destination mount point """ - commands = self.MakeMountCommands(container_id, mount_dir) + commands = self.MakeMountCommands(container_object, mount_dir) for c in commands: print(c) print('Do you want to mount this container Id: {0:s} on {1:s} ?\n' - '(ie: run these commands) [Y/n]').format(container_id, mount_dir) + '(ie: run these commands) [Y/n]'.format( + container_object.container_id, mount_dir)) choice = raw_input().lower() if not choice or choice == 'y' or choice == 'yes': for c in commands: # TODO(romaing) this is quite unsafe, need to properly split args subprocess.call(c, shell=True) - def GetHistory(self, container_obj, show_empty_layers=False): + def GetHistory(self, container_object, show_empty_layers=False): """Returns a string representing the modification history of a container. Args: - container_obj (Container): the container object. + container_object (Container): the container object. show_empty_layers (bool): whether to display empty layers. Returns: str: the human readable history. """ - # TODO(romaing): Find a container_id from only the first few characters. history_str = '' - for layer in self.GetOrderedLayers(container_obj): + for layer in self.GetOrderedLayers(container_object): layer_info = self.GetLayerInfo(layer) if layer is None: raise ValueError('Layer {0:s} does not exist'.format(layer)) diff --git a/tests.py b/tests.py index 9199ff7..882f3ee 100644 --- a/tests.py +++ b/tests.py @@ -57,7 +57,7 @@ class TestDEMain(unittest.TestCase): def testParseArguments(self): """Tests the DockerExplorer.ParseArguments function.""" - de_test_object = de.DockerExplorer() + de_object = de.DockerExplorer() prog = sys.argv[0] @@ -66,69 +66,74 @@ def testParseArguments(self): args = [prog, '-r', expected_docker_root, 'list', 'repositories'] sys.argv = args - options = de_test_object.ParseArguments() - usage_string = de_test_object._argument_parser.format_usage() + options = de_object.ParseArguments() + usage_string = de_object._argument_parser.format_usage() expected_usage = '[-h] [-r DOCKER_DIRECTORY] {mount,list,history} ...\n' self.assertTrue(expected_usage in usage_string) - de_test_object.ParseOptions(options) + de_object.ParseOptions(options) self.assertEqual(expected_docker_root, options.docker_directory) + def testDetectStorageFail(self): + """Tests that the DockerExplorer.DetectStorage function fails on + non-existing Docker directory.""" + de_object = de.DockerExplorer() + de_object.docker_directory = 'this_dir_shouldnt_exist' -class TestAufsStorage(unittest.TestCase): - """Tests methods in the Storage object.""" + expected_error_message = ( + 'this_dir_shouldnt_exist is not a Docker directory\n' + 'Please specify the Docker\'s directory path.\n' + 'hint: de.py -r /var/lib/docker') + + with self.assertRaises(errors.BadStorageException) as err: + de_object._SetDockerDirectory('this_dir_shouldnt_exist') + self.assertEqual(expected_error_message, err.exception.message) + + +class StorageTestCase(unittest.TestCase): + """Base class for tests of different Storage implementations.""" @classmethod - def setUpClass(cls): + def tearDownClass(cls): + shutil.rmtree(os.path.join('test_data', 'docker')) + + @classmethod + def _setup(cls, driver, driver_class): + """Internal method to set up the TestCase on a specific storate.""" + cls.driver = driver docker_directory_path = os.path.join('test_data', 'docker') if not os.path.isdir(docker_directory_path): - docker_tar = os.path.join('test_data', 'aufs.tgz') + docker_tar = os.path.join('test_data', cls.driver+'.tgz') tar = tarfile.open(docker_tar, 'r:gz') tar.extractall('test_data') tar.close() + cls.de_object = de.DockerExplorer() + cls.de_object._SetDockerDirectory(docker_directory_path) - de_test_object = de.DockerExplorer() - de_test_object.docker_directory = docker_directory_path - de_test_object.DetectStorage() - cls.storage = de_test_object.storage_object - cls.container_id = ( - '7b02fb3e8a665a63e32b909af5babb7d6ba0b64e10003b2d9534c7d5f2af8966') - cls.container = cls.storage.GetContainer(cls.container_id) - cls.image_id = ( - '7968321274dc6b6171697c33df7815310468e694ac5be0ec03ff053bb135e768') - - @classmethod - def tearDownClass(cls): - shutil.rmtree(os.path.join('test_data', 'docker')) + cls.driver_class = driver_class def testDetectStorage(self): - """Tests the DockerExplorer.DetectStorage function in a AUFS storage.""" - de_test_object = de.DockerExplorer() - de_test_object.docker_directory = 'this_dir_shouldnt_exist' - - expected_error_message = ( - 'this_dir_shouldnt_exist is not a Docker directory\n' - 'Please specify the Docker\'s directory path.\n' - 'hint: de.py -r /var/lib/docker') - - with self.assertRaises(errors.BadStorageException) as err: - de_test_object.DetectStorage() - self.assertEqual(expected_error_message, err.exception.message) - - de_test_object.docker_directory = os.path.join('test_data', 'docker') - de_test_object.DetectStorage() - storage_object = de_test_object.storage_object + """Tests the DockerExplorer.DetectStorage function.""" + storage_object = self.de_object.storage_object self.assertIsNotNone(storage_object) - self.assertIsInstance(storage_object, aufs.AufsStorage) - self.assertEqual(storage_object.STORAGE_METHOD, 'aufs') + self.assertIsInstance(storage_object, self.driver_class) + self.assertEqual(storage_object.STORAGE_METHOD, self.driver) self.assertEqual(2, storage_object.docker_version) self.assertEqual('config.v2.json', - storage_object.container_config_filename) + self.de_object.container_config_filename) + + +class TestAufsStorage(StorageTestCase): + """Tests methods in the Storage object.""" + + @classmethod + def setUpClass(cls): + cls._setup('aufs', aufs.AufsStorage) def testGetAllContainers(self): """Tests the GetAllContainers function on a AuFS storage.""" - containers_list = self.storage.GetAllContainers() + containers_list = self.de_object.GetAllContainers() containers_list = sorted(containers_list, key=lambda ci: ci.name) self.assertEqual(7, len(containers_list)) @@ -140,18 +145,25 @@ def testGetAllContainers(self): self.assertEqual('busybox', container_obj.config_image_name) self.assertTrue(container_obj.running) - container_id = container_obj.container_id - self.assertTrue(self.container_id, container_id) + expected_container_id = ( + '7b02fb3e8a665a63e32b909af5babb7d6ba0b64e10003b2d9534c7d5f2af8966') + self.assertEqual(expected_container_id, container_obj.container_id) def testGetOrderedLayers(self): """Tests the Storage.GetOrderedLayers function on a AUFS storage.""" - layers = self.storage.GetOrderedLayers(self.container) + container_id = ( + '7b02fb3e8a665a63e32b909af5babb7d6ba0b64e10003b2d9534c7d5f2af8966') + container_obj = self.de_object.GetContainer(container_id) + layers = self.de_object.storage_object.GetOrderedLayers(container_obj) self.assertEqual(1, len(layers)) - self.assertEqual('sha256:{0:s}'.format(self.image_id), layers[0]) + self.assertEqual( + 'sha256:' + '7968321274dc6b6171697c33df7815310468e694ac5be0ec03ff053bb135e768', + layers[0]) def testGetRunningContainersList(self): """Tests the Storage.GetContainersList function on a AUFS storage.""" - running_containers = self.storage.GetContainersList(only_running=True) + running_containers = self.de_object.GetContainersList(only_running=True) running_containers = sorted( running_containers, key=lambda ci: ci.container_id) self.assertEqual(1, len(running_containers)) @@ -162,9 +174,9 @@ def testGetRunningContainersList(self): self.assertEqual('busybox', container.config_image_name) self.assertTrue(container.running) - def testShowContainers(self): - """Tests the Storage.ShowContainers function on a AUFS storage.""" - result_string = self.storage.ShowContainers(only_running=True) + def testGetContainersString(self): + """Tests the GetContainersString function on a AUFS storage.""" + result_string = self.de_object.GetContainersString(only_running=True) expected_string = ( 'Container id: ' '7b02fb3e8a665a63e32b909af5babb7d6ba0b64e10003b2d9534c7d5f2af8966 ' @@ -177,15 +189,16 @@ def testShowContainers(self): def testGetLayerInfo(self): """Tests the Storage.GetLayerInfo function on a AUFS storage.""" - layer_info = self.storage.GetLayerInfo( - 'sha256:{0:s}'.format(self.image_id)) + layer_info = self.de_object.storage_object.GetLayerInfo( + 'sha256:' + '7968321274dc6b6171697c33df7815310468e694ac5be0ec03ff053bb135e768') self.assertEqual('2017-01-13T22:13:54.401355854Z', layer_info['created']) self.assertEqual(['/bin/sh', '-c', '#(nop) ', 'CMD ["sh"]'], layer_info['container_config']['Cmd']) def testShowRepositories(self): """Tests the Storage.ShowRepositories function on a AUFS storage.""" - result_string = self.storage.ShowRepositories() + result_string = self.de_object.storage_object.ShowRepositories() expected_string = ( 'Listing repositories from file ' 'test_data/docker/image/aufs/repositories.json{\n' @@ -201,7 +214,11 @@ def testShowRepositories(self): def testMakeMountCommands(self): """Tests the Storage.MakeMountCommands function on a AUFS storage.""" - commands = self.storage.MakeMountCommands(self.container_id, '/mnt') + container_id = ( + '7b02fb3e8a665a63e32b909af5babb7d6ba0b64e10003b2d9534c7d5f2af8966') + container_obj = self.de_object.GetContainer(container_id) + commands = self.de_object.storage_object.MakeMountCommands( + container_obj, '/mnt') expected_commands = [ ('mount -t aufs -o ro,br=test_data/docker/aufs/diff/test_data/docker/' 'aufs/diff/' @@ -222,69 +239,30 @@ def testMakeMountCommands(self): def testGetHistory(self): """Tests the Storage.GetHistory function on a AUFS storage.""" self.maxDiff = None + container_id = ( + '7b02fb3e8a665a63e32b909af5babb7d6ba0b64e10003b2d9534c7d5f2af8966') + container_obj = self.de_object.GetContainer(container_id) expected_string = ( '-------------------------------------------------------\n' 'sha256:' '7968321274dc6b6171697c33df7815310468e694ac5be0ec03ff053bb135e768\n' '\tsize : 0\tcreated at : 2017-01-13T22:13:54.401355\t' 'with command : /bin/sh -c #(nop) CMD ["sh"]') - self.assertEqual(expected_string, self.storage.GetHistory(self.container)) + self.assertEqual( + expected_string, + self.de_object.storage_object.GetHistory(container_obj)) -class TestOverlayStorage(unittest.TestCase): +class TestOverlayStorage(StorageTestCase): """Tests methods in the OverlayStorage object.""" @classmethod def setUpClass(cls): - docker_directory_path = os.path.join('test_data', 'docker') - if not os.path.isdir(docker_directory_path): - docker_tar = os.path.join('test_data', 'overlay.tgz') - tar = tarfile.open(docker_tar, 'r:gz') - tar.extractall('test_data') - tar.close() - - de_test_object = de.DockerExplorer() - de_test_object.docker_directory = docker_directory_path - de_test_object.DetectStorage() - cls.storage = de_test_object.storage_object - cls.container_id = ( - '5dc287aa80b460652a5584e80a5c8c1233b0c0691972d75424cf5250b917600a') - cls.container = cls.storage.GetContainer(cls.container_id) - cls.image_id = ( - '5b0d59026729b68570d99bc4f3f7c31a2e4f2a5736435641565d93e7c25bd2c3') - - @classmethod - def tearDownClass(cls): - shutil.rmtree(os.path.join('test_data', 'docker')) - - def testDetectStorage(self): - """Tests the DockerExplorer.DetectStorage function on a Overlay storage.""" - de_test_object = de.DockerExplorer() - - de_test_object.docker_directory = 'this_dir_shouldnt_exist' - expected_error_message = ( - 'this_dir_shouldnt_exist is not a Docker directory\n' - 'Please specify the Docker\'s directory path.\n' - 'hint: de.py -r /var/lib/docker') - - with self.assertRaises(errors.BadStorageException) as err: - de_test_object.DetectStorage() - self.assertEqual(expected_error_message, err.exception.message) - - de_test_object.docker_directory = os.path.join('test_data', 'docker') - de_test_object.DetectStorage() - storage_object = de_test_object.storage_object - self.assertIsNotNone(storage_object) - self.assertIsInstance(storage_object, overlay.OverlayStorage) - self.assertEqual(storage_object.STORAGE_METHOD, 'overlay') - - self.assertEqual(2, storage_object.docker_version) - self.assertEqual('config.v2.json', - storage_object.container_config_filename) + cls._setup('overlay', overlay.OverlayStorage) def testGetAllContainers(self): """Tests the GetAllContainers function on a Overlay storage.""" - containers_list = self.storage.GetAllContainers() + containers_list = self.de_object.GetAllContainers() containers_list = sorted(containers_list, key=lambda ci: ci.name) self.assertEqual(6, len(containers_list)) @@ -296,18 +274,25 @@ def testGetAllContainers(self): self.assertEqual('busybox:latest', container_obj.config_image_name) self.assertTrue(container_obj.running) - container_id = container_obj.container_id - self.assertTrue(self.container_id, container_id) + expected_container_id = ( + '5dc287aa80b460652a5584e80a5c8c1233b0c0691972d75424cf5250b917600a') + self.assertEqual(expected_container_id, container_obj.container_id) def testGetOrderedLayers(self): """Tests the Storage.GetOrderedLayers function on a Overlay storage.""" - layers = self.storage.GetOrderedLayers(self.container) + container_id = ( + '5dc287aa80b460652a5584e80a5c8c1233b0c0691972d75424cf5250b917600a') + container_obj = self.de_object.GetContainer(container_id) + layers = self.de_object.storage_object.GetOrderedLayers(container_obj) self.assertEqual(1, len(layers)) - self.assertEqual('sha256:{0:s}'.format(self.image_id), layers[0]) + self.assertEqual( + 'sha256:' + '5b0d59026729b68570d99bc4f3f7c31a2e4f2a5736435641565d93e7c25bd2c3', + layers[0]) def testGetRunningContainersList(self): """Tests the Storage.GetContainersList function on a Overlay storage.""" - running_containers = self.storage.GetContainersList(only_running=True) + running_containers = self.de_object.GetContainersList(only_running=True) running_containers = sorted( running_containers, key=lambda ci: ci.container_id) self.assertEqual(1, len(running_containers)) @@ -319,9 +304,9 @@ def testGetRunningContainersList(self): self.assertTrue(container.running) - def testShowContainers(self): - """Tests the Storage.ShowContainers function on a Overlay storage.""" - result_string = self.storage.ShowContainers(only_running=True) + def testGetContainersString(self): + """Tests the GetContainersString function on a Overlay storage.""" + result_string = self.de_object.GetContainersString(only_running=True) expected_string = ( 'Container id: ' '5dc287aa80b460652a5584e80a5c8c1233b0c0691972d75424cf5250b917600a ' @@ -334,15 +319,16 @@ def testShowContainers(self): def testGetLayerInfo(self): """Tests the Storage.GetLayerInfo function on a Overlay storage.""" - layer_info = self.storage.GetLayerInfo( - 'sha256:{0:s}'.format(self.image_id)) + layer_info = self.de_object.storage_object.GetLayerInfo( + 'sha256:' + '5b0d59026729b68570d99bc4f3f7c31a2e4f2a5736435641565d93e7c25bd2c3') self.assertEqual('2018-01-24T04:29:35.590938514Z', layer_info['created']) self.assertEqual(['/bin/sh', '-c', '#(nop) ', 'CMD ["sh"]'], layer_info['container_config']['Cmd']) def testShowRepositories(self): - """Tests the Storage.GetLayerInfo function on a Overlay storage.""" - result_string = self.storage.ShowRepositories() + """Tests the Storage.ShowRepositories function on a Overlay storage.""" + result_string = self.de_object.storage_object.ShowRepositories() self.maxDiff = None expected_string = ( 'Listing repositories from file ' @@ -362,7 +348,11 @@ def testShowRepositories(self): def testMakeMountCommands(self): """Tests the Storage.MakeMountCommands function on a Overlay storage.""" - commands = self.storage.MakeMountCommands(self.container_id, '/mnt') + container_id = ( + '5dc287aa80b460652a5584e80a5c8c1233b0c0691972d75424cf5250b917600a') + container_obj = self.de_object.GetContainer(container_id) + commands = self.de_object.storage_object.MakeMountCommands( + container_obj, '/mnt') expected_commands = [( 'mount -t overlay overlay -o ro,lowerdir=' '"test_data/docker/overlay/a94d714512251b0d8a9bfaacb832e0c6cb70f71cb71' @@ -376,6 +366,9 @@ def testMakeMountCommands(self): def testGetHistory(self): """Tests the Storage.GetHistory function on a Overlay storage.""" self.maxDiff = None + container_id = ( + '5dc287aa80b460652a5584e80a5c8c1233b0c0691972d75424cf5250b917600a') + container_obj = self.de_object.GetContainer(container_id) expected_string = ( '-------------------------------------------------------\n' 'sha256:' @@ -383,63 +376,20 @@ def testGetHistory(self): '\tsize : 0\tcreated at : 2018-01-24T04:29:35.590938\t' 'with command : /bin/sh -c #(nop) CMD ["sh"]') self.assertEqual( - expected_string, self.storage.GetHistory(self.container)) + expected_string, + self.de_object.storage_object.GetHistory(container_obj)) -class TestOverlay2Storage(unittest.TestCase): +class TestOverlay2Storage(StorageTestCase): """Tests methods in the Overlay2Storage object.""" @classmethod def setUpClass(cls): - docker_directory_path = os.path.join('test_data', 'docker') - if not os.path.isdir(docker_directory_path): - docker_tar = os.path.join('test_data', 'overlay2.tgz') - tar = tarfile.open(docker_tar, 'r:gz') - tar.extractall('test_data') - tar.close() - - de_test_object = de.DockerExplorer() - de_test_object.docker_directory = docker_directory_path - de_test_object.DetectStorage() - cls.storage = de_test_object.storage_object - cls.container_id = ( - '8e8b7f23eb7cbd4dfe7e91646ddd0e0f524218e25d50113559f078dfb2690206') - cls.container = cls.storage.GetContainer(cls.container_id) - cls.image_id = ( - '8ac48589692a53a9b8c2d1ceaa6b402665aa7fe667ba51ccc03002300856d8c7') - - @classmethod - def tearDownClass(cls): - shutil.rmtree(os.path.join('test_data', 'docker')) - - def testDetectStorage(self): - """Tests the DockerExplorer.DetectStorage function on a Overlay2 storage.""" - de_test_object = de.DockerExplorer() - de_test_object.docker_directory = 'this_dir_shouldnt_exist' - - expected_error_message = ( - 'this_dir_shouldnt_exist is not a Docker directory\n' - 'Please specify the Docker\'s directory path.\n' - 'hint: de.py -r /var/lib/docker') - - with self.assertRaises(errors.BadStorageException) as err: - de_test_object.DetectStorage() - self.assertEqual(expected_error_message, err.exception.message) - - de_test_object.docker_directory = os.path.join('test_data', 'docker') - de_test_object.DetectStorage() - storage_object = de_test_object.storage_object - self.assertIsNotNone(storage_object) - self.assertIsInstance(storage_object, overlay.OverlayStorage) - self.assertEqual(storage_object.STORAGE_METHOD, 'overlay2') - - self.assertEqual(2, storage_object.docker_version) - self.assertEqual('config.v2.json', - storage_object.container_config_filename) + cls._setup('overlay2', overlay.Overlay2Storage) def testGetAllContainers(self): """Tests the GetAllContainers function on a Overlay2 storage.""" - containers_list = self.storage.GetAllContainers() + containers_list = self.de_object.GetAllContainers() containers_list = sorted(containers_list, key=lambda ci: ci.name) self.assertEqual(5, len(containers_list)) @@ -450,18 +400,26 @@ def testGetAllContainers(self): container_obj.creation_timestamp) self.assertEqual('busybox', container_obj.config_image_name) self.assertTrue(container_obj.running) - container_id = container_obj.container_id - self.assertTrue(self.container_id, container_id) + + expected_container_id = ( + '8e8b7f23eb7cbd4dfe7e91646ddd0e0f524218e25d50113559f078dfb2690206') + self.assertEqual(expected_container_id, container_obj.container_id) def testGetOrderedLayers(self): """Tests the Storage.GetOrderedLayers function on a Overlay2 storage.""" - layers = self.storage.GetOrderedLayers(self.container) + container_id = ( + '8e8b7f23eb7cbd4dfe7e91646ddd0e0f524218e25d50113559f078dfb2690206') + container_obj = self.de_object.GetContainer(container_id) + layers = self.de_object.storage_object.GetOrderedLayers(container_obj) self.assertEqual(1, len(layers)) - self.assertEqual('sha256:{0:s}'.format(self.image_id), layers[0]) + self.assertEqual( + 'sha256:' + '8ac48589692a53a9b8c2d1ceaa6b402665aa7fe667ba51ccc03002300856d8c7', + layers[0]) def testGetRunningContainersList(self): """Tests the Storage.GetContainersList function on a Overlay2 storage.""" - running_containers = self.storage.GetContainersList(only_running=True) + running_containers = self.de_object.GetContainersList(only_running=True) running_containers = sorted( running_containers, key=lambda ci: ci.container_id) self.assertEqual(1, len(running_containers)) @@ -473,9 +431,9 @@ def testGetRunningContainersList(self): self.assertTrue(container.running) - def testShowContainers(self): - """Tests the Storage.ShowContainers function on a Overlay2 storage.""" - result_string = self.storage.ShowContainers(only_running=True) + def testGetContainersString(self): + """Tests the GetContainersString function on a Overlay2 storage.""" + result_string = self.de_object.GetContainersString(only_running=True) expected_string = ( 'Container id: ' '8e8b7f23eb7cbd4dfe7e91646ddd0e0f524218e25d50113559f078dfb2690206 ' @@ -488,15 +446,16 @@ def testShowContainers(self): def testGetLayerInfo(self): """Tests the Storage.GetLayerInfo function on a Overlay2 storage.""" - layer_info = self.storage.GetLayerInfo( - 'sha256:{0:s}'.format(self.image_id)) + layer_info = self.de_object.storage_object.GetLayerInfo( + 'sha256:' + '8ac48589692a53a9b8c2d1ceaa6b402665aa7fe667ba51ccc03002300856d8c7') self.assertEqual('2018-04-05T10:41:28.876407948Z', layer_info['created']) self.assertEqual(['/bin/sh', '-c', '#(nop) ', 'CMD ["sh"]'], layer_info['container_config']['Cmd']) def testShowRepositories(self): """Tests the Storage.ShowRepositories function on a Overlay2 storage.""" - result_string = self.storage.ShowRepositories() + result_string = self.de_object.storage_object.ShowRepositories() self.maxDiff = None expected_string = ( 'Listing repositories from file ' @@ -517,7 +476,11 @@ def testShowRepositories(self): def testMakeMountCommands(self): """Tests the Storage.MakeMountCommands function on a Overlay2 storage.""" self.maxDiff = None - commands = self.storage.MakeMountCommands(self.container_id, '/mnt') + container_id = ( + '8e8b7f23eb7cbd4dfe7e91646ddd0e0f524218e25d50113559f078dfb2690206') + container_obj = self.de_object.GetContainer(container_id) + commands = self.de_object.storage_object.MakeMountCommands( + container_obj, '/mnt') expected_commands = [( 'mount -t overlay overlay -o ro,lowerdir=' '"test_data/docker/overlay2/l/OTFSLJCXWCECIG6FVNGRTWUZ7D:' @@ -533,6 +496,9 @@ def testMakeMountCommands(self): def testGetHistory(self): """Tests the Storage.GetHistory function on a Overlay2 storage.""" self.maxDiff = None + container_id = ( + '8e8b7f23eb7cbd4dfe7e91646ddd0e0f524218e25d50113559f078dfb2690206') + container_obj = self.de_object.GetContainer(container_id) expected_string = ( '-------------------------------------------------------\n' 'sha256:' @@ -540,7 +506,10 @@ def testGetHistory(self): '\tsize : 0\tcreated at : 2018-04-05T10:41:28.876407\t' 'with command : /bin/sh -c #(nop) CMD ["sh"]') self.assertEqual( - expected_string, self.storage.GetHistory(self.container)) + expected_string, + self.de_object.storage_object.GetHistory(container_obj)) + +del StorageTestCase if __name__ == '__main__': unittest.main()