-
Notifications
You must be signed in to change notification settings - Fork 746
/
Copy pathtest_cacl_application.py
1281 lines (1035 loc) · 57.5 KB
/
test_cacl_application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import ipaddress
import json
import logging
import pytest
from tests.common.config_reload import config_reload
from tests.common.utilities import wait_until
from tests.common.dualtor.mux_simulator_control import toggle_all_simulator_ports_to_upper_tor # noqa F401
from tests.common.dualtor.dual_tor_utils import upper_tor_host, lower_tor_host # noqa F401
from tests.common.helpers.assertions import pytest_assert
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Markers applied to every test collected from this module.
pytestmark = [
    # Disable the automatic loganalyzer globally.
    pytest.mark.disable_loganalyzer,
    # Topology marker with the value 'any'.
    pytest.mark.topology('any'),
]
@pytest.fixture(scope="module", autouse=True)
def disable_port_toggle(duthosts, tbinfo):
    """
    Keep mux ports from changing state while the tests of this module run.

    On a testbed whose topology name contains "dualtor", the mux mode of every
    DUT is set to manual before the tests and back to auto afterwards. On any
    other topology the fixture issues no command.

    Args:
        duthosts: All DUTs belong to the testbed.
        tbinfo: Testbed information; only tbinfo['topo']['name'] is read.
    """
    def _run_on_every_dut_if_dualtor(command):
        # The topology name is looked up on each call, i.e. once before and
        # once after the yield.
        if "dualtor" not in tbinfo['topo']['name']:
            return
        for dut in duthosts:
            dut.shell(command)

    _run_on_every_dut_if_dualtor("sudo config mux mode manual all")
    yield
    _run_on_every_dut_if_dualtor("sudo config mux mode auto all")
@pytest.fixture(scope="function", params=["active_tor", "standby_tor"])
def duthost_dualtor(request, upper_tor_host, lower_tor_host):     # noqa F811
    """
    Parametrized fixture that hands out one ToR of a dualtor pair.

    Args:
        request: pytest request object; request.param is "active_tor" or "standby_tor".
        upper_tor_host: the upper ToR host.
        lower_tor_host: the lower ToR host.

    Returns:
        lower_tor_host for the "standby_tor" parameter, upper_tor_host otherwise.
    """
    if request.param != 'standby_tor':
        logger.info("Select upper tor...")
        return upper_tor_host

    logger.info("Select lower tor...")
    return lower_tor_host
@pytest.fixture
def expected_dhcp_rules_for_standby(duthost_dualtor):
    """
    Build the DHCP mark DROP rules expected for mux ports in 'standby' state.

    The MUX_CABLE_TABLE keys are listed from redis database 6 of the selected
    ToR. For every key whose "state" field is 'standby', the "mark" field of
    the matching DHCP_PACKET_MARK entry is turned into an iptables rule.

    Args:
        duthost_dualtor: the ToR host selected by the duthost_dualtor fixture.

    Returns:
        list: one "-A DHCP -m mark --mark <mark> -j DROP" string per standby port.
    """
    expected_dhcp_rules = []
    keys_output = duthost_dualtor.shell('/usr/bin/redis-cli -n 6 --raw keys "MUX_CABLE_TABLE*"',
                                        module_ignore_errors=True)['stdout']

    for mux_cable_key in keys_output.split("\n"):
        # Errors of the keys query are ignored, so the output can be empty or
        # hold lines without a "|" separator. Skip those instead of raising
        # IndexError on the missing interface part.
        key_parts = mux_cable_key.split("|")
        if len(key_parts) < 2:
            continue
        interface_name = key_parts[1]

        mux_status = duthost_dualtor.shell('/usr/bin/redis-cli -n 6 --raw hget "{}" "state"'
                                           .format(mux_cable_key), module_ignore_errors=False)['stdout']
        # Only standby ports (and not ports with an empty state) get a mark rule.
        if mux_status != 'standby':
            continue

        mark = duthost_dualtor.shell('/usr/bin/redis-cli -n 6 --raw hget "DHCP_PACKET_MARK|{}" "mark"'
                                     .format(interface_name), module_ignore_errors=False)['stdout']
        expected_dhcp_rules.append("-A DHCP -m mark --mark {} -j DROP".format(mark))

    logger.info("Generated expected dhcp rules for standby interfaces: {}".format(expected_dhcp_rules))
    return expected_dhcp_rules
@pytest.fixture(scope="module")
def docker_network(duthosts, enum_rand_one_per_hwsku_hostname, enum_frontend_asic_index):
    """
    Collect the addresses of the docker 'bridge' network on the selected DUT.

    Runs "docker inspect bridge" once and extracts the gateway addresses of the
    bridge together with the addresses of every container attached to it.

    Args:
        duthosts: All DUTs belong to the testbed.
        enum_rand_one_per_hwsku_hostname: hostname of a random chosen dut to run test.
        enum_frontend_asic_index: requested by the fixture but not read in its body.

    Returns:
        dict: {'bridge': {'IPv4Address': ..., 'IPv6Address': ...},
               'container': {<container name>: {'IPv4Address': ..., 'IPv6Address': ...}}}
    """
    duthost = duthosts[enum_rand_one_per_hwsku_hostname]
    output = duthost.command("docker inspect bridge")
    # Parse the JSON once and reuse it for both the container and the IPAM part.
    bridge_info = json.loads(output['stdout'])[0]
    docker_containers_info = bridge_info['Containers']
    ipam_config = bridge_info['IPAM']['Config']

    # FIXME: Work around dockerd issue. The Gateway entry might be missing. In that case, use 'Subnet' instead.
    # Sample output when docker hit the issue (Note that the IPv6 gateway is missing):
    #     "Config": [
    #         {
    #             "Subnet": "240.127.1.0/24",
    #             "Gateway": "240.127.1.1"
    #         },
    #         {
    #             "Subnet": "fd00::/80"
    #         }
    #     ]
    # When Gateway IP is missing, form the g/w IP as subnet + '1'.
    #     IPv4 Gateway would be '240.127.1' + '1'
    #     IPv6 Gateway would be 'fd00::' + '1'
    def _gateway_or_guess(ipam_entry, drop_last_char):
        # The fallback is computed only when 'Gateway' is absent, so an entry
        # that has a gateway but no 'Subnet' no longer fails.
        if 'Gateway' in ipam_entry:
            return ipam_entry['Gateway']
        subnet_address = ipam_entry['Subnet'].split('/')[0]
        if drop_last_char:
            subnet_address = subnet_address[:-1]
        return subnet_address + '1'

    # Config[0] is used as the IPv4 entry and Config[1] as the IPv6 entry.
    bridge_addresses = {'IPv4Address': _gateway_or_guess(ipam_config[0], drop_last_char=True),
                        'IPv6Address': _gateway_or_guess(ipam_config[1], drop_last_char=False)}

    # Container addresses are reported with a prefix length; keep the address only.
    container_addresses = {
        info['Name']: {'IPv4Address': info['IPv4Address'].split('/')[0],
                       'IPv6Address': info['IPv6Address'].split('/')[0]}
        for info in docker_containers_info.values()
    }

    return {'bridge': bridge_addresses, 'container': container_addresses}
