From 85dce8e01683a385e87d0d5c7e0dd15044802b47 Mon Sep 17 00:00:00 2001 From: Gen-Hwa Chiang Date: Wed, 16 Sep 2020 06:53:34 +0000 Subject: [PATCH] Combined IPV4 and IPV6 peer state reading, use specific Exception and error code, use Redis pipeline for stateDB operations --- fpmsyncd/bgpmon.py | 98 +++++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/fpmsyncd/bgpmon.py b/fpmsyncd/bgpmon.py index e9bb8c6081..26acbbabc5 100644 --- a/fpmsyncd/bgpmon.py +++ b/fpmsyncd/bgpmon.py @@ -23,7 +23,6 @@ is a need to perform update or the peer is stale to be removed from the state DB """ - import commands import json import os @@ -32,12 +31,14 @@ import time import traceback +PIPE_BATCH_MAX_COUNT = 50 + class BgpStateGet(): def __init__(self): # list peer_l stores the Neighbor peer Ip address # dic peer_state stores the Neighbor peer state entries # list new_peer_l stores the new snapshot of Neighbor peer ip address - # dic new_peer_state stores the new snapshot of Neighbor peer states + # dic new_peer_state stores the new snapshot of Neighbor peer states self.peer_l = [] self.peer_state = {} self.new_peer_l = [] @@ -45,6 +46,8 @@ def __init__(self): self.cached_timestamp = 0 self.db = swsssdk.SonicV2Connector() self.db.connect(self.db.STATE_DB, False) + client = self.db.get_redis_client(self.db.STATE_DB) + self.pipe = client.pipeline() self.db.delete_all_by_pattern(self.db.STATE_DB, "NEIGH_STATE_TABLE|*" ) # A quick way to check if there are anything happening within BGP is to @@ -60,37 +63,56 @@ def bgp_activity_detected(self): return True else: return False - except Exception, e: + except (IOError, OSError): return True + def update_new_peer_states(self, peer_dict): + peer_l = peer_dict["peers"].keys() + self.new_peer_l.extend(peer_l) + for i in range (0, len(peer_l)): + self.new_peer_state[peer_l[i]] = peer_dict["peers"][peer_l[i]]["state"] + # Get a new snapshot of BGP neighbors and store them in the "new" location def get_all_neigh_states(self): - ipv6_peer_l = [] - try: - cmd = "vtysh -c 'show bgp summary json'" - output = commands.getoutput(cmd) - peer_info = json.loads(output) - # no exception, safe to Clean the "new" lists/dic for new sanpshot - del self.new_peer_l[:] - self.new_peer_state.clear() - if "ipv4Unicast" in peer_info and "peers" in peer_info["ipv4Unicast"]: - self.new_peer_l = peer_info["ipv4Unicast"]["peers"].keys() - for i in range (0, len(self.new_peer_l)): - self.new_peer_state[self.new_peer_l[i]] = \ - peer_info["ipv4Unicast"]["peers"][self.new_peer_l[i]]["state"] - - if "ipv6Unicast" in peer_info and "peers" in peer_info["ipv6Unicast"]: - ipv6_peer_l = peer_info["ipv6Unicast"]["peers"].keys() - self.new_peer_l.extend(ipv6_peer_l) - for i in range (0, len(ipv6_peer_l)): - self.new_peer_state[ipv6_peer_l[i]] = \ - peer_info["ipv6Unicast"]["peers"][ipv6_peer_l[i]]["state"] - - except Exception: - syslog.syslog(syslog.LOG_ERR, "*ERROR* get_all_neigh_states Exception: %s" - % (traceback.format_exc())) + cmd = "vtysh -c 'show bgp summary json'" + rc, output = commands.getstatusoutput(cmd) + if rc: + syslog.syslog(syslog.LOG_ERR, "*ERROR* Failed with rc:{} when execute: {}".format(rc, cmd)) + return + + peer_info = json.loads(output) + # cmd ran successfully, safe to Clean the "new" lists/dic for new sanpshot + del self.new_peer_l[:] + self.new_peer_state.clear() + for key, value in peer_info.items(): + if key == "ipv4Unicast" or key == "ipv6Unicast": + self.update_new_peer_states(value) + + def flush_pipe(self, data): + """Dump each entry in data{} into State DB via redis pipeline. + Args: + data: Neighbor state in dictionary format + { + 'NEIGH_STATE_TABLE|ip_address_a': {'state':state}, + 'NEIGH_STATE_TABLE|ip_address_b': {'state':state}, + 'NEIGH_STATE_TABLE|ip_address_c': {'state':state}, + 'NEIGH_STATE_TABLE|ip_address_x': None, + 'NEIGH_STATE_TABLE|ip_address_z': None + ... + } + """ + for key, value in data.items(): + if value is None: + # delete case + self.pipe.delete(key) + else: + # Add or Modify case + self.pipe.hmset(key, value) + self.pipe.execute() def update_neigh_states(self): + data = {} + pipe_count = 0 for i in range (0, len(self.new_peer_l)): peer = self.new_peer_l[i] key = "NEIGH_STATE_TABLE|%s" % peer @@ -99,22 +121,36 @@ def update_neigh_states(self): if self.peer_state[peer] != self.new_peer_state[peer]: # state changed. Update state DB for this entry state = self.new_peer_state[peer] - self.db.set(self.db.STATE_DB, key, 'state', state) + data[key] = {'state':state} + pipe_count += 1 self.peer_state[peer] = state # remove this neighbor from old list since it is accounted for self.peer_l.remove(peer) else: # New neighbor found case. Add to dictionary and state DB state = self.new_peer_state[peer] - self.db.set(self.db.STATE_DB, key, 'state', state) + data[key] = {'state':state} + pipe_count += 1 self.peer_state[peer] = state + if pipe_count > PIPE_BATCH_MAX_COUNT: + self.flush_pipe(data) + data = {} + pipe_count = 0 # Check for stale state entries to be cleaned up while len(self.peer_l) > 0: # remove this from the stateDB and the current nighbor state entry peer = self.peer_l.pop(0) del_key = "NEIGH_STATE_TABLE|%s" % peer - self.db.delete(self.db.STATE_DB, del_key) + data[del_key] = None + pipe_count += 1 del self.peer_state[peer] + if pipe_count > PIPE_BATCH_MAX_COUNT: + self.flush_pipe(data) + data = {} + pipe_count = 0 + # If anything in the pipeline not yet flushed, flush them now + if pipe_count > 0: + self.flush_pipe(data) # Save the new List self.peer_l = self.new_peer_l[:] @@ -124,7 +160,7 @@ def main(): try: bgp_state_get = BgpStateGet() - except Exception, e: + except Exception as e: syslog.syslog(syslog.LOG_ERR, "{}: error exit 1, reason {}".format(THIS_MODULE, str(e))) exit(1)