Skip to content

Commit

Permalink
Merge pull request #34 from weka/log_data_enrichment
Browse files Browse the repository at this point in the history
Enrich log label data
  • Loading branch information
vince-weka authored Apr 19, 2022
2 parents 1e9234b + fa41ddd commit edf12d1
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
37 changes: 22 additions & 15 deletions collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
from logging import getLogger
from threading import Lock

import wekalib
from prometheus_client.core import GaugeMetricFamily, InfoMetricFamily, GaugeHistogramMetricFamily
# local imports
import wekalib
from wekalib.wekatime import wekatime_to_datetime
from wekalib.circular import circular_list

Expand Down Expand Up @@ -112,6 +112,7 @@ def __init__(self, config, cluster_obj): # wekaCollector
self.max_procs = config['exporter']['max_procs']
self.max_threads_per_proc = config['exporter']['max_threads_per_proc']
self.backends_only = config['exporter']['backends_only']
self.map_registry = config["map_registry"]

self.cluster = cluster_obj

Expand Down Expand Up @@ -362,19 +363,25 @@ def gather(self):
# do in a try/except block because it can fail if the cluster changes while we're collecting data

# clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time
weka_maps = {"node-host": {}, "node-role": {}, "host-role": {}} # initial state of maps
#weka_maps = {"node-host": {}, "node-role": {}, "host-role": {}} # initial state of maps
node_host_map = dict()
node_role_map = dict()
host_role_map = dict()

# populate maps
try:
for node in wekadata["nodeList"]:
weka_maps["node-host"][node["node_id"]] = node["hostname"]
weka_maps["node-role"][node["node_id"]] = node["roles"] # note - this is a list
#weka_maps["host-nids"] =
node_host_map[node["node_id"]] = node["hostname"]
node_role_map[node["node_id"]] = node["roles"]
for host in wekadata["hostList"]:
if host["mode"] == "backend":
weka_maps["host-role"][host["hostname"]] = "server"
host_role_map[host["hostname"]] = "server"
else:
weka_maps["host-role"][host["hostname"]] = "client"
host_role_map[host["hostname"]] = "client"
# update the maps so they can be used in the loki module
self.map_registry.register('node-host', node_host_map)
self.map_registry.register('node-role', node_role_map)
self.map_registry.register('node-role', host_role_map)
except Exception as exc:
log.error("error building maps. Aborting data gather from cluster {}".format(str(cluster)))
return
Expand All @@ -393,10 +400,10 @@ def gather(self):

# log.debug(f'{weka_maps["node-role"]}')

for node in weka_maps["node-role"]: # node == "NodeId<xx>"
for role in weka_maps['node-role'][node]:
for node in node_role_map: # node == "NodeId<xx>"
for role in node_role_map[node]:
nid = int(node.split('<')[1].split('>')[0]) # make nodeid numeric
hostname = weka_maps["node-host"][node]
hostname = node_host_map[node]
if hostname not in node_maps[role]:
node_maps[role][hostname] = list()
node_maps[role][hostname].append(nid) # needs to be dict of host:[nid]
Expand All @@ -419,9 +426,9 @@ def gather(self):
if host['status'] == 'UP' and host['state'] == 'ACTIVE':
up_list.append(host['hostname'])

log.debug(f"weka_maps['node-host']={weka_maps['node-host']}")
log.debug(f"node_host_map ={node_host_map}")
one_call_nids = dict()
for node, hostname in weka_maps['node-host'].items():
for node, hostname in node_host_map.items():
nid = int(node.split('<')[1].split('>')[0]) # make nodeid numeric
if hostname not in one_call_nids:
one_call_nids[hostname] = [nid]
Expand Down Expand Up @@ -680,9 +687,9 @@ def gather(self):
for statistic in stats_data:
node = statistic['node']
try:
hostname = weka_maps["node-host"][node]
host_role = weka_maps["host-role"][hostname]
role_list = weka_maps["node-role"][node]
hostname = node_host_map[node]
host_role = host_role_map[hostname]
role_list = node_role_map[node]
if len(role_list) > 1:
role = "multiple" # punt for now? Vince - Might want to list CPU_UTIL multiple times, once per role??
else:
Expand Down
11 changes: 8 additions & 3 deletions export.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@

