-
Notifications
You must be signed in to change notification settings - Fork 90
/
create-consolidated-report.gmp.py
executable file
·451 lines (372 loc) · 12.6 KB
/
create-consolidated-report.gmp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# SPDX-FileCopyrightText: 2021 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
import sys
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
from datetime import date
from typing import List, Tuple
from uuid import UUID
from gvm.errors import GvmError
from gvm.protocols.gmp import Gmp
from gvmtools.helper import error_and_exit, generate_uuid
from lxml import etree as e
# Description text passed to the ArgumentParser of this script.
# The adjacent literals are concatenated implicitly, so every fragment
# that continues a sentence must end with a space.
HELP_TEXT = (
    "This script creates a consolidated report and imports it to the GSM. "
    "You are able to set a time period. Within this period the last report "
    "of all tasks will be consolidated. You can additionally filter the "
    "tasks by one or more tags and the results with a filter id or filter "
    "term.\n"
    " Usable with gvm-script (gvm-tools). Help: gvm-script -h"
)
def parse_tags(tags: List[str]) -> List[str]:
    """Turn the given tags into task filter expressions

    tags (List): A list containing tags:
                 name, tag-id, name=value

    Every entry that can be parsed as a UUID becomes tag_id="<entry>",
    every other entry becomes tag="<entry>".

    Returns the list of filter expressions in the order of the input.
    """

    def _to_filter(candidate: str) -> str:
        # UUID() raises ValueError for anything that is not a UUID string
        try:
            UUID(candidate, version=4)
        except ValueError:
            return f'tag="{candidate}"'
        return f'tag_id="{candidate}"'

    return [_to_filter(candidate) for candidate in tags]
def parse_period(period: List[str]) -> Tuple[date, date]:
    """Parse and validate the given time period

    period (List): A list with two entries containing
                   dates in the format yyyy/mm/dd

    Returns two date objects (start, end) built from the passed dates.
    Every validation failure is reported via error_and_exit(), which is
    expected to terminate the script.
    """
    # Split both entries into integer components first, so that format
    # problems of either date are reported before calendar problems.
    try:
        s_year, s_month, s_day = map(int, period[0].split("/"))
    except ValueError as exc:
        error_and_exit(
            f"Start date [{period[0]}] is not a "
            f"correct date format:\n{exc.args[0]}."
        )
    try:
        e_year, e_month, e_day = map(int, period[1].split("/"))
    except ValueError as exc:
        error_and_exit(
            f"End date [{period[1]}] is not "
            f"a correct date format:\n{exc.args[0]}."
        )

    # date() rejects values that are out of range (e.g. month 13)
    try:
        period_start = date(s_year, s_month, s_day)
    except ValueError as exc:
        error_and_exit(f"Start date: {exc.args[0]}")

    try:
        period_end = date(e_year, e_month, e_day)
    except ValueError as exc:
        error_and_exit(f"End date: {exc.args[0]}")

    # A period where start and end are the same day is accepted
    if period_end < period_start:
        error_and_exit("The start date seems to be after the end date.")

    return period_start, period_end