@pytest.fixture(scope="function")
def collect_ignored_rules(duthosts, enum_rand_one_per_hwsku_hostname):
    """
    Snapshot the iptables/ip6tables rules that exist before the test, so they can be
    treated as ignored rules which are not related to CACL test cases.

    Args:
        duthosts: All DUTs belong to the testbed.
        enum_rand_one_per_hwsku_hostname: hostname of a random chosen dut to run test.

    Returns:
        dict: {"v4": <output lines of "iptables -S">, "v6": <output lines of "ip6tables -S">}
    """
    selected_dut = duthosts[enum_rand_one_per_hwsku_hostname]
    return {
        "v4": selected_dut.command("iptables -S")["stdout_lines"],
        "v6": selected_dut.command("ip6tables -S")["stdout_lines"],
    }
@pytest.fixture(scope="function")
def clean_scale_rules(duthosts, enum_rand_one_per_hwsku_hostname, collect_ignored_rules):
    """
    Teardown-only fixture for the scale rule tests.

    Nothing is done before the test. After the test the ACL template json file
    is deleted from the DUT and the configuration is recovered with a config reload.

    Args:
        duthosts: All DUTs belong to the testbed.
        enum_rand_one_per_hwsku_hostname: hostname of a random chosen dut to run test.
        collect_ignored_rules: ignored iptable/ip6table rules (requested, not read here).

    Returns:
        None
    """
    selected_dut = duthosts[enum_rand_one_per_hwsku_hostname]

    yield

    logger.info("delete tmp file and recover ACL configuration")
    # Remove the temporary ACL file.
    selected_dut.file(path=SCALE_ACL_FILE, state='absent')

    logger.info("Reload config to recover configuration.")
    config_reload(selected_dut, safe_reload=True, check_intf_up_ports=True)
@pytest.fixture(scope="function")
def dummy_acl_rules(duthosts, enum_rand_one_per_hwsku_hostname):
    """
    Load dummy ACL rules into the SNMP-ACL, ssh-only and ntp-acl tables of the DUT.

    21 entries are generated, mixing DROP and ACCEPT actions as well as IPv4 and
    IPv6 source addresses. The same entries are used for all three tables. The
    rules are written to a json file on the DUT and applied with acl-loader.
    After the test the file is deleted and the configuration is reloaded.

    Args:
        duthosts: All DUTs belong to the testbed.
        enum_rand_one_per_hwsku_hostname: hostname of a random chosen dut to run test.

    Yields:
        file_path: path to the generated ACL JSON file on the DUT
    """
    duthost = duthosts[enum_rand_one_per_hwsku_hostname]
    file_path = "/tmp/generated_acl.json"

    def _build_entry(seq_id):
        # Sequence ids divisible by 3 get an IPv6 source, all others an IPv4
        # source, which gives two IPv4 entries for every IPv6 entry.
        source = f"20.0.0.{1 + seq_id}/32" if seq_id % 3 else f"2001::{1 + seq_id}/128"
        # Odd sequence ids accept, even ones drop.
        verdict = "ACCEPT" if seq_id % 2 else "DROP"
        return {
            "actions": {
                "config": {
                    "forwarding-action": verdict
                }
            },
            "config": {
                "sequence-id": seq_id
            },
            "ip": {
                "config": {
                    "source-ip-address": source
                }
            }
        }

    shared_entries = {seq_id: _build_entry(seq_id) for seq_id in range(1, 22)}

    def _build_acl_set(table_name):
        # Each table gets its own top-level dict holding the shared entries.
        return {
            "acl-entries": {
                "acl-entry": dict(shared_entries)
            },
            "config": {
                "name": table_name
            }
        }

    rules_data = {
        'acl': {
            "acl-sets": {
                "acl-set": {
                    table_name: _build_acl_set(table_name)
                    for table_name in ("SNMP-ACL", "ssh-only", "ntp-acl")
                }
            }
        }
    }

    duthost.copy(content=json.dumps(rules_data, indent=4), dest=file_path)
    duthost.command('acl-loader update full {}'.format(file_path))
    logger.info('Waiting for all rules to be applied...')

    # "acl-loader update full" refreshes iptables, so the ACCEPT rule for SSH
    # can only be added afterwards. On a multi-asic testbed it takes minutes
    # until the iptables rules are synced. A plain sleep of that length would
    # let the SSH session run into its default 30s timeout, and the reconnect
    # triggered by the next duthost.command would then be rejected by the
    # default CACL DENY rule. wait_until is used instead: it keeps calling
    # check_iptable_rules (every 10s), which only runs commands on the DUT and
    # thereby keeps the SSH session alive for as long as we need to wait.
    wait_until(200, 10, 2, check_iptable_rules, duthost)

    # Add an ACCEPT rule for SSH so the testbed stays reachable.
    duthost.command("iptables -I INPUT 3 -p tcp -m tcp --dport 22 -j ACCEPT")

    yield file_path

    logger.info(f"Deleting {file_path}...")
    duthost.file(path=file_path, state='absent')

    logger.info("Reloading config to recover original ACL configuration...")
    config_reload(duthost, safe_reload=True, check_intf_up_ports=True)
