Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple concurrent gRPC subscriptions #23

Open
rich9474 opened this issue May 8, 2024 · 1 comment
Open

Multiple concurrent gRPC subscriptions #23

rich9474 opened this issue May 8, 2024 · 1 comment

Comments

@rich9474
Copy link

rich9474 commented May 8, 2024

I am struggling to work out how to subscribe to multiple streams of information. In my case that I am testing with at the moment, I want to subscribe to multiple bits of BGP information across multiple devices and react to changes.

Devices:
path_elements = ["DatasetInfo", "Devices"]

Then for each device it's BGP VRFS:
path_elements = ["Devices", <DeviceID>, "versioned-data", "routing", "bgp", "status", "vrf"]

Then for each device VRF, it's BGP Peers:
path_elements = ["Devices", <DeviceID>, "versioned-data", "routing", "bgp", "status", "vrf", <VRF>, "bgpPeerInfoStatusEntry"]

My script then reacts to event changes in the BGP peer states, and also adds or removed devices VRFs and peers as the updates come in. It almost works beautifully, but...

Because the grpc_client.subscribe(query) never returns, I initially ran it in a thread. This works for a few devices, VRFs and Peers, but is not very scalable. Once I got to 129 threads, I could not run any more. I tried using asyncio, but it appears while grpcio is supposed to support asyncio the cloudvision module does not. The examples as far as I can see only have very simplistic subscribe models. For example, the get_if_rate.py only gets a single metric from a single interface.

    with GRPCClient(apiserverAddr, token=token, certs=cert, key=key,
                    ca=ca) as client:
        for batch in client.subscribe(query):
            for notif in batch["notifications"]:
                pretty_print(notif["updates"])

I can't see a good way to deal with subscriptions in a scalable way using the cloudvision module. can someone please clarify how this should be done. It would be good if this can also be reflected in the Docs or examples. If it is already in there, and I am blind or stupid, can someone please point me in the right direction.

@cianmcgrath
Copy link
Collaborator

The CloudVision Connector supports the WIldcard() operator (some examples here and here)
These can be used in subscriptions and would pass on information changes to any matching paths to the client.
You would then extract that info from the notification's path elements to discern what element changed and can store/drop the element as desired based on that.

So your path element to generically get all of the bgp peers would be
path_elements = ["Devices", Wildcard(), "versioned-data", "routing", "bgp", "status", "vrf", Wildcard(), "bgpPeerInfoStatusEntry"]

Some filtering is shown below based off your above snippet

from cloudvision.Connector.codec import Wildcard
...
# dictionary of desired device/vrf combinations
devFilter = {
    "deviceA": {
       "vrf1", "vrf2"
    },
    #All vrfs for deviceB
    "deviceB":{}
}
path_elements = ["Devices", Wildcard(), "versioned-data", "routing", "bgp", "status", "vrf", Wildcard(),  "bgpPeerInfoStatusEntry"]
... #creation of query
with GRPCClient(apiserverAddr, token=token, certs=cert, key=key, ca=ca) as client:
        for batch in client.subscribe(query):
            for notif in batch["notifications"]:
                path_elts = notif["path_elements"]
                device = path_elts[1]
                vrfFilter = devFilter.get(device)
                if vrfFilter is None:
                    # device not in filter, skipping
                    continue
                # check to see if filtering occurring on vrfs
                if len(vrfFilter) != 0:
                    vrf = path_elts[7]
                    if vrf not in vrfFilter:
                        continue
                pretty_print(notif["updates"])

Does this help with your use case?

For some better performance (if you're interested in only a subset) there would likely need to be a resource API made to fetch those entries which would allow for filters to be set in the request and allow for server-side resolution of the various keys. That would decrease the amount of notifs your client would need to handle, but I'm not aware of there being such a rAPI existing or planned as of now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants