From 7c2a086a5dbd2a4f735d0e7a3af90b035e9020de Mon Sep 17 00:00:00 2001 From: "Jonathan G. Underwood" Date: Mon, 27 Jul 2020 21:39:41 +0100 Subject: [PATCH] WIP: build out firewall parser and tests --- .../backends/openwrt/converters/firewall.py | 25 ++- tests/openwrt/test_default.py | 62 +++++--- tests/openwrt/test_firewall.py | 143 ++++++++++++++++++ 3 files changed, 199 insertions(+), 31 deletions(-) create mode 100644 tests/openwrt/test_firewall.py diff --git a/netjsonconfig/backends/openwrt/converters/firewall.py b/netjsonconfig/backends/openwrt/converters/firewall.py index 3d461a57b..ed36e11f4 100644 --- a/netjsonconfig/backends/openwrt/converters/firewall.py +++ b/netjsonconfig/backends/openwrt/converters/firewall.py @@ -84,12 +84,21 @@ def __get_auto_name_rule(self, rule): return "rule_{0}".format(self._get_uci_name(rule["name"])) def to_netjson_loop(self, block, result, index): - result["firewall"] = self.__netjson_firewall(block) - return result + result.setdefault("firewall", {}) + result["firewall"].setdefault("rules", []) + + # _name = block.pop(".name") + _type = block.pop(".type") + + if _type == "rule": + rule = self.__netjson_rule(block) + # result["firewall"].setdefault("rules", []) + result["firewall"]["rules"].append(rule) + + return self.type_cast(result) + + def __netjson_rule(self, rule): + if "enabled" in rule: + rule["enabled"] = rule.pop("enabled") == "1" - def __netjson_firewall(self, firewall): - del firewall[".type"] - _name = firewall.pop(".name") - if _name != "firewall": - firewall["id"] = _name - return self.type_cast(firewall) + return self.type_cast(rule) diff --git a/tests/openwrt/test_default.py b/tests/openwrt/test_default.py index d5491bb9d..28e8be761 100644 --- a/tests/openwrt/test_default.py +++ b/tests/openwrt/test_default.py @@ -24,7 +24,6 @@ def test_render_default(self): "firewall": { "rules": [ { - "config_name": "rule", "name": "Allow-MLD", "src": "wan", "proto": "icmp", @@ -34,7 +33,6 @@ def test_render_default(self): "icmp_type": ["130/0", "131/0", "132/0", "143/0"], }, { - "config_name": "rule", "name": "Rule2", "src": "wan", "proto": "icmp", @@ -138,45 +136,63 @@ def test_parse_default(self): ) o = OpenWrt(native=native) expected = { - "luci": [ + "led": [ { - "config_name": "core", - "config_value": "main", - "lang": "auto", - "resourcebase": "/luci-static/resources", - "mediaurlbase": "/luci-static/bootstrap", - "number": "4", - "boolean": "1", + "dev": "1-1.1", + "interval": 50, + "name": "USB1", + "sysfs": "tp-link:green:usb1", + "trigger": "usbdev" + } + ], + "interfaces": [ + { + "name": "eth0", + "type": "ethernet" } ], "firewall": { "rules": [ { - "config_name": "rule", + "family": "ipv6", + "icmp_type": [ + "130/0", + "131/0", + "132/0", + "143/0" + ], "name": "Allow-MLD", - "src": "wan", "proto": "icmp", + "src": "wan", "src_ip": "fe80::/10", - "family": "ipv6", "target": "ACCEPT", - "icmp_type": ["130/0", "131/0", "132/0", "143/0"], } ] }, - "led": [ + "luci": [ { - "name": "USB1", - "sysfs": "tp-link:green:usb1", - "trigger": "usbdev", - "dev": "1-1.1", - "interval": 50, + "boolean": "1", + "lang": "auto", + "mediaurlbase": "/luci-static/bootstrap", + "number": "4", + "resourcebase": "/luci-static/resources", + "config_value": "main", + "config_name": "core" } ], - "interfaces": [{"name": "eth0", "type": "ethernet"}], "system": [ - {"test": "1", "config_name": "custom", "config_value": "custom"} - ], + { + "test": "1", + "config_value": "custom", + "config_name": "custom" + } + ] } + + print("*" * 80) + import json + print(json.dumps(o.config, indent=4)) + print("*" * 80) self.assertDictEqual(o.config, expected) def test_skip(self): diff --git a/tests/openwrt/test_firewall.py b/tests/openwrt/test_firewall.py new file mode 100644 index 000000000..44a12bb20 --- /dev/null +++ b/tests/openwrt/test_firewall.py @@ -0,0 +1,143 @@ +import textwrap +import unittest + +from netjsonconfig import OpenWrt +# from netjsonconfig.exceptions import ValidationError +from netjsonconfig.utils import _TabsMixin + + +class TestFirewall(unittest.TestCase, _TabsMixin): + maxDiff = None + + _rule_1_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-MLD", + "src": "wan", + "src_ip": "fe80::/10", + "proto": "icmp", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + "target": "ACCEPT", + "family": "ipv6", + } + ] + } + } + + _rule_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'rule_Allow_MLD' + option name 'Allow-MLD' + option src 'wan' + option src_ip 'fe80::/10' + option proto 'icmp' + list icmp_type '130/0' + list icmp_type '131/0' + list icmp_type '132/0' + list icmp_type '143/0' + option target 'ACCEPT' + option family 'ipv6' + """ + ) + + def test_render_rule_1(self): + o = OpenWrt(self._rule_1_netjson) + expected = self._tabs(self._rule_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_1(self): + o = OpenWrt(native=self._rule_1_uci) + self.assertEqual(o.config, self._rule_1_netjson) + + _rule_2_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-DHCPv6", + "src": "wan", + "src_ip": "fc00::/6", + "dest_ip": "fc00::/6", + "dest_port": "546", + "proto": "udp", + "target": "ACCEPT", + "family": "ipv6", + } + ] + } + } + + _rule_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'rule_Allow_DHCPv6' + option name 'Allow-DHCPv6' + option src 'wan' + option src_ip 'fc00::/6' + option dest_ip 'fc00::/6' + option dest_port '546' + option proto 'udp' + option target 'ACCEPT' + option family 'ipv6' + """ + ) + + def test_render_rule_2(self): + o = OpenWrt(self._rule_2_netjson) + expected = self._tabs(self._rule_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_2(self): + o = OpenWrt(native=self._rule_2_uci) + self.assertEqual(o.config, self._rule_2_netjson) + + _rule_3_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-Ping", + "src": "wan", + "proto": "icmp", + "family": "ipv4", + "icmp_type": [ + "echo-request", + ], + "target": "ACCEPT", + "enabled": False, + } + ] + } + } + + _rule_3_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'rule_Allow_Ping' + option name 'Allow-Ping' + option src 'wan' + option proto 'icmp' + option family 'ipv4' + list icmp_type 'echo-request' + option target 'ACCEPT' + option enabled '0' + """ + ) + + def test_render_rule_3(self): + o = OpenWrt(self._rule_3_netjson) + expected = self._tabs(self._rule_3_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_3(self): + o = OpenWrt(native=self._rule_3_uci) + self.assertEqual(o.config, self._rule_3_netjson)