def is_acl_rule_empty(duthost):
    """
    Tell whether "show acl rule" lists no rule on the DUT.

    Args:
        duthost: the DUT on which the command is run.

    Returns:
        boolean: True when nothing follows the first two output lines, False otherwise.
    """
    # The first two output lines are skipped; whatever follows is treated as rules.
    rule_lines = duthost.command("show acl rule")["stdout_lines"][2:]
    return not rule_lines
def check_iptable_rules(duthost):
    """
    Run "iptables -S" and "ip6tables -S" on the DUT and discard the output.

    The function only exists to keep the ssh session from timing out, because
    a reconnection would be rejected by the default CACL DENY rule.

    Args:
        duthost: the DUT on which the commands are run.

    Returns:
        boolean: always False
    """
    for listing_command in ("iptables -S", "ip6tables -S"):
        duthost.command(listing_command)
    return False
# Services a control plane ACL table can be bound to. Every entry lists the
# IP protocols and destination ports of the service.
# A port range is written in iptables format, i.e. start and end port
# separated by a colon, e.g. "1000:2000".
ACL_SERVICES = {
    "NTP": {"ip_protocols": ["udp"], "dst_ports": ["123"], "multi_asic_ns_to_host_fwd": False},
    "SNMP": {"ip_protocols": ["tcp", "udp"], "dst_ports": ["161"], "multi_asic_ns_to_host_fwd": True},
    "SSH": {"ip_protocols": ["tcp"], "dst_ports": ["22"], "multi_asic_ns_to_host_fwd": True},
}

# Path of the template json file used to test scale rules
SCALE_ACL_FILE = "/tmp/scale_cacl.json"
def parse_int_to_tcp_flags(hex_value):
    """
    Translate a TCP flags bit mask into a comma separated list of flag names.

    Args:
        hex_value: integer bit mask of TCP flags.

    Returns:
        str: names of the set flags in the order FIN, SYN, RST, PSH, ACK, URG,
        joined by commas; an empty string when none of these bits is set.
    """
    # ECE (0x40) and CWR (0x80) are left out on purpose. iptables doesn't
    # handle them as flag names now, it has some special keys for them:
    #   --ecn-tcp-cwr  matches if the TCP ECN CWR (Congestion Window Received) bit is set.
    #   --ecn-tcp-ece  matches if the TCP ECN ECE (ECN Echo) bit is set.
    named_bits = (
        (0x01, "FIN"),
        (0x02, "SYN"),
        (0x04, "RST"),
        (0x08, "PSH"),
        (0x10, "ACK"),
        (0x20, "URG"),
    )
    return ",".join(flag_name for bit, flag_name in named_bits if hex_value & bit)
def get_token_ranges(separator_line):
    """
    Derive the column boundaries of a text table from its separator line.

    A column starts at index 0 or at a '-' found after the previous column and
    ends at the next space, or at the end of the line for the last column.

    Args:
        separator_line: the line of dashes below the table header.

    Returns:
        list: (start, end) index tuples, one per column, end exclusive.
    """
    column_ranges = []
    line_length = len(separator_line)
    begin = 0
    while begin != -1:
        stop = separator_line.find(' ', begin)
        reached_last_column = stop == -1
        column_ranges.append((begin, line_length if reached_last_column else stop))
        if reached_last_column:
            break
        # The next column begins at the first dash after the gap.
        begin = separator_line.find('-', stop)
    return column_ranges
def get_token_values(line, token_ranges):
    """
    Cut a table line into its column values.

    Args:
        line: one line of the table.
        token_ranges: column boundaries; element 0 of each entry is the start
            index and element 1 the (exclusive) end index.

    Returns:
        list: one slice of the line per range, padding is not stripped.
    """
    return [line[column[0]:column[1]] for column in token_ranges]
