Skip to content

Commit

Permalink
pr validations
Browse files Browse the repository at this point in the history
  • Loading branch information
lanasalameh1 committed Jul 24, 2023
1 parent 8c49084 commit aee9b80
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 83 deletions.
38 changes: 36 additions & 2 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
[flake8]
ignore = E203, E266, E501, W503
ignore =
# Black formatter prefers whitespaces that this flake8 rule rejects
E203,
# Black formatter is responsible for lines length
E501,
# Line breaks before binary operator became best practice: https://www.flake8rules.com/rules/W503.html
W503,
# Too many leading '#' for block comment: sometimes we command out code that has comments, so we want to allow multiple comment markers
E266,
# We have complex functions we wish to keep as is for now
C901,
max-line-length = 150
max-complexity = 18
select = B,C,E,F,W,T4
select = B,C,E,F,W,T4 # Good recommended defaults
extend-select = # Plugins
# builtins plugin: https://github.com/gforcada/flake8-builtins
A0,
# comprehensions plugin: https://github.com/adamchainz/flake8-comprehensions
C4,
# mutable plugin: https://github.com/ebeweber/flake8-mutable/blob/master/mutable_defaults.py
M511,
# naming plugin: https://github.com/PyCQA/pep8-naming
N8,
# pytest-style plugin: https://github.com/m-burst/flake8-pytest-style
PT,
# simplify plugin: https://github.com/MartinThoma/flake8-simplify
SIM
exclude =
# No need to traverse our git directory
.git,
# No need to scan Github files
.github,
# There's no value in checking cache directories
__pycache__,
# No need to scan virtual environment files
.venv,
# No need to scan MyPy files
.mypy_cache
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@


class ExflirtateFileCommandArgs(CommandArguments[ExflirtateFileArgsProperties]):
command_to_run: Literal[
CommandToRunEnum.EXFILTRATION
] = CommandToRunEnum.EXFILTRATION
command_to_run: Literal[CommandToRunEnum.EXFILTRATION] = CommandToRunEnum.EXFILTRATION
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@


class StealCookieArgsProperties(CommandArgsPropertiesBaseModel):
steal_cookie_fqdn: str = Field(
help="fully qualified domain name to fetch the cookies of"
)
steal_cookie_fqdn: str = Field(help="fully qualified domain name to fetch the cookies of")
4 changes: 1 addition & 3 deletions src/powerpwn/machinepwn/models/steal_cookie_command_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@


class StealCookieCommandArgs(CommandArguments[StealCookieArgsProperties]):
command_to_run: Literal[
CommandToRunEnum.STEAL_COOKIE
] = CommandToRunEnum.STEAL_COOKIE
command_to_run: Literal[CommandToRunEnum.STEAL_COOKIE] = CommandToRunEnum.STEAL_COOKIE
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,5 @@
from powerpwn.machinepwn.models.command_args_properties_base_model import CommandArgsPropertiesBaseModel


class StealPowerAutomateTokenCommandArgs(
CommandArguments[CommandArgsPropertiesBaseModel]
):
command_to_run: Literal[
CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN
] = CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN
class StealPowerAutomateTokenCommandArgs(CommandArguments[CommandArgsPropertiesBaseModel]):
command_to_run: Literal[CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN] = CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ def _ping(self, connection_parameters: Dict[str, Any]) -> List[DataStore]:
records: List[DataStore] = []

sources_success, sources_val = consecutive_gets(
session=self._session,
expected_status_prefix="200",
url=f"{self._root}/codeless/v1.0/sources",
session=self._session, expected_status_prefix="200", url=f"{self._root}/codeless/v1.0/sources"
)

if sources_success:
Expand All @@ -26,10 +24,7 @@ def _ping(self, connection_parameters: Dict[str, Any]) -> List[DataStore]:

params = {"source": source_id}
drives_success, drives_val = consecutive_gets(
session=self._session,
expected_status_prefix="200",
url=f"{self._root}/codeless/v1.0/drives",
params=params,
session=self._session, expected_status_prefix="200", url=f"{self._root}/codeless/v1.0/drives", params=params
)

if drives_success:
Expand All @@ -47,9 +42,7 @@ def _ping(self, connection_parameters: Dict[str, Any]) -> List[DataStore]:

return records

def __enum_dir(
self, source_id: str, drive_id: str, folder_id: str
) -> List[Dict[str, Any]]:
def __enum_dir(self, source_id: str, drive_id: str, folder_id: str) -> List[Dict[str, Any]]:
file_objs: List[Dict[str, Any]] = []

if folder_id == "root":
Expand All @@ -67,21 +60,13 @@ def __enum_dir(
)
if can_list_folder:
if not isinstance(list_folder_val, list):
raise ValueError(
f"Unexpected response list_folder_val: {list_folder_val}."
)
raise ValueError(f"Unexpected response list_folder_val: {list_folder_val}.")

