From 587dc60caf2fb04546fa6d7bdb341362417ed592 Mon Sep 17 00:00:00 2001 From: Christiano Haesbaert Date: Tue, 12 Nov 2024 09:14:03 +0100 Subject: [PATCH] [packetbeat] Expire source port mappings. (#41581) port->pid mappings were only overwritten, never expired, the overwriting mechanism has a bunch of issues: - It only overwrites if it manages to find the new pid, so it misses short lived processes. - It only refreshes the mapping of said port, if a packet arriving on _another_ port misses the lookup (otherwise the original port is found and returned). Meaning, once all ports are used at least once, the cache is filled and never mutated again. The observable effect is that the user will see wrong process correlations _to_ older/long lived processes, imagine the follwing: - Long lived process makes _short_ lived TCP connection from src_port S. - Years later, a _short_ lived process makes a TCP connection to somewhere else, but from the same src_port S. It hits the cache, since it had a mapping for S, so packetbeat incorrectly correlates the new short-lived process connection, with the old long lived process. Related to a very long SDH, where a more in depth explanation of the bug can be found here, with a program to reproduce it. - https://github.com/elastic/sdh-beats/issues/4604#issuecomment-2459969325 - https://github.com/elastic/sdh-beats/issues/4604#issuecomment-2460829030 The solution is to discard mappings that are "old enough", with a hardcoded window of 10 seconds, so as long as the port is not re-used in this window, we are fine. This also makes sure the cache never becomes "immutable", since mappings will invariably get old, forcing a refresh. It's a very conservative approach as I don't want to introduce other bugs by redesigning it, work is on the way to change how the cache works in linux anyway. While here, I've noticed the locking was also wrong, we were doing the lookup unlocked, and also having to relock in case we have to update the mapping, so change this to grab the lock once and only once, interleaving is baad. --- CHANGELOG.next.asciidoc | 1 + packetbeat/procs/procs.go | 64 ++++++++++++++++++++++++++++++--------- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9d3a3a92c72..e2a07b3fe3a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Packetbeat* +- Expire source port mappings. {pull}41581[41581] *Winlogbeat* diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index 11b9fc2e8d8..f578b49a5d4 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -64,6 +64,7 @@ type portProcMapping struct { endpoint endpoint // FIXME: This is never used. pid int proc *process + expires time.Time } // process describes an OS process. @@ -185,8 +186,8 @@ func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool { func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process { proc.mu.Lock() + defer proc.mu.Unlock() procMap, ok := proc.portProcMap[transport] - proc.mu.Unlock() if !ok { return nil } @@ -206,24 +207,47 @@ func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport ap return nil } -func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) { +// proc.mu must be locked +func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (portProcMapping, bool) { + now := time.Now() + key := endpoint{address.String(), port} + p, found := procMap[key] + // Precedence when one socket is bound to a specific IP:port and another one // to INADDR_ANY and same port is not clear. Seems that the last one to bind // takes precedence, and we don't have a way to tell. // This function takes the naive approach of giving precedence to the more // specific address and then to INADDR_ANY. - if p, found = procMap[endpoint{address.String(), port}]; found { - return p, found + if !found { + if address.To4() != nil { + key.address = anyIPv4 + } else { + key.address = anyIPv6 + } + + p, found = procMap[key] } - nullAddr := anyIPv4 - if asIPv4 := address.To4(); asIPv4 == nil { - nullAddr = anyIPv6 + // We can't guarantee `p` doesn't point to an old entry, since + // we never remove entries from `procMap`, we only overwrite + // them, but we only overwrite them once an unrelated packet + // that doesn't have an entry on `procMap` ends up rebuilding + // the whole map. + // + // We take a conservative approach by discarding the entry if + // it's old enough. When we fail the first time here, our caller + // updates all maps and calls us again. + if found && now.After(p.expires) { + logp.Debug("procs", "PID %d (%s) port %d is too old, discarding", p.pid, p.proc.name, port) + delete(procMap, key) + p = portProcMapping{} + found = false } - p, found = procMap[endpoint{nullAddr, port}] + return p, found } +// proc.mu must be locked func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() @@ -244,6 +268,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { } } +// proc.mu must be locked func (proc *ProcessesWatcher) expireProcessCache() { now := time.Now() for pid, info := range proc.processCache { @@ -253,9 +278,8 @@ func (proc *ProcessesWatcher) expireProcessCache() { } } +// proc.mu must be locked func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) { - proc.mu.Lock() - defer proc.mu.Unlock() prev, ok := proc.portProcMap[transport][e] if ok && prev.pid == pid { // This port->pid mapping already exists @@ -267,11 +291,21 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e return } - // Simply overwrite old entries for now. - // We never expire entries from this map. Since there are 65k possible - // ports, the size of the dict can be max 1.5 MB, which we consider - // reasonable. - proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p} + // We overwrite previous entries here, and they expire in + // lookupMapping() if they are deemed old enough. + // + // Map size is bound by the number of ports: ~65k, so it's + // fine to have old entries lingering, as long as we don't + // trust them on subsequent connections. + // + // If the source port is re-used within the hardcoded 10 + // seconds window, we might end up hitting an old mapping. + proc.portProcMap[transport][e] = portProcMapping{ + endpoint: e, + pid: pid, + proc: p, + expires: time.Now().Add(10 * time.Second), + } if logp.IsDebug("procsdetailed") { logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s'",