def get_cacl_tables_and_rules(duthost):
    """
    Gathers control plane ACL tables and rules configured on the device via
    `show acl table` and `show acl rule` commands.

    Returns a list of dictionaries where each element represents a control
    plane ACL table in the following format:

    {
        "name": "<table name>",
        "services": [<list of service names>],
        "rules": [<list of rules>]
    }

    Each rule is itself a dictionary which contains "name", "priority" and
    "action" elements, as well as one or more unique elements which specify
    rule data. Examples include "IP_PROTOCOL", "SRC_IP", "SRC_IPV6", "DST_IP",
    "DST_IPV6", "L4_SRC_PORT", "L4_DST_PORT", "ETHER_TYPE"

    The rules of each table are sorted by their numeric priority, descending.
    The test is failed via pytest.fail when a rule line cannot be interpreted.
    """
    cacl_tables = []

    # The output of `show acl table` and `show acl rule` are difficult to parse well :(
    # We should consider modifying the output format to make it more easily parsable.
    stdout_lines = duthost.shell("show acl table")["stdout_lines"]

    previous_table_ctrlplane = False
    for line in stdout_lines:
        tokens = line.strip().split()
        # A line beginning a new ACL table definition should contain at least 4
        # columns of data. More recent builds of SONiC output 5 columns (a new
        # 'stage' column)
        if len(tokens) >= 4:
            if tokens[1] == "CTRLPLANE":
                # This is the beginning of a new control plane ACL definition
                previous_table_ctrlplane = True
                cacl_tables.append({"name": tokens[0], "services": [tokens[2]], "rules": []})
            else:
                previous_table_ctrlplane = False
        elif len(tokens) == 1 and previous_table_ctrlplane:
            # If the line only contains one token and the previous table we
            # encountered was a control plane ACL table, the token in this line
            # must be an additional service which the previous table is
            # attached to, so we append it to the list of services of the last
            # table we added
            cacl_tables[-1]["services"].append(tokens[0])

    def _priority_sort_key(rule):
        # Priorities are parsed as text. Comparing them as strings would order
        # e.g. "9999" above "10000", so compare them as integers. A value that
        # is not a number is ordered after all numeric priorities.
        try:
            return int(rule["priority"])
        except ValueError:
            return -1

    # Process the rules for each table
    for table in cacl_tables:
        stdout_lines = duthost.shell("show acl rule {}".format(table["name"]))["stdout_lines"]
        # The second line is the separator row; its dashes give the column boundaries.
        fld_rngs = get_token_ranges(stdout_lines[1])

        # First two lines make up the table header. Skip them.
        for line in stdout_lines[2:]:
            tokens = get_token_values(line, fld_rngs)

            # The table column is a fixed-width slice and may carry padding.
            if len(tokens) == len(fld_rngs) and tokens[0].strip() == table["name"]:
                table["rules"].append({"name": tokens[1], "priority": tokens[2].strip(), "action": tokens[3].strip()})
                key, val = tokens[4].split()
                # Strip the trailing colon from the key name
                key = key[:-1]
                table["rules"][-1][key] = val
                continue

            # Fixed-width tokenization always yields one token per column, so a
            # line which only carries additional rule data ("KEY: value") has
            # to be recognized by splitting it on whitespace instead.
            extra_tokens = line.split()
            if len(extra_tokens) == 2 and table["rules"]:
                # Add the data to the last rule we appended, stripping the
                # trailing colon from the key name
                key = extra_tokens[0][:-1]
                table["rules"][-1][key] = extra_tokens[1]
            else:
                pytest.fail("Unexpected ACL rule data: {}".format(repr(tokens)))

        # Sort the rules in each table by priority, descending
        table["rules"] = sorted(table["rules"], key=_priority_sort_key, reverse=True)

    return cacl_tables
def generate_and_append_block_ip2me_traffic_rules(duthost, iptables_rules, ip6tables_rules, asic_index):
    """
    Append the DROP rules expected for traffic destined to the interface IP
    addresses of the device.

    The persistent configuration of the namespace belonging to asic_index is
    read and one rule per configured interface address is generated.

    Args:
        duthost: the DUT whose configuration is read.
        iptables_rules: list extended in place with the IPv4 rules.
        ip6tables_rules: list extended in place with the IPv6 rules.
        asic_index: the index of asic, used to look up the namespace.

    Returns:
        None
    """
    interface_tables = (
        "LOOPBACK_INTERFACE",
        "VLAN_INTERFACE",
        "PORTCHANNEL_INTERFACE",
        "INTERFACE",
    )

    # Gather device configuration facts
    namespace = duthost.get_namespace_from_asic_id(asic_index)
    cfg_facts = duthost.config_facts(host=duthost.hostname, source="persistent", namespace=namespace)["ansible_facts"]

    for table_name in interface_tables:
        if table_name not in cfg_facts:
            continue

        # For VLAN interfaces, the IP address we want to block is the default gateway
        # (i.e., the first available host IP address of the VLAN subnet)
        block_first_host = table_name == "VLAN_INTERFACE"
        configured_ifaces = cfg_facts[table_name]

        for iface_name in configured_ifaces:
            for candidate_key in configured_ifaces[iface_name]:
                try:
                    network = ipaddress.ip_network(candidate_key, strict=False)
                except ValueError:
                    # Keys which are not IP addresses are ignored.
                    continue

                blocked_ip = next(network.hosts()) if block_first_host else network.network_address
                drop_rule = "-A INPUT -d {}/{} -j DROP".format(blocked_ip, network.max_prefixlen)

                if isinstance(network, ipaddress.IPv4Network):
                    iptables_rules.append(drop_rule)
                elif isinstance(network, ipaddress.IPv6Network):
                    ip6tables_rules.append(drop_rule)
                else:
                    pytest.fail("Unrecognized IP address type on interface '{}': {}"
                                .format(iface_name, network))
def append_midplane_traffic_rules(duthost, iptables_rules):
    """
    Append the ACCEPT rules expected for the eth1-midplane interface.

    Nothing is appended when the interface lookup on the DUT gives no output.

    Args:
        duthost: the DUT on which the interface is looked up.
        iptables_rules: list extended in place with the IPv4 rules.

    Returns:
        None
    """
    # Get the kernel intf name in the event that eth1-midplane is an altname
    kernel_intf = duthost.shell('ip link show eth1-midplane | awk \'NR==1{print$2}\' | cut -f1 -d"@" | cut -f1 -d":"',
                                module_ignore_errors=True)['stdout']
    if not kernel_intf:
        return

    # First IPv4 address reported for eth1-midplane, without the prefix length.
    midplane_ip = duthost.shell('ip -4 -o addr show eth1-midplane | awk \'{print $4}\' | cut -d / -f1 | head -1',
                                module_ignore_errors=True)['stdout']
    iptables_rules.extend([
        "-A INPUT -i {} -j ACCEPT".format(kernel_intf),
        "-A INPUT -s {}/32 -d {}/32 -j ACCEPT".format(midplane_ip, midplane_ip),
    ])
