-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathbfs.py
106 lines (96 loc) · 4.35 KB
/
bfs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import requests
import argparse
import sys
import json
import subprocess
dangerous_permissions = [
"dataproc.clusters.create",
"composer.environments.create",
"dataflow.jobs.create",
"iam.serviceAccounts.setIamPolicy",
"dataprep.projects.use",
"deploymentmanager.deployments.create",
"datafusion.instances.create",
"iam.serviceAccounts.actAs",
"iam.serviceAccounts.getAccessToken",
"iam.serviceAccounts.getOpenIdToken",
"iam.serviceAccounts.signBlob",
"iam.serviceAccounts.signJwt",
"iam.serviceAccounts.implicitDelegation",
"resourcemanager.projects.setIamPolicy",
"resourcemanager.folders.setIamPolicy",
"resourcemanager.organizations.setIamPolicy",
"resourcemanager.folders.setIamPolicy",
"resourcemanager.projects.setIamPolicy",
]
def bfs_search(org, base_id):
token = subprocess.check_output("gcloud auth print-access-token".split(" ")).decode("utf-8")
token = token.strip()
headers = {
"Authorization": "Bearer {}".format(token),
"content-type": "application/json",
"x-http-method-override": "GET"
}
visited = []
visited_projects = []
info = {}
queue = [base_id]
while queue:
service_account = queue.pop()
visited.append(service_account)
JSON_REQUEST={
"analysisQuery": {
"parent": org,
"identitySelector": {
"identity": "serviceAccount:{}".format(service_account)
},
"accessSelector": {
"permissions": []
}
}
}
for dangerous_permission in dangerous_permissions:
JSON_REQUEST["analysisQuery"]["accessSelector"]["permissions"].append(dangerous_permission)
res = requests.post("https://cloudasset.googleapis.com/v1p4beta1/organizations/{}:analyzeIamPolicy".format(org), headers=headers, json=JSON_REQUEST)
results = res.json()
if "analysisResults" in results["mainAnalysis"]:
for result in results["mainAnalysis"]["analysisResults"]:
recipient = result["attachedResourceFullName"]
target = recipient.split("/")[-1]
if recipient.startswith("//cloudresourcemanager.googleapis.com/projects"):
if target not in visited_projects:
visited_projects.append(target)
command = "gcloud iam service-accounts list --format json --project {}".format(target)
project_service_accounts = json.loads(subprocess.check_output(command.split(" ")).decode("utf-8"))
for project_service_account in project_service_accounts:
sa_email = project_service_account["email"]
if sa_email not in visited and sa_email not in queue:
queue.append(sa_email)
info[sa_email] = target
print("Adding {} by means of {} with {}".format(sa_email, service_account, result["iamBinding"]["role"]))
elif recipient.startswith("//iam.googleapis.com/projects/"):
if target not in visited and target not in queue:
queue.append(target)
info[target] = recipient.split("/")[-3]
print("Adding {} by means of {} with {}".format(target, service_account, result["iamBinding"]["role"]))
return visited, info
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='bfs tool for GCP service account exploitation.')
parser.add_argument('--source', dest="source",
help='The starting point for your bfs search')
parser.add_argument('--org_id', dest="org",
help='Your org ID')
args = parser.parse_args()
if not args.source:
print("Need the starting point --source <serviceAccountEmail>")
sys.exit()
elif not args.org:
print("Need the org ID --org_id <org_id>")
sys.exit()
base_id = args.source
org_id = args.org
visited, info = bfs_search(org_id, base_id)
print("\n\n~~~~~~~{} can move laterally to the following identities ~~~~~~~~~~~".format(base_id))
for service_account in visited:
if service_account != base_id:
print("{} from project {}".format(service_account, info[service_account]))