Skip to content

Commit

Permalink
Combined IPV4 and IPV6 peer state reading, use specific Exception and…
Browse files Browse the repository at this point in the history
… error code, use Redis pipeline for stateDB operations
  • Loading branch information
gechiang committed Sep 16, 2020
1 parent 7274983 commit 85dce8e
Showing 1 changed file with 67 additions and 31 deletions.
98 changes: 67 additions & 31 deletions fpmsyncd/bgpmon.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
is a need to perform update or the peer is stale to be removed from the
state DB
"""

import commands
import json
import os
Expand All @@ -32,19 +31,23 @@
import time
import traceback

PIPE_BATCH_MAX_COUNT = 50

class BgpStateGet():
def __init__(self):
# list peer_l stores the Neighbor peer Ip address
# dic peer_state stores the Neighbor peer state entries
# list new_peer_l stores the new snapshot of Neighbor peer ip address
# dic new_peer_state stores the new snapshot of Neighbor peer states
# dic new_peer_state stores the new snapshot of Neighbor peer states
self.peer_l = []
self.peer_state = {}
self.new_peer_l = []
self.new_peer_state = {}
self.cached_timestamp = 0
self.db = swsssdk.SonicV2Connector()
self.db.connect(self.db.STATE_DB, False)
client = self.db.get_redis_client(self.db.STATE_DB)
self.pipe = client.pipeline()
self.db.delete_all_by_pattern(self.db.STATE_DB, "NEIGH_STATE_TABLE|*" )

# A quick way to check if there are anything happening within BGP is to
Expand All @@ -60,37 +63,56 @@ def bgp_activity_detected(self):
return True
else:
return False
except Exception, e:
except (IOError, OSError):
return True

def update_new_peer_states(self, peer_dict):
peer_l = peer_dict["peers"].keys()
self.new_peer_l.extend(peer_l)
for i in range (0, len(peer_l)):
self.new_peer_state[peer_l[i]] = peer_dict["peers"][peer_l[i]]["state"]

# Get a new snapshot of BGP neighbors and store them in the "new" location
def get_all_neigh_states(self):
ipv6_peer_l = []
try:
cmd = "vtysh -c 'show bgp summary json'"
output = commands.getoutput(cmd)
peer_info = json.loads(output)
# no exception, safe to Clean the "new" lists/dic for new sanpshot
del self.new_peer_l[:]
self.new_peer_state.clear()
if "ipv4Unicast" in peer_info and "peers" in peer_info["ipv4Unicast"]:
self.new_peer_l = peer_info["ipv4Unicast"]["peers"].keys()
for i in range (0, len(self.new_peer_l)):
self.new_peer_state[self.new_peer_l[i]] = \
peer_info["ipv4Unicast"]["peers"][self.new_peer_l[i]]["state"]

if "ipv6Unicast" in peer_info and "peers" in peer_info["ipv6Unicast"]:
ipv6_peer_l = peer_info["ipv6Unicast"]["peers"].keys()
self.new_peer_l.extend(ipv6_peer_l)
for i in range (0, len(ipv6_peer_l)):
self.new_peer_state[ipv6_peer_l[i]] = \
peer_info["ipv6Unicast"]["peers"][ipv6_peer_l[i]]["state"]

except Exception:
syslog.syslog(syslog.LOG_ERR, "*ERROR* get_all_neigh_states Exception: %s"
% (traceback.format_exc()))
cmd = "vtysh -c 'show bgp summary json'"
rc, output = commands.getstatusoutput(cmd)
if rc:
syslog.syslog(syslog.LOG_ERR, "*ERROR* Failed with rc:{} when execute: {}".format(rc, cmd))
return

peer_info = json.loads(output)
# cmd ran successfully, safe to Clean the "new" lists/dic for new sanpshot
del self.new_peer_l[:]
self.new_peer_state.clear()
for key, value in peer_info.items():
if key == "ipv4Unicast" or key == "ipv6Unicast":
self.update_new_peer_states(value)

def flush_pipe(self, data):
"""Dump each entry in data{} into State DB via redis pipeline.
Args:
data: Neighbor state in dictionary format
{
'NEIGH_STATE_TABLE|ip_address_a': {'state':state},
'NEIGH_STATE_TABLE|ip_address_b': {'state':state},
'NEIGH_STATE_TABLE|ip_address_c': {'state':state},
'NEIGH_STATE_TABLE|ip_address_x': None,
'NEIGH_STATE_TABLE|ip_address_z': None
...
}
"""
for key, value in data.items():
if value is None:
# delete case
self.pipe.delete(key)
else:
# Add or Modify case
self.pipe.hmset(key, value)
self.pipe.execute()

def update_neigh_states(self):
data = {}
pipe_count = 0
for i in range (0, len(self.new_peer_l)):
peer = self.new_peer_l[i]
key = "NEIGH_STATE_TABLE|%s" % peer
Expand All @@ -99,22 +121,36 @@ def update_neigh_states(self):
if self.peer_state[peer] != self.new_peer_state[peer]:
# state changed. Update state DB for this entry
state = self.new_peer_state[peer]
self.db.set(self.db.STATE_DB, key, 'state', state)
data[key] = {'state':state}
pipe_count += 1
self.peer_state[peer] = state
# remove this neighbor from old list since it is accounted for
self.peer_l.remove(peer)
else:
# New neighbor found case. Add to dictionary and state DB
state = self.new_peer_state[peer]
self.db.set(self.db.STATE_DB, key, 'state', state)
data[key] = {'state':state}
pipe_count += 1
self.peer_state[peer] = state
if pipe_count > PIPE_BATCH_MAX_COUNT:
self.flush_pipe(data)
data = {}
pipe_count = 0
# Check for stale state entries to be cleaned up
while len(self.peer_l) > 0:
# remove this from the stateDB and the current nighbor state entry
peer = self.peer_l.pop(0)
del_key = "NEIGH_STATE_TABLE|%s" % peer
self.db.delete(self.db.STATE_DB, del_key)
data[del_key] = None
pipe_count += 1
del self.peer_state[peer]
if pipe_count > PIPE_BATCH_MAX_COUNT:
self.flush_pipe(data)
data = {}
pipe_count = 0
# If anything in the pipeline not yet flushed, flush them now
if pipe_count > 0:
self.flush_pipe(data)
# Save the new List
self.peer_l = self.new_peer_l[:]

Expand All @@ -124,7 +160,7 @@ def main():

try:
bgp_state_get = BgpStateGet()
except Exception, e:
except Exception as e:
syslog.syslog(syslog.LOG_ERR, "{}: error exit 1, reason {}".format(THIS_MODULE, str(e)))
exit(1)

Expand Down

0 comments on commit 85dce8e

Please sign in to comment.