def generate_expected_rules(duthost, tbinfo, docker_network, asic_index, expected_dhcp_rules_for_standby):
    """
    Build the lists of iptables and ip6tables rules expected on the DUT.

    The rules are appended in a fixed sequence: default policies, localhost,
    docker-internal traffic, established connections, ICMP/NDP, DHCP, dualtor
    specifics, BGP (and LDP on wan topologies), the rules derived from the
    control plane ACL tables of the device, the ip2me block rules, the TTL
    rules, the default drop rules and finally the midplane rules.

    Args:
        duthost: the DUT; its os_version is read and it is handed to the helper functions.
        tbinfo: testbed information; only tbinfo['topo']['name'] is read.
        docker_network: dict with the 'bridge' and 'container' addresses.
        asic_index: None, or the index of the asic whose 'database<index>' container
            addresses are used for the docker-internal rules.
        expected_dhcp_rules_for_standby: list of extra DHCP rules, or a falsy value.

    Returns:
        tuple: (iptables_rules, ip6tables_rules), two lists of rule strings.
    """
    iptables_rules = []
    ip6tables_rules = []

    # Default policies
    iptables_rules.append("-P INPUT ACCEPT")
    iptables_rules.append("-P FORWARD ACCEPT")
    iptables_rules.append("-P OUTPUT ACCEPT")
    ip6tables_rules.append("-P INPUT ACCEPT")
    ip6tables_rules.append("-P FORWARD ACCEPT")
    ip6tables_rules.append("-P OUTPUT ACCEPT")

    # Allow localhost
    iptables_rules.append("-A INPUT -s 127.0.0.1/32 -i lo -j ACCEPT")
    ip6tables_rules.append("-A INPUT -s ::1/128 -i lo -j ACCEPT")

    if asic_index is None:
        # Allow Communication among docker containers
        # Note that the bridge-to-bridge rule is appended once per container.
        for k, v in list(docker_network['container'].items()):
            # network mode for dhcp_server container is bridge, but this rule is not expected to be seen
            if k == "dhcp_server":
                continue
            iptables_rules.append("-A INPUT -s {}/32 -d {}/32 -j ACCEPT"
                                  .format(docker_network['bridge']['IPv4Address'],
                                          docker_network['bridge']['IPv4Address']))
            iptables_rules.append("-A INPUT -s {}/32 -d {}/32 -j ACCEPT"
                                  .format(v['IPv4Address'],
                                          docker_network['bridge']['IPv4Address']))
            ip6tables_rules.append("-A INPUT -s {}/128 -d {}/128 -j ACCEPT"
                                   .format(docker_network['bridge']['IPv6Address'],
                                           docker_network['bridge']['IPv6Address']))
            ip6tables_rules.append("-A INPUT -s {}/128 -d {}/128 -j ACCEPT"
                                   .format(v['IPv6Address'],
                                           docker_network['bridge']['IPv6Address']))
    else:
        # With an asic index, only the 'database<asic_index>' container is considered:
        # traffic from the container to itself and from the bridge to the container.
        iptables_rules.append("-A INPUT -s {}/32 -d {}/32 -j ACCEPT".format(docker_network['container']['database'
                              + str(asic_index)]['IPv4Address'],
                              docker_network['container']['database'
                              + str(asic_index)]['IPv4Address']))
        iptables_rules.append("-A INPUT -s {}/32 -d {}/32 -j ACCEPT".format(docker_network['bridge']['IPv4Address'],
                              docker_network['container']['database'
                              + str(asic_index)]['IPv4Address']))
        ip6tables_rules.append("-A INPUT -s {}/128 -d {}/128 -j ACCEPT".format(docker_network['container']['database'
                               + str(asic_index)]['IPv6Address'],
                               docker_network['container']['database'
                               + str(asic_index)]['IPv6Address']))
        ip6tables_rules.append("-A INPUT -s {}/128 -d {}/128 -j ACCEPT".format(docker_network['bridge']['IPv6Address'],
                               docker_network['container']['database'
                               + str(asic_index)]['IPv6Address']))

    # Allow all incoming packets from established connections or new connections
    # which are related to established connections
    iptables_rules.append("-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT")
    ip6tables_rules.append("-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT")

    # Allow bidirectional ICMPv4 ping and traceroute
    iptables_rules.append("-A INPUT -p icmp -m icmp --icmp-type 8 -j ACCEPT")
    iptables_rules.append("-A INPUT -p icmp -m icmp --icmp-type 0 -j ACCEPT")
    iptables_rules.append("-A INPUT -p icmp -m icmp --icmp-type 3 -j ACCEPT")
    iptables_rules.append("-A INPUT -p icmp -m icmp --icmp-type 11 -j ACCEPT")
    # ICMPv6 counterparts (types 128, 129, 1 and 3)
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 128 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 129 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 1 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 3 -j ACCEPT")

    # Allow all incoming Neighbor Discovery Protocol (NDP) NS/NA/RS/RA messages
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 135 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 136 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 133 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m icmp6 --icmpv6-type 134 -j ACCEPT")

    # Allow all incoming IPv4 DHCP packets
    # (the rule for ports 67:68 is expected in both the IPv4 and the IPv6 list)
    iptables_rules.append("-A INPUT -p udp -m udp --dport 67:68 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p udp -m udp --dport 67:68 -j ACCEPT")

    # Allow all incoming IPv6 DHCP packets
    # (the rule for ports 546:547 is expected in both lists as well)
    iptables_rules.append("-A INPUT -p udp -m udp --dport 546:547 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p udp -m udp --dport 546:547 -j ACCEPT")

    # There are some hardcoded cacl rule for dualtor testbed
    if "dualtor" in tbinfo['topo']['name']:
        rules_to_expect_for_dualtor = [
            "-A INPUT -p udp -m udp --dport 67 -j DHCP",
            "-A DHCP -j RETURN",
            "-N DHCP"
        ]
        iptables_rules.extend(rules_to_expect_for_dualtor)

    # On standby tor, it has expected dhcp mark iptables rules.
    if expected_dhcp_rules_for_standby:
        pytest_assert(isinstance(expected_dhcp_rules_for_standby, list),
                      "expected_dhcp_rules_for_standby should be list! current type is {}"
                      .format(type(expected_dhcp_rules_for_standby)))
        iptables_rules.extend(expected_dhcp_rules_for_standby)

    # Allow all incoming BGP traffic
    iptables_rules.append("-A INPUT -p tcp -m tcp --dport 179 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p tcp -m tcp --dport 179 -j ACCEPT")

    # Images whose os_version contains one of these branch names are expected
    # to carry an additional rule matching on source port 179.
    extra_rule_branches = ['201911', '202012', '202111']
    if any(branch in duthost.os_version for branch in extra_rule_branches):
        iptables_rules.append("-A INPUT -p tcp -m tcp --sport 179 -j ACCEPT")
        ip6tables_rules.append("-A INPUT -p tcp -m tcp --sport 179 -j ACCEPT")

    # Allow LDP traffic
    # (the list for wan topologies also holds the rule for source port 179)
    if ("wan" in tbinfo['topo']['name']):
        wan_default_rules = [
            "-A INPUT -p tcp -m tcp --dport 646 -j ACCEPT",
            "-A INPUT -p tcp -m tcp --sport 646 -j ACCEPT",
            "-A INPUT -p udp -m udp --dport 646 -j ACCEPT",
            "-A INPUT -p udp -m udp --sport 646 -j ACCEPT",
            "-A INPUT -p tcp -m tcp --sport 179 -j ACCEPT"
        ]
        iptables_rules += wan_default_rules
        ip6tables_rules += wan_default_rules

    # Generate control plane rules from device config
    # Counts the generated rules; it decides below whether default drop rules are expected.
    rules_applied_from_config = 0

    cacl_tables = get_cacl_tables_and_rules(duthost)

    # Walk the ACL tables and generate an iptables rule for each rule
    for table in cacl_tables:
        if len(table["rules"]) == 0:
            logger.info("ACL table {} has no rules".format(table["name"]))
            continue

        acl_services = table["services"]

        for acl_service in acl_services:
            if acl_service not in ACL_SERVICES:
                logger.warning("Ignoring control plane ACL '{}' with unrecognized service '{}'"
                               .format(table["name"], acl_service))
                continue

            # Obtain default IP protocol(s) and destination port(s) for this service
            ip_protocols = ACL_SERVICES[acl_service]["ip_protocols"]
            dst_ports = ACL_SERVICES[acl_service]["dst_ports"]

            table_ip_version = None

            for rule in table["rules"]:
                # If we haven't determined the IP version for this ACL table yet,
                # try to do it now. We attempt to determine heuristically based on
                # whether the src or dst IP of this rule is an IPv4 or IPv6 address.
                if not table_ip_version:
                    if "SRC_IPV6" in rule and rule["SRC_IPV6"]:
                        table_ip_version = 6
                    elif "SRC_IP" in rule and rule["SRC_IP"]:
                        table_ip_version = 4
                else:
                    # The version is known already; a rule of the other address
                    # family in the same table fails the test.
                    if (("SRC_IPV6" in rule and rule["SRC_IPV6"] and table_ip_version == 4) or
                            ("SRC_IP" in rule and rule["SRC_IP"] and table_ip_version == 6)):
                        pytest.fail("ACL table '{}' contains both IPv4 and IPv6 rules".format(table["name"]))

            # If we were unable to determine whether this ACL table contains
            # IPv4 or IPv6 rules, the test is failed.
            if not table_ip_version:
                pytest.fail("Unable to determine if ACL table '{}' contains IPv4 or IPv6 rules".format(table["name"]))
                # NOTE(review): pytest.fail raises, so this continue looks unreachable - confirm.
                continue

            # We assume the rules are already sorted by priority in descending order
            for rule in table["rules"]:
                # Apply the rule to the default protocol(s) for this ACL service
                for ip_protocol in ip_protocols:
                    for dst_port in dst_ports:
                        new_iptables_rule = "-A INPUT"

                        # Pick the source address matching the IP version of the table.
                        iface_cidr = None
                        if table_ip_version == 6 and "SRC_IPV6" in rule and rule["SRC_IPV6"]:
                            iface_cidr = rule["SRC_IPV6"]
                        elif table_ip_version == 4 and "SRC_IP" in rule and rule["SRC_IP"]:
                            iface_cidr = rule["SRC_IP"]

                        if iface_cidr:
                            # strict=False accepts addresses with host bits set; the
                            # rule is written with the network address of the prefix.
                            ip_ntwrk = ipaddress.ip_network(iface_cidr, strict=False)
                            new_iptables_rule += " -s {}/{}".format(ip_ntwrk.network_address, ip_ntwrk.prefixlen)

                        new_iptables_rule += " -p {0} -m {0} --dport {1}".format(ip_protocol, dst_port)

                        # If there are TCP flags present and ip protocol is TCP, append them
                        if ip_protocol == "tcp" and "TCP_FLAGS" in rule and rule["TCP_FLAGS"]:
                            # TCP_FLAGS is split as "<flags>/<mask>", both parsed as hexadecimal.
                            tcp_flags, tcp_flags_mask = rule["TCP_FLAGS"].split("/")
                            tcp_flags = int(tcp_flags, 16)
                            tcp_flags_mask = int(tcp_flags_mask, 16)

                            if tcp_flags_mask > 0:
                                new_iptables_rule += (" --tcp-flags {mask} {flags}"
                                                      .format(mask=parse_int_to_tcp_flags(tcp_flags_mask),
                                                              flags=parse_int_to_tcp_flags(tcp_flags)))

                        # Append the packet action as the jump target
                        new_iptables_rule += " -j {}".format(rule["action"])

                        if table_ip_version == 6:
                            ip6tables_rules.append(new_iptables_rule)
                        else:
                            iptables_rules.append(new_iptables_rule)

                        rules_applied_from_config += 1

    # Append rules which block "ip2me" traffic on p2p interfaces
    generate_and_append_block_ip2me_traffic_rules(duthost, iptables_rules, ip6tables_rules, asic_index)

    # Allow all packets with a TTL/hop limit of 0 or 1
    iptables_rules.append("-A INPUT -p icmp -m ttl --ttl-lt 2 -j ACCEPT")
    iptables_rules.append("-A INPUT -p udp -m ttl --ttl-lt 2 -m udp --dport 1025:65535 -j ACCEPT")
    iptables_rules.append("-A INPUT -p tcp -m ttl --ttl-lt 2 -m tcp --dport 1025:65535 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p ipv6-icmp -m hl --hl-lt 2 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p udp -m hl --hl-lt 2 -m udp --dport 1025:65535 -j ACCEPT")
    ip6tables_rules.append("-A INPUT -p tcp -m hl --hl-lt 2 -m tcp --dport 1025:65535 -j ACCEPT")

    # If we have added rules from the device config, we lastly add default drop rules
    if rules_applied_from_config > 0:
        # Default drop rules
        iptables_rules.append("-A INPUT -j DROP")
        ip6tables_rules.append("-A INPUT -j DROP")

    # IP Table rule to allow eth1-midplane traffic for chassis
    if asic_index is None:
        append_midplane_traffic_rules(duthost, iptables_rules)

    return iptables_rules, ip6tables_rules
