Skip to content
This repository has been archived by the owner on Nov 29, 2021. It is now read-only.

Extend get_vts with attribute version_only and return the version #291

Merged
merged 1 commit into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Add new scan status INTERRUPTED.
[#288](https://github.com/greenbone/ospd/pull/288)
[#289](https://github.com/greenbone/ospd/pull/289)
- Extend get_vts with attribute version_only and return the version [#291](https://github.com/greenbone/ospd/pull/291)

### Changes
- Modify __init__() method and use new syntax for super(). [#186](https://github.com/greenbone/ospd/pull/186)
Expand Down
19 changes: 13 additions & 6 deletions ospd/command/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
vt_id = xml.get('vt_id')
vt_filter = xml.get('filter')
_details = xml.get('details')
version_only = xml.get('version_only')

vt_details = False if _details == '0' else True

Expand All @@ -328,7 +329,7 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
raise OspdCommandError(text, 'get_vts', 404)

filtered_vts = None
if vt_filter:
if vt_filter and not version_only:
try:
filtered_vts = self._daemon.vts_filter.get_filtered_vts_list(
self._daemon.vts, vt_filter
Expand All @@ -337,14 +338,20 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
self._daemon.vts.is_cache_available = True
raise OspdCommandError(filter_error)

vts_selection = self._daemon.get_vts_selection_list(vt_id, filtered_vts)
if not version_only:
vts_selection = self._daemon.get_vts_selection_list(
vt_id, filtered_vts
)
# List of xml pieces with the generator to be iterated
yield xml_helper.create_response('get_vts')

begin_vts_tag = xml_helper.create_element('vts')
begin_vts_tag = xml_helper.add_attr(
begin_vts_tag, "vts_version", self._daemon.get_vts_version()
)
val = len(self._daemon.vts)
begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "total", val)
if filtered_vts:
if filtered_vts and not version_only:
val = len(filtered_vts)
begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "sent", val)

Expand All @@ -354,9 +361,9 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
)

yield begin_vts_tag

for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
yield xml_helper.add_element(self._daemon.get_vt_xml(vt))
if not version_only:
for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
yield xml_helper.add_element(self._daemon.get_vt_xml(vt))

yield xml_helper.create_element('vts', end=True)
yield xml_helper.create_response('get_vts', end=True)
Expand Down
33 changes: 33 additions & 0 deletions tests/test_scan_and_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,39 @@ def test_get_vts_single_vt(self):
vt = vts.find('vt')
self.assertEqual(vt.get('id'), '1.2.3.4')

def test_get_vts_version(self):
fs = FakeStream()
self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
self.daemon.set_vts_version('today')
self.daemon.handle_command('<get_vts />', fs)
response = fs.get_response()

self.assertEqual(response.get('status'), '200')

vts_version = response.find('vts').attrib['vts_version']
self.assertEqual(vts_version, self.daemon.get_vts_version())

vts = response.find('vts')
self.assertIsNotNone(vts.find('vt'))

vt = vts.find('vt')
self.assertEqual(vt.get('id'), '1.2.3.4')

def test_get_vts_version_only(self):
fs = FakeStream()
self.daemon.add_vt('1.2.3.4', 'A vulnerability test')
self.daemon.set_vts_version('today')
self.daemon.handle_command('<get_vts version_only="1"/>', fs)
response = fs.get_response()

self.assertEqual(response.get('status'), '200')

vts_version = response.find('vts').attrib['vts_version']
self.assertEqual(vts_version, self.daemon.get_vts_version())

vts = response.find('vts')
self.assertIsNone(vts.find('vt'))

def test_get_vts_still_not_init(self):
fs = FakeStream()
self.daemon.initialized = False
Expand Down