diff --git a/maltego_trx/decorator_registry.py b/maltego_trx/decorator_registry.py index 45eda28..2a564da 100644 --- a/maltego_trx/decorator_registry.py +++ b/maltego_trx/decorator_registry.py @@ -22,11 +22,18 @@ serialize_xml, ) -TRANSFORMS_CSV_HEADER = ( +LEGACY_TRANSFORMS_CSV_HEADER = ( "Owner,Author,Disclaimer,Description,Version," "Name,UIName,URL,entityName," "oAuthSettingId,transformSettingIDs,seedIDs" ) + +TRANSFORMS_CSV_HEADER = ( + "Owner,Author,Disclaimer,Description,Version," + "Name,UIName,URL,entityName," + "oAuthSettingId,transformSettingIDs,seedIDs,outputEntities" +) + SETTINGS_CSV_HEADER = "Name,Type,Display,DefaultValue,Optional,Popup" @@ -127,7 +134,7 @@ def decorated(transform_callable: object): return decorated - def _create_transforms_config(self) -> Iterable[str]: + def _create_transforms_config(self, include_output_entities: bool = True) -> Iterable[str]: global_settings_full_names = [gs.id for gs in self.global_settings] for transform_name, transform_meta in self.transform_metas.items(): @@ -149,22 +156,30 @@ def _create_transforms_config(self) -> Iterable[str]: ";".join(self.oauth_settings_id), # combine global and transform scoped settings ";".join(chain(meta_settings, global_settings_full_names)), - ";".join(self.seed_ids), + ";".join(self.seed_ids) ] + if include_output_entities: + transform_row.append(";".join(transform_meta.output_entities)) + escaped_fields = escape_csv_fields(*transform_row) yield ",".join(escaped_fields) def write_transforms_config( - self, config_path: str = "./transforms.csv", csv_line_limit: int = 100 + self, config_path: str = "./transforms.csv", csv_line_limit: int = 100, include_output_entities: bool = True ): """Exports the collected transform metadata as a csv-file to config_path""" - csv_lines = self._create_transforms_config() + csv_lines = self._create_transforms_config(include_output_entities=include_output_entities) - export_as_csv( - TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit - ) + if not include_output_entities: + export_as_csv( + LEGACY_TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit + ) + else: + export_as_csv( + TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit + ) def _create_settings_config(self) -> Iterable[str]: chained_settings = chain( diff --git a/maltego_trx/template_dir/project.py b/maltego_trx/template_dir/project.py index 1d0e096..1d3cb59 100644 --- a/maltego_trx/template_dir/project.py +++ b/maltego_trx/template_dir/project.py @@ -8,7 +8,7 @@ register_transform_classes(transforms) -registry.write_transforms_config() +registry.write_transforms_config(include_output_entities=True) registry.write_settings_config() if __name__ == '__main__': diff --git a/tests/test_decorator_registry.py b/tests/test_decorator_registry.py index e562ec6..af75e61 100644 --- a/tests/test_decorator_registry.py +++ b/tests/test_decorator_registry.py @@ -13,6 +13,7 @@ TransformSetting, TransformRegistry, TRANSFORMS_CSV_HEADER, + LEGACY_TRANSFORMS_CSV_HEADER, SETTINGS_CSV_HEADER, TransformSet, ) @@ -120,7 +121,21 @@ class TransformCsvLine(NamedTuple): oauth_id: str settings_ids: str seed_ids: str + output_entities: str +class LegacyTransformCsvLine(NamedTuple): + owner: str + author: str + disclaimer: str + description: str + version: str + name: str + display_name: str + host: str + input_entity: str + oauth_id: str + settings_ids: str + seed_ids: str class SettingCsvLine(NamedTuple): name: str @@ -139,9 +154,10 @@ def test_transform_to_csv(registry: TransformRegistry): tx_meta = registry.transform_metas.get(path_name) tx_settings = registry.transform_settings.get(path_name, []) - registry.write_transforms_config() + output_path = "./transforms_with_output_entities.csv" + registry.write_transforms_config(config_path=output_path,include_output_entities=True) - with open("./transforms.csv") as transforms_csv: + with open(output_path) as transforms_csv: header = next(transforms_csv) assert header.rstrip("\n") == TRANSFORMS_CSV_HEADER @@ -160,7 +176,38 @@ def test_transform_to_csv(registry: TransformRegistry): assert data.oauth_id == registry.oauth_settings_id assert data.settings_ids.split(";") == [s.id for s in tx_settings] assert data.seed_ids.split(";") == registry.seed_ids + assert data.output_entities.split(";") == tx_meta.output_entities + +def test_transform_to_legacy_csv(registry: TransformRegistry): + random_class = make_transform(registry) + + path_name = name_to_path(random_class.__name__) + + tx_meta = registry.transform_metas.get(path_name) + tx_settings = registry.transform_settings.get(path_name, []) + + output_path = "./transforms_no_output_entities.csv" + registry.write_transforms_config(config_path=output_path,include_output_entities=False) + with open(output_path) as transforms_csv: + header = next(transforms_csv) + assert header.rstrip("\n") == LEGACY_TRANSFORMS_CSV_HEADER + + line = next(transforms_csv).rstrip("\n") + data: LegacyTransformCsvLine = LegacyTransformCsvLine(*line.split(",")) + + assert data.owner == registry.owner + assert data.author == registry.author + assert data.disclaimer == tx_meta.disclaimer + assert data.description == tx_meta.description + assert data.version == registry.version + assert data.name == tx_meta.class_name + assert data.display_name == tx_meta.display_name + assert data.host == os.path.join(registry.host_url, "run", path_name) + assert data.input_entity == tx_meta.input_entity + assert data.oauth_id == registry.oauth_settings_id + assert data.settings_ids.split(";") == [s.id for s in tx_settings] + assert data.seed_ids.split(";") == registry.seed_ids def test_setting_to_csv(registry: TransformRegistry): local_setting = make_transform_setting(global_setting=False)