for file_or_dir_obj in list_folder_val:
if not isinstance(file_or_dir_obj, dict):
raise ValueError(
f"Unexpected response file_or_dir_obj: {file_or_dir_obj}."
)
raise ValueError(f"Unexpected response file_or_dir_obj: {file_or_dir_obj}.")
if file_or_dir_obj["IsFolder"]:
file_objs += self.__enum_dir(
source_id=source_id,
drive_id=drive_id,
folder_id=file_or_dir_obj["Id"],
)
file_objs += self.__enum_dir(source_id=source_id, drive_id=drive_id, folder_id=file_or_dir_obj["Id"])
else:
file_objs += [file_or_dir_obj]

Expand All @@ -93,9 +78,7 @@ def _enum(self, data_store: DataStoreWithContext) -> List[DataRecord]:
source_id = data_store.data_store.extra["source"]["id"]
drive_id = data_store.data_store.extra["drive"]["id"]

drive_files = self.__enum_dir(
source_id=source_id, drive_id=drive_id, folder_id="root"
)
drive_files = self.__enum_dir(source_id=source_id, drive_id=drive_id, folder_id="root")

for file_obj in drive_files:
file_id = file_obj["Id"]
Expand Down Expand Up @@ -142,9 +125,7 @@ def _dump(self, data_record: DataRecordWithContext) -> DataDump:
content = json.dumps(rows_val).encode(ENCODING)

else:
raise ValueError(
f"Unsupported data type: {data_record.data_record.record_type}."
)
raise ValueError(f"Unsupported data type: {data_record.data_record.record_type}.")

if not success:
raise ValueError(
Expand Down
50 changes: 11 additions & 39 deletions tests/powerpwn_tests/machine_pwn_tests/machine_pwn_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def _run_cmd(self, arguments_as_dict: dict) -> dict: # type: ignore
cmd_res = CommandResults.construct(
is_success=True,
agent_run_type=AgentRunType.attended,
agent_run_errors=AgentRunErrors.construct(
attended_run_error={}, unattended_run_error={}
),
agent_run_errors=AgentRunErrors.construct(attended_run_error={}, unattended_run_error={}),
)

if self.command_to_run == CommandToRunEnum.CODE_EXEC:
Expand All @@ -45,71 +43,45 @@ def _run_cmd(self, arguments_as_dict: dict) -> dict: # type: ignore
elif self.command_to_run == CommandToRunEnum.CLEANUP:
cmd_res.cmd_cleanup = CleanupOutputs.construct()
elif self.command_to_run == CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN:
cmd_res.cmd_steal_power_automate_token = (
StealPowerAutomateTokenOutputs.construct()
)
cmd_res.cmd_steal_power_automate_token = StealPowerAutomateTokenOutputs.construct()
elif self.command_to_run == CommandToRunEnum.STEAL_COOKIE:
cmd_res.cmd_steal_cookie = StealCookieOutputs.construct()
else:
raise ValueError(
f"command_to_run has invalid value: {self.command_to_run}."
)
raise ValueError(f"command_to_run has invalid value: {self.command_to_run}.")

cmd_res_as_dict = json.loads(cmd_res.json())
return cmd_res_as_dict


@pytest.mark.parametrize("command_type", [cmd_type for cmd_type in CodeExecTypeEnum])
def test_code_exec(command_type: CodeExecTypeEnum, command: str = "") -> None:
c2 = DummyPowerPwnC2(
post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.CODE_EXEC
)
c2 = DummyPowerPwnC2(post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.CODE_EXEC)

c2.exec_command(command, command_type)


def test_ransomware(
crawl_depth: str = "0",
dirs_to_init_crawl: Optional[List[str]] = None,
encryption_key: str = "",
) -> None:
def test_ransomware(crawl_depth: str = "0", dirs_to_init_crawl: Optional[List[str]] = None, encryption_key: str = "") -> None:
if dirs_to_init_crawl is None:
dirs_to_init_crawl = [""]
c2 = DummyPowerPwnC2(
post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.RANSOMWARE
)
c2.ransomware(
crawl_depth=crawl_depth,
dirs_to_init_crawl=dirs_to_init_crawl,
encryption_key=encryption_key,
)
c2 = DummyPowerPwnC2(post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.RANSOMWARE)
c2.ransomware(crawl_depth=crawl_depth, dirs_to_init_crawl=dirs_to_init_crawl, encryption_key=encryption_key)


def test_exfiltration(target_file_path: str = "") -> None:
c2 = DummyPowerPwnC2(
post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.EXFILTRATION
)
c2 = DummyPowerPwnC2(post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.EXFILTRATION)
c2.exfiltrate(target_file_path=target_file_path)


def test_cleanup() -> None:
c2 = DummyPowerPwnC2(
post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.CLEANUP
)
c2 = DummyPowerPwnC2(post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.CLEANUP)
c2.cleanup()


def test_steal_power_automate_token() -> None:
c2 = DummyPowerPwnC2(
post_url=POST_URL,
debug=DEBUG,
command_to_run=CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN,
)
c2 = DummyPowerPwnC2(post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN)
c2.steal_power_automate_token()


def test_steal_cookie(fqdn: str = "") -> None:
c2 = DummyPowerPwnC2(
post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.STEAL_COOKIE
)
c2 = DummyPowerPwnC2(post_url=POST_URL, debug=DEBUG, command_to_run=CommandToRunEnum.STEAL_COOKIE)
c2.steal_cookie(fqdn=fqdn)

0 comments on commit aee9b80

Please sign in to comment.