def generate_nat_expected_rules(duthost, docker_network, asic_index):
    """
    Build the NAT rules expected for the services which are forwarded from an
    asic namespace to the host.

    For every protocol/port of each service in ACL_SERVICES flagged with
    "multi_asic_ns_to_host_fwd", a DNAT rule towards the docker bridge address
    and an SNAT rule from the 'database<asic_index>' container address are
    generated, for IPv4 and IPv6 alike.

    Args:
        duthost: instance of AnsibleHost class (not read in the body)
        docker_network: dict with the 'bridge' and 'container' addresses
        asic_index: the index of asic

    Returns:
        tuple: (iptables_natrules, ip6tables_natrules), two lists of rule strings.
    """
    # Default policies, identical for both address families.
    default_policies = (
        "-P PREROUTING ACCEPT",
        "-P INPUT ACCEPT",
        "-P OUTPUT ACCEPT",
        "-P POSTROUTING ACCEPT",
    )
    iptables_natrules = list(default_policies)
    ip6tables_natrules = list(default_policies)

    dnat_template = "-A PREROUTING -p {} -m {} --dport {} -j DNAT --to-destination {}"
    snat_template = "-A POSTROUTING -p {} -m {} --dport {} -j SNAT --to-source {}"

    # IPv4 rules are generated first, then IPv6 rules.
    address_families = (
        (iptables_natrules, 'IPv4Address'),
        (ip6tables_natrules, 'IPv6Address'),
    )

    for service in ACL_SERVICES.values():
        if not service["multi_asic_ns_to_host_fwd"]:
            continue
        for ip_protocol in service["ip_protocols"]:
            for dst_port in service["dst_ports"]:
                for natrules, address_key in address_families:
                    bridge_address = docker_network['bridge'][address_key]
                    natrules.append(dnat_template.format(ip_protocol, ip_protocol, dst_port, bridge_address))
                    database_address = docker_network['container']['database' + str(asic_index)][address_key]
                    natrules.append(snat_template.format(ip_protocol, ip_protocol, dst_port, database_address))

    return iptables_natrules, ip6tables_natrules
