-
Notifications
You must be signed in to change notification settings - Fork 116
/
get_rtr_result.py
154 lines (140 loc) · 6.24 KB
/
get_rtr_result.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
r"""Retrieve the results of a command executed via Real Time Response.
_______ __ _______ __
| _ .-----.---.-| || |__.--------.-----.
|. l | -__| _ | ||.| | | | | -__|
|. _ |_____|___._|__|`-|. |-|__|__|__|__|_____|
|: | | |: |
|::.|:. | |::.|
`--- ---' `---'
_______
| _ .-----.-----.-----.-----.-----.-----.-----.
|. l | -__|__ --| _ | _ | |__ --| -__|
|. _ |_____|_____| __|_____|__|__|_____|_____|
|: | | |__|
|::.|:. | FalconPy v1.1
`--- ---'
"""
import os
import glob
import json
from argparse import ArgumentParser, RawTextHelpFormatter
from falconpy import RealTimeResponseAdmin, __version__ as FPVERSION
def show_banner():
"""Display the banner."""
top = __doc__.split("\n")
top.pop(0)
top.pop(0)
top.insert(0, "\033[31m")
top = "\n".join(top).replace("FalconPy v1.1", f"\033[34m FalconPy v{FPVERSION} \033[31m")
mag = "\033[95m"
endm = "\033[0m"
print(rf"""{top} {mag}
____ _ _ ____ ____ _ _ ___ _ ____ _ _ ____ ____ ____ _ _ _ ___ ____
|___ \/ |___ | | | | | | | |\ | |__/ |___ [__ | | | | [__
|___ _/\_ |___ |___ |__| | | |__| | \| | \ |___ ___] |__| |___ | ___]
{'=' * 78}{endm}""")
def parse_command_line():
"""Retrieve provided command line arguments."""
parser = ArgumentParser(description=__doc__, formatter_class=RawTextHelpFormatter)
parser.add_argument("-k", "--falcon_client_id",
help="CrowdStrike Falcon API Client ID",
required=True
)
parser.add_argument("-s", "--falcon_client_secret",
help="CrowdStrike Falcon API Client Secret",
required=True
)
parser.add_argument("-m", "--member_cid",
help="Child CID for MSSP scenarios",
required=False,
default=None
)
parser.add_argument("-b", "--base_url",
help="CrowdStrike Base URL (Only required for GovCloud: usgov1)",
required=False,
default="auto"
)
parser.add_argument("-c", "--cloud_request_id",
help="Cloud Request ID to retrieve, accepts comma-delimited lists",
required=False,
default=None
)
parser.add_argument("-q", "--sequence",
help="Command result sequence ID, defaults to 0",
required=False,
default=0
)
parser.add_argument("-f", "--queue_file_folder",
help="Load a directory of save files or a single save file for processing",
required=False,
default=None
)
return parser.parse_args()
def perform_lookup(clid, csec, mid, base, cr_id, seq): # pylint: disable=R0913
"""Perform the lookup for the cloud request ID."""
# Show the banner
show_banner()
# Connect to the API using the Real Time Response Admin Service Class
rtr = RealTimeResponseAdmin(client_id=clid,
client_secret=csec,
member_cid=mid,
base_url=base
)
# Loop through each Cloud Request ID provided and check it's status
for request_id in cr_id.split(","):
result = rtr.check_admin_command_status(cloud_request_id=request_id,
sequence=seq
)
# Display the Cloud Request ID
print(f"Cloud Request ID:\033[1m {request_id}\033[0m")
# Display the API error result
if result["status_code"] != 200:
for err in result["body"]["errors"]:
print(f"[{err['code']}] {err['message']}\n")
# Display the contents of `stdout` and `stderr` for the returned result
for execution in result["body"]["resources"]:
for std in ["stdout", "stderr"]:
if std in execution:
if execution[std]:
print(f"{execution[std]}\n")
# Parse the command line
args = parse_command_line()
# If we were provided an execution directory
if args.queue_file_folder:
# And that directory exists
if os.path.exists(args.queue_file_folder):
# Check to see if we're dealing with a file or folder
if os.path.isfile(args.queue_file_folder):
# We're only handling one file
queue_files = [args.queue_file_folder]
else:
# Retrieve a list of all queue files within this directory
queue_files = glob.glob(os.path.join(args.queue_file_folder,"*.json"), recursive=True)
for queue_file in queue_files:
# Loop through each queue file in the folder and
# perform a lookup for each cloud request ID within
with open(queue_file, "r", encoding="utf-8") as queues:
queued = json.load(queues)
cloud_request_id = []
for mcid, details in queued.items():
mem_cid = mcid
for aid, crid in details.items():
cloud_request_id.append(crid)
CLOUD_REQUEST_ID = ",".join(cloud_request_id)
# Perform a lookup based upon the contents of the queue file
perform_lookup(args.falcon_client_id,
args.falcon_client_secret,
mem_cid,
args.base_url,
CLOUD_REQUEST_ID,
args.sequence
)
else:
# Perform a singular lookup based upon the cloud request ID provided
perform_lookup(args.falcon_client_id,
args.falcon_client_secret,
args.member_cid,
args.base_url,
args.cloud_request_id,
args.sequence
)