Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(discussion) Build AH without SecurityAssociation #4227

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions scapy/layers/inet.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ def post_build(self, p, pay):
elif conf.ipv6_enabled and isinstance(self.underlayer, scapy.layers.inet6.IPv6) or isinstance(self.underlayer, scapy.layers.inet6._IPv6ExtHdr): # noqa: E501
ck = scapy.layers.inet6.in6_chksum(socket.IPPROTO_TCP, self.underlayer, p) # noqa: E501
p = p[:16] + struct.pack("!H", ck) + p[18:]
elif self.underlayer and self.underlayer.name == "AH" and isinstance(self.underlayer.underlayer, IP): # noqa: E501
ck = in4_chksum(socket.IPPROTO_TCP, self.underlayer.underlayer, p)
p = p[:16] + struct.pack("!H", ck) + p[18:]
elif self.underlayer and self.underlayer.name == "AH" and ((conf.ipv6_enabled and isinstance(self.underlayer.underlayer, scapy.layers.inet6.IPv6)) or isinstance(self.underlayer.underlayer, scapy.layers.inet6._IPv6ExtHdr)): # noqa: E501
ck = scapy.layers.inet6.in6_chksum(socket.IPPROTO_TCP, self.underlayer.underlayer, p) # noqa: E501
p = p[:16] + struct.pack("!H", ck) + p[18:]
else:
log_runtime.info(
"No IP underlayer to compute checksum. Leaving null."
Expand Down Expand Up @@ -833,6 +839,18 @@ def post_build(self, p, pay):
if ck == 0:
ck = 0xFFFF
p = p[:6] + struct.pack("!H", ck) + p[8:]
elif self.underlayer and self.underlayer.name == "AH" and isinstance(self.underlayer.underlayer, IP): # noqa: E501
ck = in4_chksum(socket.IPPROTO_UDP, self.underlayer.underlayer, p)
# According to RFC768 if the result checksum is 0, it should be set to 0xFFFF # noqa: E501
if ck == 0:
ck = 0xFFFF
p = p[:6] + struct.pack("!H", ck) + p[8:]
elif self.underlayer and self.underlayer.name == "AH" and (isinstance(self.underlayer.underlayer, scapy.layers.inet6.IPv6) or isinstance(self.underlayer.underlayer, scapy.layers.inet6._IPv6ExtHdr)): # noqa: E501
ck = scapy.layers.inet6.in6_chksum(socket.IPPROTO_UDP, self.underlayer.underlayer, p) # noqa: E501
# According to RFC2460 if the result checksum is 0, it should be set to 0xFFFF # noqa: E501
if ck == 0:
ck = 0xFFFF
p = p[:6] + struct.pack("!H", ck) + p[8:]
else:
log_runtime.info(
"No IP underlayer to compute checksum. Leaving null."
Expand Down
77 changes: 77 additions & 0 deletions test/scapy/layers/inet.uts
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,29 @@ assert pkt.options[0][1] == (b'\xe3\xa0,\xdc\xe4\xae\x87\x18\xad{\xab\xd0b\x12\x
assert TCP(bytes(pkt)).options[0][0] == "MD5"

= IP, TCP & UDP checksums (these tests highly depend on default values)

def transferPacket(pkt):
packet = hexdump(pkt, dump=True).split('\n')
packet = list(map(lambda x: x.split(" ")[1], packet))
return " ".join(packet).strip().split(" ")

pkt = IP() / TCP()
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7ccd and bpkt.payload.chksum == 0x917c

Ah_dicts = {'nh': 6, 'payloadlen': 2, 'reserved': 0, 'spi': 1, 'seq': 0, 'icv': b'\x00\x00\x00\x00'}
pkt = IP(proto=51) / AH(**Ah_dicts) / TCP()
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7c90
packetStr = transferPacket(pkt)
tcpCheckSum = int("".join(packetStr[-4:-2]), 16)
assert tcpCheckSum == 0x917c

pkt = IPv6(nh=51) / AH(**Ah_dicts) / TCP()
packetStr = transferPacket(pkt)
tcpCheckSum = int("".join(packetStr[-4:-2]), 16)
assert tcpCheckSum == 0x8f7d

pkt = IP(len=40) / TCP()
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7ccd and bpkt.payload.chksum == 0x917c
Expand All @@ -412,6 +431,18 @@ pkt = IP() / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7cc3 and bpkt.payload.chksum == 0x4b2c

pkt = IP(proto=51) / AH(**Ah_dicts) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7c86
packetStr = transferPacket(pkt)
tcpCheckSum = int("".join(packetStr[-14:-12]), 16)
assert tcpCheckSum == 0x4b2c

pkt = IPv6(nh=51) / AH(**Ah_dicts) / TCP() / ("A" * 10)
packetStr = transferPacket(pkt)
tcpCheckSum = int("".join(packetStr[-14:-12]), 16)
assert tcpCheckSum == 0x492d

pkt = IP(len=50) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7cc3 and bpkt.payload.chksum == 0x4b2c
Expand All @@ -424,6 +455,13 @@ pkt = IP(options=[IPOption_RR()]) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x70bc and bpkt.payload.chksum == 0x4b2c

pkt = IP(proto=51, options=[IPOption_RR()]) / AH(**Ah_dicts) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x707f
packetStr = transferPacket(pkt)
tcpCheckSum = int("".join(packetStr[-14:-12]), 16)
assert tcpCheckSum == 0x4b2c

pkt = IP(len=54, options=[IPOption_RR()]) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x70bc and bpkt.payload.chksum == 0x4b2c
Expand All @@ -436,10 +474,30 @@ pkt = IP(options=[IPOption_Timestamp()]) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x2caa and bpkt.payload.chksum == 0x4b2c

pkt = IP(proto=51, options=[IPOption_Timestamp()]) / AH(**Ah_dicts) / TCP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x2c6d
packetStr = transferPacket(pkt)
tcpCheckSum = int("".join(packetStr[-14:-12]), 16)
assert tcpCheckSum == 0x4b2c

pkt = IP() / UDP()
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7cce and bpkt.payload.chksum == 0x0172

Ah_dicts = {'nh': 17, 'payloadlen': 2, 'reserved': 0, 'spi': 1, 'seq': 0, 'icv': b'\x00\x00\x00\x00'}
pkt = IP(proto=51) / AH(**Ah_dicts) / UDP()
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7c9c
packetStr = transferPacket(pkt)
udpCheckSum = int("".join(packetStr[-2:]), 16)
assert udpCheckSum == 0x0172

pkt = IPv6(nh=51) / AH(**Ah_dicts) / UDP()
packetStr = transferPacket(pkt)
udpCheckSum = int("".join(packetStr[-2:]), 16)
assert udpCheckSum == 0xff72

pkt = IP(len=28) / UDP()
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7cce and bpkt.payload.chksum == 0x0172
Expand All @@ -455,6 +513,18 @@ pkt = IP() / UDP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7cc4 and bpkt.payload.chksum == 0xbb17

pkt = IP(proto=51) / AH(**Ah_dicts) / UDP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7c92
packetStr = transferPacket(pkt)
udpCheckSum = int("".join(packetStr[-12:-10]), 16)
assert udpCheckSum == 0xbb17

pkt = IPv6(nh=51) / AH(**Ah_dicts) / UDP() / ("A" * 10)
packetStr = transferPacket(pkt)
udpCheckSum = int("".join(packetStr[-12:-10]), 16)
assert udpCheckSum == 0xb918

pkt = IP(len=38) / UDP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x7cc4 and bpkt.payload.chksum == 0xbb17
Expand All @@ -467,6 +537,13 @@ pkt = IP(options=[IPOption_RR()]) / UDP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x70bd and bpkt.payload.chksum == 0xbb17

pkt = IP(proto=51, options=[IPOption_RR()]) / AH(**Ah_dicts) / UDP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x708b
packetStr = transferPacket(pkt)
udpCheckSum = int("".join(packetStr[-12:-10]), 16)
assert udpCheckSum == 0xbb17

pkt = IP(len=42, options=[IPOption_RR()]) / UDP() / ("A" * 10)
bpkt = IP(raw(pkt))
assert bpkt.chksum == 0x70bd and bpkt.payload.chksum == 0xbb17
Expand Down