Skip to content

Commit

Permalink
ns-report: fix dpireport crash (#851)
Browse files Browse the repository at this point in the history
Changes:

- restart threads if netifyd socket disappears
- wait for netifyd socket indefinitely
- log socket connection and disconnection to ease debug

#809
  • Loading branch information
gsanchietti authored Oct 16, 2024
1 parent 59ad1ee commit 34a6166
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions packages/ns-report/files/dpireport
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,24 @@ def consolidate():
except:
stats[client][key].setdefault(name, flow["total_bytes"])

def wait_socket(socket_file):
socket_file = os.environ.get('NETIFYD_SOCKET', '/var/run/netifyd/netifyd.sock')
sleep_time = 1
while not os.path.exists(socket_file):
logging.info(f'Netifyd socket {socket_file} not found. Waiting {sleep_time} seconds')
time.sleep(sleep_time)
sleep_time = min(sleep_time * 2, 60) # Exponential backoff, max 1 minute

# Read from netify socket and filter data stream
def reader():
global flows_locks
global flows

# wait 60 seconds for netifyd to start
timeout = 60
socket_file = os.environ.get('NETIFYD_SOCKET', '/var/run/netifyd/netifyd.sock')
while not os.path.exists(socket_file) and timeout > 0:
time.sleep(1)
timeout -= 1
if timeout == 0:
logging.error(f'Netifyd socket {socket_file} not found')
sys.exit(1)
wait_socket(socket_file)

client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(socket_file)
logging.debug(f'Connected to socket')
logging.info(f'Connected to socket')
for line in client.makefile('r'):
if not run:
break
Expand Down Expand Up @@ -188,6 +188,15 @@ def reader():
flows[data['flow']['digest']]['total_bytes'] = data['flow'].get('total_bytes', -1)
flows[data['flow']['digest']]['timestamp'] = data['flow'].get('last_seen_at', -1)
flows_q.put(flows.pop(data['flow']['digest']))
else:
logging.error(f'Connection to netifyd socket {socket_file} closed')
try:
client.close()
stop_threads()
except:
pass
time.sleep(1)
start_threads()

def stop_threads():
global run
Expand Down

0 comments on commit 34a6166

Please sign in to comment.