From 6a41a4da43215f039b3ab2716e8b49be29f0ddfd Mon Sep 17 00:00:00 2001 From: ReimarBauer Date: Fri, 16 Aug 2024 20:57:48 +0200 Subject: [PATCH] XML validation --- mslib/mscolab/file_manager.py | 29 +++++----- mslib/msui/flighttrack.py | 15 +++-- mslib/msui/mscolab.py | 4 ++ mslib/utils/verify_waypoint_data.py | 53 +++++++++++++++++ tests/_test_mscolab/test_files.py | 38 +++++++++--- tests/_test_mscolab/test_files_api.py | 17 +++--- tests/_test_mscolab/test_server.py | 11 ++-- .../_test_utils/test_verify_waypoint_data.py | 58 +++++++++++++++++++ tests/utils.py | 51 ++++++++++++++++ 9 files changed, 239 insertions(+), 37 deletions(-) create mode 100644 mslib/utils/verify_waypoint_data.py create mode 100644 tests/_test_utils/test_verify_waypoint_data.py diff --git a/mslib/mscolab/file_manager.py b/mslib/mscolab/file_manager.py index 3be1a0179..32e7b51fd 100644 --- a/mslib/mscolab/file_manager.py +++ b/mslib/mscolab/file_manager.py @@ -31,6 +31,7 @@ import git import threading from sqlalchemy.exc import IntegrityError +from mslib.utils.verify_waypoint_data import verify_waypoint_data from mslib.mscolab.models import db, Operation, Permission, User, Change, Message from mslib.mscolab.conf import mscolab_settings @@ -83,11 +84,11 @@ def create_operation(self, path, description, user, last_used=None, content=None self.import_permissions(import_op.id, operation_id, user.id) data = fs.open_fs(self.data_dir) data.makedir(operation.path) - operation_file = data.open(fs.path.combine(operation.path, 'main.ftml'), 'w') - if content is not None: - operation_file.write(content) - else: - operation_file.write(mscolab_settings.STUB_CODE) + with data.open(fs.path.combine(operation.path, 'main.ftml'), 'w') as operation_file: + if content is not None and verify_waypoint_data(content): + operation_file.write(content) + else: + operation_file.write(mscolab_settings.STUB_CODE) operation_path = fs.path.combine(self.data_dir, operation.path) r = git.Repo.init(operation_path) r.git.clear_cache() @@ -263,14 +264,14 @@ def update_operation(self, op_id, attribute, value, user): if value.find("/") != -1 or value.find("\\") != -1 or (" " in value): logging.debug("malicious request: %s", user) return False - data = fs.open_fs(self.data_dir) - if data.exists(value): - return False - # will be move when operations are introduced - # make a directory, else movedir - data.makedir(value) - data.movedir(operation.path, value) - # when renamed to a Group operation + with fs.open_fs(self.data_dir) as data: + if data.exists(value): + return False + # will be move when operations are introduced + # make a directory, else movedir + data.makedir(value) + data.movedir(operation.path, value) + # when renamed to a Group operation if value.endswith(mscolab_settings.GROUP_POSTFIX): # getting the category category = value.split(mscolab_settings.GROUP_POSTFIX)[0] @@ -318,6 +319,8 @@ def save_file(self, op_id, content, user, comment=""): content: content of the file to be saved # ToDo save change in schema """ + if not verify_waypoint_data(content): + return False # ToDo use comment operation = Operation.query.filter_by(id=op_id).first() if not operation: diff --git a/mslib/msui/flighttrack.py b/mslib/msui/flighttrack.py index 1d3ad2b15..dba4bff64 100644 --- a/mslib/msui/flighttrack.py +++ b/mslib/msui/flighttrack.py @@ -48,6 +48,7 @@ from mslib.utils.units import units from mslib.utils.coordinate import find_location, path_points, get_distance from mslib.utils import thermolib +from mslib.utils.verify_waypoint_data import verify_waypoint_data from mslib.utils.config import config_loader, save_settings_qsettings, load_settings_qsettings from mslib.utils.config import MSUIDefaultConfig as mss_default from mslib.utils.qt import variant_to_string, variant_to_float @@ -644,13 +645,19 @@ def load_from_ftml(self, filename): _dirname, _name = os.path.split(filename) _fs = fs.open_fs(_dirname) xml_content = _fs.readtext(_name) - name = os.path.basename(filename.replace(".ftml", "").strip()) - self.load_from_xml_data(xml_content, name) + if verify_waypoint_data(xml_content): + name = os.path.basename(filename.replace(".ftml", "").strip()) + self.load_from_xml_data(xml_content, name) + else: + raise SyntaxError(f"Invalid flight track filename: {filename}") def load_from_xml_data(self, xml_content, name="Flight track"): self.name = name - _waypoints_list = load_from_xml_data(xml_content, name) - self.replace_waypoints(_waypoints_list) + if verify_waypoint_data(xml_content): + _waypoints_list = load_from_xml_data(xml_content, name) + self.replace_waypoints(_waypoints_list) + else: + raise Exception(f"Invalid flight track filename: {name}") def get_filename(self): return self.filename diff --git a/mslib/msui/mscolab.py b/mslib/msui/mscolab.py index 42c086035..254aacb25 100644 --- a/mslib/msui/mscolab.py +++ b/mslib/msui/mscolab.py @@ -57,6 +57,7 @@ from mslib.utils.auth import get_password_from_keyring, save_password_to_keyring from mslib.utils.verify_user_token import verify_user_token +from mslib.utils.verify_waypoint_data import verify_waypoint_data from mslib.utils.qt import get_open_filename, get_save_filename, dropEvent, dragEnterEvent, show_popup from mslib.msui.qt5 import ui_mscolab_help_dialog as msc_help_dialog from mslib.msui.qt5 import ui_add_operation_dialog as add_operation_ui @@ -1977,6 +1978,9 @@ def handle_import_msc(self, file_path, extension, function, pickertype): model = ft.WaypointsTableModel(waypoints=new_waypoints) xml_doc = self.waypoints_model.get_xml_doc() xml_content = xml_doc.toprettyxml(indent=" ", newl="\n") + if not verify_waypoint_data(xml_content): + show_popup(self.ui, "Import Success", f"The file - {file_name}, was not imported!", 0) + return self.waypoints_model.dataChanged.disconnect(self.handle_waypoints_changed) self.waypoints_model = model self.handle_waypoints_changed() diff --git a/mslib/utils/verify_waypoint_data.py b/mslib/utils/verify_waypoint_data.py new file mode 100644 index 000000000..42b248f2b --- /dev/null +++ b/mslib/utils/verify_waypoint_data.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +""" + + mslib.utils.verify_waypoint_data + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + basic checks for xml waypoint data. + + This file is part of MSS. + + :copyright: Copyright 2024 Reimar Bauer + :license: APACHE-2.0, see LICENSE for details. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + + +import xml.dom.minidom +import xml.parsers.expat + + +def verify_waypoint_data(xml_content): + try: + doc = xml.dom.minidom.parseString(xml_content) + except xml.parsers.expat.ExpatError: + return False + + ft_el = doc.getElementsByTagName("FlightTrack")[0] + waypoints = ft_el.getElementsByTagName("Waypoint") + if (len(waypoints)) < 2: + return False + + for wp_el in ft_el.getElementsByTagName("Waypoint"): + try: + wp_el.getAttribute("location") + float(wp_el.getAttribute("lat")) + float(wp_el.getAttribute("lon")) + float(wp_el.getAttribute("flightlevel")) + wp_el.getElementsByTagName("Comments")[0] + except ValueError: + return False + + return True diff --git a/tests/_test_mscolab/test_files.py b/tests/_test_mscolab/test_files.py index 1710ec9d5..c41c81328 100644 --- a/tests/_test_mscolab/test_files.py +++ b/tests/_test_mscolab/test_files.py @@ -32,6 +32,7 @@ from mslib.mscolab.models import User, Operation, Permission, Change, Message from mslib.mscolab.seed import add_user, get_user from mslib.mscolab.utils import get_recent_op_id +from tests.utils import XML_CONTENT1, XML_CONTENT2 class Test_Files: @@ -94,25 +95,48 @@ def test_is_creator(self): def test_file_save(self): with self.app.test_client(): flight_path, operation = self._create_operation(flight_path="operation77") - assert self.fm.save_file(operation.id, "beta", self.user) - assert self.fm.get_file(operation.id, self.user) == "beta" - assert self.fm.save_file(operation.id, "gamma", self.user) - assert self.fm.get_file(operation.id, self.user) == "gamma" + assert self.fm.save_file(operation.id, XML_CONTENT1, self.user) + assert self.fm.get_file(operation.id, self.user) == XML_CONTENT1 + assert self.fm.save_file(operation.id, XML_CONTENT2, self.user) + assert self.fm.get_file(operation.id, self.user) == XML_CONTENT2 # check if change is saved properly changes = self.fm.get_all_changes(operation.id, self.user) assert len(changes) == 2 + def test_cant_save(self): + with self.app.test_client(): + flight_path, operation = self._create_operation(flight_path="operation911") + assert self.fm.save_file(operation.id, "text", self.user) is False + incomplete = """ + + + """ + assert self.fm.save_file(operation.id, incomplete, self.user) is False + incomplete = """ + + + + + + + + + + """ + assert self.fm.save_file(operation.id, incomplete, self.user) is False + + def test_undo(self): with self.app.test_client(): flight_path, operation = self._create_operation(flight_path="operation7", content="alpha") - assert self.fm.save_file(operation.id, "beta", self.user) - assert self.fm.save_file(operation.id, "gamma", self.user) + assert self.fm.save_file(operation.id, XML_CONTENT1, self.user) + assert self.fm.save_file(operation.id, XML_CONTENT2, self.user) changes = Change.query.filter_by(op_id=operation.id).all() assert changes is not None assert changes[0].id == 1 assert self.fm.undo_changes(changes[0].id, self.user) is True assert len(self.fm.get_all_changes(operation.id, self.user)) == 3 - assert "beta" in self.fm.get_file(operation.id, self.user) + assert XML_CONTENT1 == self.fm.get_file(operation.id, self.user) def test_get_operation(self): with self.app.test_client(): diff --git a/tests/_test_mscolab/test_files_api.py b/tests/_test_mscolab/test_files_api.py index 43dcb4864..7ee46f271 100644 --- a/tests/_test_mscolab/test_files_api.py +++ b/tests/_test_mscolab/test_files_api.py @@ -29,6 +29,7 @@ from mslib.mscolab.models import Operation from mslib.mscolab.seed import add_user, get_user +from tests.utils import XML_CONTENT1, XML_CONTENT2, XML_CONTENT3 class Test_Files: @@ -170,8 +171,8 @@ def test_delete_operation(self): def test_get_all_changes(self): with self.app.test_client(): flight_path, operation = self._create_operation(flight_path="V11") - assert self.fm.save_file(operation.id, "content1", self.user) - assert self.fm.save_file(operation.id, "content2", self.user) + assert self.fm.save_file(operation.id, XML_CONTENT1, self.user) + assert self.fm.save_file(operation.id, XML_CONTENT2, self.user) all_changes = self.fm.get_all_changes(operation.id, self.user) # the newest change is on index 0, because it has a recent created_at time assert len(all_changes) == 2 @@ -182,19 +183,19 @@ def test_get_all_changes(self): def test_get_change_content(self): with self.app.test_client(): flight_path, operation = self._create_operation(flight_path="V12", content='initial') - assert self.fm.save_file(operation.id, "content1", self.user) - assert self.fm.save_file(operation.id, "content2", self.user) - assert self.fm.save_file(operation.id, "content3", self.user) + assert self.fm.save_file(operation.id, XML_CONTENT1, self.user) + assert self.fm.save_file(operation.id, XML_CONTENT2, self.user) + assert self.fm.save_file(operation.id, XML_CONTENT3, self.user) all_changes = self.fm.get_all_changes(operation.id, self.user) previous_change = self.fm.get_change_content(all_changes[2]["id"], self.user) - assert previous_change == "content1" + assert previous_change == XML_CONTENT1 previous_change = self.fm.get_change_content(all_changes[1]["id"], self.user) - assert previous_change == "content2" + assert previous_change == XML_CONTENT2 def test_set_version_name(self): with self.app.test_client(): flight_path, operation = self._create_operation(flight_path="V13", content='initial') - assert self.fm.save_file(operation.id, "content1", self.user) + assert self.fm.save_file(operation.id, XML_CONTENT1, self.user) all_changes = self.fm.get_all_changes(operation.id, self.user) ch_id = all_changes[-1]["id"] self.fm.set_version_name(ch_id, operation.id, self.user.id, "berlin") diff --git a/tests/_test_mscolab/test_server.py b/tests/_test_mscolab/test_server.py index 9ffb47a0c..10e20f581 100644 --- a/tests/_test_mscolab/test_server.py +++ b/tests/_test_mscolab/test_server.py @@ -33,6 +33,7 @@ from mslib.mscolab.server import check_login, register_user from mslib.mscolab.file_manager import FileManager from mslib.mscolab.seed import add_user, get_user +from tests.utils import XML_CONTENT1, XML_CONTENT2 class Test_Server: @@ -233,7 +234,7 @@ def test_get_all_changes(self): with self.app.test_client() as test_client: operation, token = self._create_operation(test_client, self.userdata) fm, user = self._save_content(operation, self.userdata) - fm.save_file(operation.id, "content2", user) + fm.save_file(operation.id, XML_CONTENT2, user) # the newest change is on index 0, because it has a recent created_at time response = test_client.get('/get_all_changes', data={"token": token, "op_id": operation.id}) @@ -250,20 +251,20 @@ def test_get_change_content(self): with self.app.test_client() as test_client: operation, token = self._create_operation(test_client, self.userdata) fm, user = self._save_content(operation, self.userdata) - fm.save_file(operation.id, "content2", user) + fm.save_file(operation.id, XML_CONTENT2, user) all_changes = fm.get_all_changes(operation.id, user) response = test_client.get('/get_change_content', data={"token": token, "ch_id": all_changes[1]["id"]}) assert response.status_code == 200 data = json.loads(response.data.decode('utf-8')) - assert data == {'content': 'content1'} + assert data == {'content': XML_CONTENT1} def test_set_version_name(self): assert add_user(self.userdata[0], self.userdata[1], self.userdata[2]) with self.app.test_client() as test_client: operation, token = self._create_operation(test_client, self.userdata) fm, user = self._save_content(operation, self.userdata) - fm.save_file(operation.id, "content2", user) + fm.save_file(operation.id, XML_CONTENT2, user) all_changes = fm.get_all_changes(operation.id, user) ch_id = all_changes[1]["id"] version_name = "THIS" @@ -410,5 +411,5 @@ def _save_content(self, operation, userdata=None): userdata = self.userdata user = get_user(userdata[0]) fm = FileManager(self.app.config["MSCOLAB_DATA_DIR"]) - fm.save_file(operation.id, "content1", user) + fm.save_file(operation.id, XML_CONTENT1, user) return fm, user diff --git a/tests/_test_utils/test_verify_waypoint_data.py b/tests/_test_utils/test_verify_waypoint_data.py new file mode 100644 index 000000000..a1065a586 --- /dev/null +++ b/tests/_test_utils/test_verify_waypoint_data.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +""" + + tests._test_utils.test_verify_xml_waypoint + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + This tests for valid xml data of waypoint data. + + This file is part of MSS. + + :copyright: Copyright 2024 Reimar Bauer + :license: APACHE-2.0, see LICENSE for details. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +from mslib.utils.verify_waypoint_data import verify_waypoint_data + + +def test_verify_xml_waypoint(): + xml_content = ( + (""" + + + + + + + + + + + + + + + + """, True), + (""" + + + + """, False), + ) + + + for xml, check in xml_content: + assert verify_waypoint_data(xml) is check diff --git a/tests/utils.py b/tests/utils.py index 7e0083f5d..b7a47b7e9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -34,6 +34,57 @@ from tests.constants import MSUI_CONFIG_PATH +XML_CONTENT1 = """ + + + + + + + + + + + + + + + + """ + + +XML_CONTENT2 = """ + + + + + + + + + + """ + + +XML_CONTENT3 = """ + + + + + + + + + + + + + + + + """ + + def callback_ok_image(status, response_headers): assert status == "200 OK" assert response_headers[0] == ('Content-type', 'image/png')