import prometheus_client

# local imports
#from maps import Map, MapRegistry
from maps import MapRegistry
import wekalib.signals as signals
from collector import WekaCollector
from lokilogs import LokiServer
# local imports
from wekalib.wekacluster import WekaCluster
import wekalib.exceptions

VERSION = "1.5.5"
VERSION = "1.5.6"
#VERSION = "experimental"

# set the root log
Expand Down Expand Up @@ -110,12 +112,15 @@ def prom_client(config):
log.critical(traceback.format_exc())
return

maps = MapRegistry()
config["map_registry"] = maps

# create the WekaCollector object
collector = WekaCollector(config, cluster_obj)

if config['exporter']['loki_host'] is not None:
try:
lokiserver = LokiServer(config['exporter']['loki_host'], config['exporter']['loki_port'])
lokiserver = LokiServer(config['exporter']['loki_host'], config['exporter']['loki_port'], maps)
except:
sys.exit(1)
else:
Expand Down
25 changes: 23 additions & 2 deletions lokilogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
log = getLogger(__name__)

class LokiServer(object):
def __init__(self, lokihost, lokiport):
def __init__(self, lokihost, lokiport, map_registry):
self.host = lokihost
self.port = lokiport
self.registry = map_registry
# save some trouble, and make sure names are resolvable
try:
socket.gethostbyname(lokihost)
Expand Down Expand Up @@ -94,13 +95,14 @@ def loki_logevent(self, timestamp, event, **labels):
def send_events(self, event_dict, cluster):

num_successful = 0
node_host_map = self.registry.lookup('node-host')

if len(event_dict) == 0:
log.debug("No events to send")
return

# must be sorted by timestamp or Loki will reject them
last_eventtime = "0"
#last_eventtime = "0"
for timestamp, event in sorted(event_dict.items()): # oldest first
labels = {
"source": "weka",
Expand All @@ -110,6 +112,25 @@ def send_events(self, event_dict, cluster):
"severity": event["severity"]
}
# "node_id": event["nid"],
if 'params' in event:
params = event['params']
log.debug(f'{event["description"]}:::::{params}')
if 'hostname' in params:
labels['hostname'] = params['hostname']
if 'nodeId' in params:
labels['nodeid'] = str(params['nodeId'])
if 'hostname' not in labels:
if type(params['nodeId']) is not str:
formatted_nodeid = 'NodeId<' + f'{params["nodeId"]}>'
else:
formatted_nodeid = params['nodeId']
try:
hostname = node_host_map[formatted_nodeid]
except Exception as exc:
log.error(f"NodeId {formatted_nodeid} not in node-host map!")
hostname = 'error'
log.debug("setting hostname*************************************************************")
labels['hostname'] = hostname

# map weka event severities to Loki event severities
orig_sev = event['severity']
Expand Down
18 changes: 18 additions & 0 deletions maps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# maps - objects that map things (node ids to hostnames, etc)
#
from threading import Lock


class MapRegistry(object):
    """Registry of named lookup maps (node ids to hostnames, etc).

    Maps are stored and fetched by name. Access to the registry's own
    dict is serialized with a lock; the map objects themselves are handed
    back as-is (not copied), so the lock does not protect their contents.
    """

    # sentinel so that an explicit ``default=None`` can be told apart from
    # "no default given"
    _MISSING = object()

    def __init__(self):
        self._lock = Lock()
        self.map_registry = {}

    def register(self, map_name, map_object):
        """Store ``map_object`` under ``map_name``.

        Any map previously registered under the same name is replaced.
        """
        with self._lock:
            self.map_registry[map_name] = map_object

    def lookup(self, map_name, default=_MISSING):
        """Return the map registered under ``map_name``.

        :param map_name: name the map was registered with
        :param default: optional value to return when no map has been
            registered under ``map_name``
        :raises KeyError: if ``map_name`` is not registered and no
            ``default`` was supplied
        """
        with self._lock:
            if default is self._MISSING:
                # original behavior: unknown names raise KeyError
                return self.map_registry[map_name]
            return self.map_registry.get(map_name, default)

0 comments on commit edf12d1

Please sign in to comment.