Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix stalled connection that should break after rogue NAK/ACK reception #3105

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions scripts/srt-proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#!/usr/bin/python3

# This script is a testing tool that inserts a packet interceptor for
# a bond between two local ports and modify a packet that is sent over
# the UDP link used by the SRT connection.

# Provided by:
# https://github.com/FelixSodermanNeti

import socket, select, argparse

class UDPProxy:
def __init__(self, caller_host, caller_port, listener_host, listener_port, break_at_pkt_NAK, break_at_pkt_ACK):

# Listening socket configuration
self.caller_host = caller_host
self.caller_port = caller_port
self.caller_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.caller_socket.bind((self.caller_host, self.caller_port))

# Forwarding destination configuration
self.listener_host = listener_host
self.listener_port = listener_port

# Client address tracking
self.client_address = None

# Proxy state
self.running = False

# Break SRT
self.sentPacketCounter = 0
self.recivedAckCounter = 0
self.PACKETBREAK_NAK = break_at_pkt_NAK
self.PACKETBREAK_ACK = break_at_pkt_ACK

def start(self):
try:
self.running = True
print(
f"UDP Proxy started. Listening on {self.caller_host}:{self.caller_port}. "
f"Forwarding to {self.listener_host}:{self.listener_port}"
)

# Create forward socket
listener_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Use select for non-blocking I/O
while self.running:
readable, _, _ = select.select([self.caller_socket, listener_socket], [], [], 1)

for sock in readable:
if sock == self.caller_socket:
# Receive from client
data, addr = sock.recvfrom(65535)
self.client_address = addr

#Modify the NAK
processedData = data
# NAK
if(data[1] == 0x03 and len(data) == 24 and self.PACKETBREAK_NAK != 0): # IS A NAK, This should check for a 1 in the first byte aswell.
print("Modifying NAK.")
newBytes = b"\xff\xff\xff\xff"
processedData = data[:20] + newBytes + data[20 + len(newBytes):]
"""
print("NAK BEFORE:")
print(string_bytearray_in_rows(data=data))
print("\n\nModifying NAK.\n\n")
print("ŃAK AFTER:")
print(string_bytearray_in_rows(data=processedData))
"""

# ACK
if(data[1] == 0x02 and len(data) == 44): # IS A ACK
self.recivedAckCounter += 1
if (self.PACKETBREAK_ACK != 0 and self.recivedAckCounter == self.PACKETBREAK_ACK):
print("Modifying ACK.")
newBytes = b"\x7f\xff\xff\xff"
offset = 16
processedData = data[:offset] + newBytes + data[offset + len(newBytes):]
"""
print("ACK BEFORE:")
print(string_bytearray_in_rows(data=data))
print("\n\nModifying ACK.\n\n")
print("ACK AFTER:")
print(string_bytearray_in_rows(data=processedData))
"""
listener_socket.sendto(
processedData,
(self.listener_host, self.listener_port)
)

elif sock == listener_socket:
# Receive from server
data, addr = sock.recvfrom(65535)

# Drop a sequence of data packets (as to atleast 3 to trigger an NAK with a range)
if len(data) == 1332: # Packet with data
self.sentPacketCounter += 1
if (self.PACKETBREAK_NAK != 0 and self.sentPacketCounter in range(self.PACKETBREAK_NAK, self.PACKETBREAK_NAK+5)):
print("Discarding packet.")
continue

if not self.client_address:
print("No client to send data to!")
continue

self.caller_socket.sendto(
data,
self.client_address
)

except Exception as e:
print(f"Proxy error: {e}")

finally:
self.stop()

def stop(self):
self.running = False
self.caller_socket.close()
print("UDP Proxy stopped.")


def string_bytearray_in_rows(data, bytes_per_row=4):
tmpString = "\n"
for i in range(0, len(data), bytes_per_row):
row = data[i:i+bytes_per_row]
tmpString += f"{i}: "
tmpString += (" ".join(f"{byte:02x}" for byte in row))
tmpString += "\n"
tmpString += "\n"
return tmpString

def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description='UDP Proxy')
parser.add_argument('--caller-host', default='127.0.0.1',
help='Host to listen on (srt caller) (default: 127.0.0.1)')
parser.add_argument('--caller-port', type=int, required=True,
help='Port to listen on (srt caller)')
parser.add_argument('--listener-host', default='127.0.0.1',
help='Destination host to forward packets (srt listener)')
parser.add_argument('--listener-port', type=int, required=True,
help='Destination port to forward packets (srt listener)')
parser.add_argument('--break-at-pkt-NAK', type=int, default=0,
help='At what datapacket should the NAK -1 be sent (0=never)')
parser.add_argument('--break-at-pkt-ACK', type=int, default=0,
help='At what datapacket should the ACK -1 be sent (0=never)')
args = parser.parse_args()

# Create and start proxy
proxy = UDPProxy(
args.caller_host,
args.caller_port,
args.listener_host,
args.listener_port,
args.break_at_pkt_NAK,
args.break_at_pkt_ACK
)

try:
proxy.start()
except KeyboardInterrupt:
proxy.stop()

if __name__ == '__main__':
main()
44 changes: 26 additions & 18 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8483,39 +8483,44 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_

// Protect packet retransmission
{
ScopedLock ack_lock(m_RecvAckLock);
UniqueLock ack_lock(m_RecvAckLock);

// Check the validation of the ack
if (CSeqNo::seqcmp(ackdata_seqno, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
{
ack_lock.unlock();

// this should not happen: attack or bug
LOGC(gglog.Error,
log << CONID() << "ATTACK/IPE: incoming ack seq " << ackdata_seqno << " exceeds current "
<< m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ackdata_seqno) - 1) << "!");
m_bBroken = true;
m_iBrokenCounter = 0;

updateBrokenConnection();
completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS!
return;
}

if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
const int cwnd1 = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
const bool bWasStuck = cwnd1<= getFlightSpan();
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK

const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
if (bWasStuck && cwnd > getFlightSpan())
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
HLOGC(gglog.Debug,
log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize
<< " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno);
const int cwnd1 = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
const bool bWasStuck = cwnd1<= getFlightSpan();
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK

const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
if (bWasStuck && cwnd > getFlightSpan())
{
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
HLOGC(gglog.Debug,
log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize
<< " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno);
}
}
}

/*
* We must not ignore full ack received by peer
Expand Down Expand Up @@ -8924,6 +8929,9 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
// this should not happen: attack or bug
m_bBroken = true;
m_iBrokenCounter = 0;

updateBrokenConnection();
completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS!
return;
}

Expand Down
Loading