def generate_expected_cacl_rules(duthost, ip_type):
    """
    Generate expected iptables rules for control ACL based on cacl tables and rules.

    Args:
        duthost: instance of AnsibleHost class
        ip_type: "ipv4" or "ipv6"; selects which source address field of a rule is used

    Returns:
        list: the expected rule strings, followed by a default drop rule
        when at least one rule was generated.
    """
    iptables_rules = []

    # Field of a rule holding the source address for the requested IP type.
    source_field = {"ipv6": "SRC_IPV6", "ipv4": "SRC_IP"}.get(ip_type)

    # Walk the ACL tables and generate an iptables rule for each rule
    for table in get_cacl_tables_and_rules(duthost):
        if not table["rules"]:
            logger.info("ACL table {} has no rules".format(table["name"]))
            continue

        for acl_service in table["services"]:
            if acl_service not in ACL_SERVICES:
                logger.warning("Ignoring control plane ACL '{}' with unrecognized service '{}'"
                               .format(table["name"], acl_service))
                continue

            # Default IP protocol(s) and destination port(s) of this service
            service = ACL_SERVICES[acl_service]
            ip_protocols = service["ip_protocols"]
            dst_ports = service["dst_ports"]

            # The rules are assumed to be sorted by priority in descending order already
            for rule in table["rules"]:
                for ip_protocol in ip_protocols:
                    for dst_port in dst_ports:
                        fragments = ["-A INPUT"]

                        # A match-all source prefix produces no "-s" part.
                        iface_cidr = rule.get(source_field) if source_field else None
                        if iface_cidr and iface_cidr not in ("0.0.0.0/0", "::/0"):
                            ip_ntwrk = ipaddress.ip_network(iface_cidr, strict=False)
                            fragments.append(" -s {}/{}".format(ip_ntwrk.network_address, ip_ntwrk.prefixlen))

                        fragments.append(" -p {0} -m {0} --dport {1}".format(ip_protocol, dst_port))

                        # TCP flags are only appended for the tcp protocol
                        raw_tcp_flags = rule.get("TCP_FLAGS")
                        if ip_protocol == "tcp" and raw_tcp_flags:
                            flags_text, mask_text = raw_tcp_flags.split("/")
                            tcp_flags = int(flags_text, 16)
                            tcp_flags_mask = int(mask_text, 16)
                            if tcp_flags_mask > 0:
                                fragments.append(" --tcp-flags {mask} {flags}"
                                                 .format(mask=parse_int_to_tcp_flags(tcp_flags_mask),
                                                         flags=parse_int_to_tcp_flags(tcp_flags)))

                        # The packet action becomes the jump target
                        fragments.append(" -j {}".format(rule["action"]))
                        iptables_rules.append("".join(fragments))

    # Rules were generated from the device config, so a default drop rule comes last
    if iptables_rules:
        iptables_rules.append("-A INPUT -j DROP")

    return iptables_rules