def parse_args(args: Namespace) -> Namespace:  # pylint: disable=unused-argument
    """Build the argument parser of this script and parse the arguments

    args: unused; parse_known_args() is called without arguments and
          therefore reads the command line of the running process

    All options use "+" as prefix character instead of "-".
    Unknown arguments are ignored.

    Returns the namespace containing the parsed script arguments.
    """
    parser = ArgumentParser(
        prefix_chars="+",
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    # add_help=False above, so the help option is registered manually
    # with the "+" prefix
    parser.add_argument(
        "+h",
        "++help",
        action="help",
        help="Show this help message and exit.",
    )

    parser.add_argument(
        "+p",
        "++period",
        nargs=2,
        type=str,
        required=True,
        dest="period",
        help=(
            "Choose a time period that is filtering the tasks.\n"
            "Use the date format YYYY/MM/DD."
        ),
    )

    # Either reuse an existing container task or name a new one
    container_args = parser.add_mutually_exclusive_group()

    container_args.add_argument(
        "++container-id",
        type=str,
        dest="container_id",
        help=(
            "Add the consolidated report to the container task"
            " with the given id instead of creating a new one."
        ),
    )

    container_args.add_argument(
        "++new-container-name",
        type=str,
        dest="new_container_name",
        help=(
            "Create a new container task with the given name instead"
            " of using an automatically generated name."
        ),
    )

    # The help text is one implicitly concatenated string. A bare `or`
    # operator between the literals would drop everything after it.
    parser.add_argument(
        "+t",
        "++tags",
        nargs="+",
        type=str,
        dest="tags",
        help=(
            "Filter the tasks by given tag(s).\n"
            "If you pass more than one tag, they will be concatenated with "
            "'or'.\n"
            "You can pass tag names, tag ids or tag name=value to this argument"
        ),
    )

    # Results can be filtered by filter terms or by a filter id, not both
    filter_args = parser.add_mutually_exclusive_group()

    filter_args.add_argument(
        "++filter-terms",
        nargs="+",
        type=str,
        dest="filter_term",
        help="Filter the results by given filter terms.",
    )

    filter_args.add_argument(
        "++filter-id",
        type=str,
        dest="filter_id",
        help="Filter the results by given filter id.",
    )

    script_args, _ = parser.parse_known_args()
    return script_args
def generate_task_filter(
    period_start: date, period_end: date, tags: List[str]
) -> str:
    """Build the filter string used to look up the tasks

    period_start: the start date
    period_end: the end date
    tags: list of tag filter expressions, may be empty or None

    Without tags the filter consists of the time window only. With tags
    the time window is combined with every single tag and the resulting
    parts are joined with " or ".

    Returns the task filter string
    """
    # "last" is the timestamp of the last report in a task,
    # "created" is the timestamp of when the task has been created
    # Note: the "first" argument for tasks is currently not working
    time_window = (
        f"last>{period_start.isoformat()} "
        f"and created<{period_end.isoformat()}"
    )
    prefix = "rows=-1 "

    if not tags:
        return prefix + time_window

    return prefix + " or ".join(f"{time_window} and {tag}" for tag in tags)
def get_last_report_in_time_period(
    gmp: Gmp,
    task_filter: str,
    period_start: date,
    period_end: date,
) -> List[str]:
    """Collect the id of the last report of every matching task

    All tasks matching the task filter are requested first. For each of
    them a single report created within the time period is requested,
    sorted in reverse order by creation date.

    If no report is returned for one of the tasks, a message is written
    to stderr and the script exits with status 1.

    gmp: the GMP object
    task_filter: task filter string
    period_start: the start date
    period_end: the end date

    Returns the list of report ids, one per task.
    """
    end_iso = period_end.isoformat()
    start_iso = period_start.isoformat()

    print(
        f"Filtering the task with the filter term [{task_filter}]\n"
        f"Looking for the last report before {end_iso}, "
        f"but after {start_iso}."
    )

    matching_tasks = gmp.get_tasks(filter_string=task_filter)
    latest_report_ids = []

    for task_id in matching_tasks.xpath("task/@id"):
        # rows=1 together with sort-reverse=created limits the response
        # to the most recently created report in the period
        report_filter = (
            f"rows=1 task_id={task_id} and "
            f"created<{end_iso} and "
            f"created>{start_iso} sort-reverse=created"
        )
        response = gmp.get_reports(filter_string=report_filter)

        # at most one report id is expected here
        found_ids = response.xpath("report/@id")
        if not found_ids:
            print(f"Failed to get report for task {task_id}", file=sys.stderr)
            sys.exit(1)

        latest_report_ids.append(found_ids[0])

    return latest_report_ids
def combine_reports(
    gmp: Gmp, reports: List[str], filter_term: str, filter_id: str
) -> e.Element:
    """Combining the filtered ports, results and hosts of the given
    report ids into one new report.

    gmp: the GMP object
    reports (List): List of report_ids
    filter_term (str): the result filter string, used if no filter_id
                       is given
    filter_id (str): id of a result filter; if set it is used instead
                     of filter_term

    Reports that can not be fetched are announced on stdout and skipped.

    Returns the combined report as XML element
    """
    new_uuid = generate_uuid()
    combined_report = e.Element(
        "report",
        {
            "id": new_uuid,
            "format_id": "d5da9f67-8551-4e51-807b-b6a873d70e34",
            "extension": "xml",
            "content_type": "text/xml",
        },
    )
    report_elem = e.Element("report", {"id": new_uuid})
    ports_elem = e.Element("ports", {"start": "1", "max": "-1"})
    results_elem = e.Element("results", {"start": "1", "max": "-1"})
    combined_report.append(report_elem)
    report_elem.append(ports_elem)
    report_elem.append(results_elem)

    for report in reports:
        try:
            if filter_id:
                response = gmp.get_report(
                    report,
                    filter_id=filter_id,
                    details=True,
                    ignore_pagination=True,
                )
            else:
                response = gmp.get_report(
                    report,
                    filter_string=filter_term,
                    details=True,
                    ignore_pagination=True,
                )
        except GvmError:
            print(f"Could not find the report [{report}]")
            # Without this the code below would use an unbound name on
            # the first iteration or the previous report on later ones
            continue

        current_report = response.find("report")
        if current_report is None:
            # The response contains no report element, nothing to merge
            print(f"Could not find the report [{report}]")
            continue

        for port in current_report.xpath("report/ports/port"):
            ports_elem.append(port)
        for result in current_report.xpath("report/results/result"):
            results_elem.append(result)
        for host in current_report.xpath("report/host"):
            report_elem.append(host)

    return combined_report
def get_container_name(gmp: Gmp, container_id: str):
    """
    Look up a task by id, verify that it is a container task and
    return its name

    A task counts as container if the id attribute of its target
    element is an empty string. Otherwise error_and_exit() is called.

    gmp: the GMP object
    container_id: Id of the task to check
    """
    response = gmp.get_task(container_id)

    name = response.xpath("//task/name")[0].text
    target_id = response.xpath("//task/target/@id")[0]

    is_container = target_id == ""
    if not is_container:
        error_and_exit(f"Task [{container_id}] is not a container")

    return name
def send_report(
    gmp: Gmp,
    combined_report: e.Element,
    period_start: date,
    period_end: date,
    container_id: str,
    new_container_name: str,
) -> str:
    """Import the combined report into a container task on the GSM

    If a container id is given, the report is added to that task.
    Otherwise a new container task is created, named either with the
    given name or with a name generated from the time period.

    gmp: the GMP object
    combined_report: the combined report xml object
    period_start: the start date
    period_end: the end date
    container_id: id of an existing container task, may be empty
    new_container_name: name for a new container task, may be empty

    Returns the first id attribute found in the import response.
    """
    if container_id:
        target_task_id = container_id
        target_task_name = get_container_name(gmp, container_id)
        print(
            "Adding consolidated report to existing container task"
            f" [{target_task_name}] with UUID [{target_task_id}]"
        )
    else:
        target_task_name = (
            new_container_name
            or f"Consolidated Report [{period_start} - {period_end}]"
        )
        creation = gmp.create_container_task(
            name=target_task_name, comment="Created with gvm-tools."
        )
        target_task_id = creation.xpath("//@id")[0]
        print(
            "Adding consolidated report to new container task"
            f" [{target_task_name}] with UUID [{target_task_id}]"
        )

    # import_report() is handed the serialized report
    serialized_report = e.tostring(combined_report)
    import_response = gmp.import_report(
        serialized_report, task_id=target_task_id, in_assets=True
    )

    return import_response.xpath("//@id")[0]
def main(gmp: Gmp, args: Namespace) -> None:
    """Create a consolidated report and import it

    Steps: parse the script arguments, build the task filter, collect
    the last report of every matching task in the period, combine them
    (optionally filtered by filter terms or a filter id) and import the
    result into a container task.

    gmp: the GMP object
    args: the arguments handed to the script
    """
    # pylint: disable=undefined-variable

    parsed_args = parse_args(args=args)

    period_start, period_end = parse_period(period=parsed_args.period)

    print(
        "Combining last reports from tasks within the "
        f"time period [{period_start}, {period_end}]"
    )

    # Generate Task Filter
    filter_tags = None
    if parsed_args.tags:
        filter_tags = parse_tags(tags=parsed_args.tags)

    task_filter = generate_task_filter(
        period_start=period_start,
        period_end=period_end,
        tags=filter_tags,
    )

    # Find reports
    reports = get_last_report_in_time_period(
        gmp=gmp,
        task_filter=task_filter,
        period_start=period_start,
        period_end=period_end,
    )

    print(f"Combining {len(reports)} found reports.")

    # Determine the results filter; the term of a filter given by id is
    # only looked up to show it to the user
    filter_term = ""
    if parsed_args.filter_term:
        filter_term = " ".join(parsed_args.filter_term)
        print(
            "Filtering the results by the "
            f"following filter term [{filter_term}]"
        )
    elif parsed_args.filter_id:
        try:
            filter_xml = gmp.get_filter(filter_id=parsed_args.filter_id).find(
                "filter"
            )
            filter_term = filter_xml.find("term").text
            print(
                "Filtering the results by the following filter term "
                f"[{filter_term}]"
            )
        except GvmError:
            # f-string is required here, otherwise the placeholder is
            # printed literally instead of the filter id
            print(
                f"Filter with the ID [{parsed_args.filter_id}] "
                "is not existing."
            )
    else:
        print("No results filter given.")

    # Combine the reports
    combined_report = combine_reports(
        gmp=gmp,
        reports=reports,
        filter_term=filter_term,
        filter_id=parsed_args.filter_id,
    )

    # Import the generated report to GSM
    report = send_report(
        gmp=gmp,
        combined_report=combined_report,
        period_start=period_start,
        period_end=period_end,
        container_id=parsed_args.container_id,
        new_container_name=parsed_args.new_container_name,
    )

    print(f"Successfully imported new consolidated report [{report}]")
# Entry point: runs only when the module name is "__gmp__".
# NOTE(review): `gmp` and `args` are not defined in this file; presumably
# gvm-script provides them when executing the script - confirm.
if __name__ == "__gmp__":
    main(gmp, args)