def generate_scale_rules(duthost, ip_type):
    """
    Build a scale set of control plane ACL rules and load it onto the DUT.

    Fifty DROP rules (sequence ids 1-50) are generated, each matching a single
    source host: "20.0.0.2/32" .. "20.0.0.51/32" when ip_type is "ipv4",
    otherwise "2001::2/128" .. "2001::51/128". The same rule set is placed in
    the SNMP-ACL, ssh-only and ntp-acl tables, written as JSON to
    SCALE_ACL_FILE on the DUT and loaded with "acl-loader update full".
    Afterwards an ACCEPT rule for TCP port 22 is inserted into the INPUT chain
    so the testbed stays reachable over SSH.

    Args:
        duthost: instance of AnsibleHost class
        ip_type: "ipv4" for IPv4 sources; any other value yields IPv6 sources
    Returns:
        None
    """
    # Host part of the source address is the sequence id plus one.
    src_ip_template = "20.0.0.{}/32" if ip_type == "ipv4" else "2001::{}/128"

    # Entries are keyed by the integer sequence id; json.dumps serializes the
    # integer keys as strings.
    scale_entries = {
        seq_id: {
            "actions": {"config": {"forwarding-action": "DROP"}},
            "config": {"sequence-id": seq_id},
            "ip": {"config": {"source-ip-address": src_ip_template.format(seq_id + 1)}},
        }
        for seq_id in range(1, 51)
    }

    # Every control plane table receives an identical set of entries.
    acl_sets = {
        table_name: {
            "acl-entries": {"acl-entry": dict(scale_entries)},
            "config": {"name": table_name},
        }
        for table_name in ("SNMP-ACL", "ssh-only", "ntp-acl")
    }
    rules_data = {"acl": {"acl-sets": {"acl-set": acl_sets}}}

    duthost.copy(content=json.dumps(rules_data, indent=4), dest=SCALE_ACL_FILE)
    duthost.command(f"acl-loader update full {SCALE_ACL_FILE}")
    logger.info('Waiting all rules to be applied')

    # "acl-loader update full <file>.json" refreshes iptables, so the ACCEPT SSH
    # rule can only be added after it has run. On a multi-asic testbed it can
    # take minutes for the iptables rules to sync after the CACL update. A plain
    # time.sleep of that length would outlive the default 30s SSH timeout; the
    # next duthost.command would then have to log in again and would be
    # rejected by the default CACL DENY rule, failing the test.
    # Polling with wait_until avoids this: each poll calls check_iptable_rules,
    # which issues a duthost.command and so keeps the SSH session active while
    # we wait for the CACL rules to take effect.
    # NOTE(review): the arguments are presumably timeout=200s, interval=10s,
    # initial delay=2s - confirm against wait_until's signature.
    wait_until(200, 10, 2, check_iptable_rules, duthost)

    # Insert an ACCEPT rule for SSH at position 3 of the INPUT chain to keep
    # the testbed accessible.
    duthost.command("iptables -I INPUT 3 -p tcp -m tcp --dport 22 -j ACCEPT")
def verify_cacl_show_acl_rule(duthost, acl_file):
"""
Converts the CACL rules in the provided acl_file into a dict,
and verifies that they match the applied acl rules through the `show acl
rule` command.
"""
acl_json = duthost.command(f"cat {acl_file}")["stdout_lines"]
acl_rules_expected_dict = json.loads("".join(acl_json))
acl_sets_extracted = acl_rules_expected_dict["acl"].get("acl-sets", {}).get("acl-set", {})
acl_rules_expected = {}
# Reconstruct the dict such that we can index easily using the actual rules
for acl_table, acl_table_conf in acl_sets_extracted.items():
# Reformat name as produced by acl-loader
acl_table_name = acl_table.upper().replace("-", "_")
acl_table_rules = {}
for acl_entry_num, acl_entry_conf in acl_table_conf['acl-entries']['acl-entry'].items():
acl_entry_name = f"RULE_{acl_entry_num}"
acl_table_rules[acl_entry_name] = acl_entry_conf
acl_rules_expected[acl_table_name] = acl_table_rules
logger.debug(f"Expected rules: {acl_rules_expected}")
acl_rules_actual_list = get_cacl_tables_and_rules(duthost)
acl_rules_actual = {}
# Reconstruct the list of actual rules for easy indexation
for acl_table in acl_rules_actual_list:
acl_table_name = acl_table["name"].strip()
acl_table_rules = {}
for acl_rule in acl_table["rules"]:
acl_rule_name = acl_rule["name"].strip()
acl_table_rules[acl_rule_name] = acl_rule
acl_rules_actual[acl_table_name] = acl_table_rules
logger.debug(f"Actual rules: {acl_rules_actual}")
incorrect_rules = []
missing_rules = []
# Check that each table expected exactly matches actual
table_set_actual = set(acl_rules_actual.keys())
table_set_expected = set(acl_rules_expected.keys())
missing_tables_expected = table_set_actual - table_set_expected
missing_tables_actual = table_set_expected - table_set_actual
pytest_assert(
len(missing_tables_expected) == 0 and len(missing_tables_actual) == 0,