-
Notifications
You must be signed in to change notification settings - Fork 0
/
check_by_ssh_conntrack.py
150 lines (115 loc) · 4.59 KB
/
check_by_ssh_conntrack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# A check that reads the netfilter conntrack values of remote hosts via ssh
# and prints them as prometheus-metrics:
#   max_connections     <- /proc/sys/net/netfilter/nf_conntrack_max
#   current_connections <- /proc/sys/net/netfilter/nf_conntrack_count
# This is useful for monitoring the conntrack table usage of the hosts.
# The metrics are printed to stdout; redirect the output into a .prom file
# if it should be read by prometheus-node-exporter.
# Pairing hosts with results:
# 1. Hosts in a list
# 2. Results in a list
# 3. Hosts and results in a dict
# 4. Make sure that each host matches its result.
import argparse
import prometheus_client as prom
import asyncssh as ssh
import asyncio
import pickle as rick
import base64
# Module-level result stores, filled by the check coroutines further down.
result_dict = dict()    # host -> output of `cat .../nf_conntrack_max` (0 on failure)
result_dict2 = dict()   # host -> output of `cat .../nf_conntrack_count` (0 on failure)
result_List = list()    # not referenced anywhere in the code visible in this file
# Custom argparse ``type`` for comma-separated option values.
def list_of_strings(arg):
    """Split a comma-separated command-line value into a list of strings.

    Whitespace around each entry is stripped and empty entries (for example
    from a trailing comma or an empty value) are dropped, so ``"a, b,"``
    yields ``["a", "b"]`` instead of ``["a", " b", ""]``.

    Args:
        arg: The raw option value as passed on the command line.

    Returns:
        The non-empty, stripped entries in their original order.
    """
    stripped = (item.strip() for item in arg.split(','))
    return [item for item in stripped if item]
# Parse the command-line arguments.
# The description used to say "ldap" (copy-paste leftover); this script reads
# conntrack values over ssh.
parser = argparse.ArgumentParser(
    description="Checks conntrack usage of remote hosts via ssh")
parser.add_argument("--sshuser",
                    help="The ssh user",
                    default="root")
parser.add_argument("--sshpassword",
                    help="The ssh password",
                    default=None)
# Comma-separated host list; list_of_strings turns it into a Python list.
parser.add_argument("--sshhosts",
                    help="The ssh host",
                    type=list_of_strings)
# type=int keeps a port given on the command line the same type as the
# default (without it "--sshport 2222" would be stored as the string "2222").
parser.add_argument("--sshport",
                    help="The ssh port",
                    type=int,
                    default=22)
parser.add_argument("--key",
                    help="The ssh key")
parser.add_argument("--secret",
                    help="The secret file")
args = parser.parse_args()
# Read the connection settings from the secret file, if one was given.
# The file is a pickle whose payload is a base64 string; the decoded text is
# evaluated into a mapping that overrides the ssh-related arguments.
# NOTE(security): both pickle.load() and eval() execute arbitrary code taken
# from the file. Only point --secret at a file you fully trust; consider a
# data-only format (e.g. JSON) for this - TODO review.
if args.secret:
    with open(args.secret, "rb") as secret_input:
        secret = rick.load(secret_input)
        secret = dict(eval(base64.b64decode(secret)))
    # Every key is mandatory: a missing one raises KeyError.
    for field in ("sshuser", "sshpassword", "sshhosts", "sshport", "key"):
        setattr(args, field, secret[field])
# Prometheus objects: a dedicated registry plus one gauge per conntrack value.
# Both gauges carry a single 'host' label so every checked host gets its own
# sample. The gauges are not given the registry here; they are added to it
# further down via registry.register().
registry = prom.CollectorRegistry()
nf_conntrack_max = prom.Gauge(
    name='max_connections',
    documentation='Checks the max connections possible',
    labelnames=['host'],
)
current_connections = prom.Gauge(
    name='current_connections',
    documentation='Checks if the current conntack connections',
    labelnames=['host'],
)
# Run one command on one host over ssh.
async def run_client(host,
                     command: str) -> "ssh.SSHCompletedProcess":
    """Connect to ``host`` via ssh, run ``command`` and return its result.

    The connection uses ``args.sshuser`` and ``args.sshport``. Authentication
    uses the key in ``args.key`` when one is set, otherwise the password in
    ``args.sshpassword``.

    Args:
        host: Host name or address to connect to.
        command: Shell command to execute on the remote host.

    Returns:
        The object returned by ``conn.run(command)``; callers read its
        ``stdout`` and ``stderr`` attributes.

    Raises:
        ValueError: If neither a key nor a password is configured. Previously
            this case silently returned ``None``.
        Exception: Whatever ``ssh.connect`` / ``conn.run`` raise is propagated.
    """
    # The key takes precedence over the password when both are configured.
    if args.key:
        credentials = {"client_keys": args.key}
    elif args.sshpassword is not None:
        credentials = {"password": args.sshpassword}
    else:
        raise ValueError("no ssh credentials configured: "
                         "pass --key or --sshpassword")
    # NOTE(security): known_hosts=None turns off host key verification, so the
    # remote host's identity is not checked - TODO confirm this is intended.
    async with ssh.connect(host=host,
                           username=args.sshuser,
                           port=args.sshport,
                           known_hosts=None,
                           **credentials) as conn:
        return await conn.run(command)
def _conntrack_value(outcome):
    """Turn one ``run_client`` outcome into the value stored for a host.

    With ``return_exceptions=True`` a failed ssh call is delivered by
    ``asyncio.gather`` as the exception object itself, which has no
    ``stdout``/``stderr``. Such a host (and a ``None`` outcome) is reported
    as ``0`` instead of aborting the whole check with ``AttributeError``.

    Returns:
        ``outcome.stdout`` when the command wrote output and nothing to
        stderr, otherwise ``0``.
    """
    if outcome is None or isinstance(outcome, BaseException):
        return 0
    if outcome.stderr == "" and outcome.stdout != "":
        return outcome.stdout
    return 0


async def _collect_proc_value(proc_path, results) -> None:
    """Run ``cat proc_path`` on every host in ``args.sshhosts``.

    All hosts are queried in one ``asyncio.gather`` call and the value for
    each host is written to ``results[host]``.
    """
    hosts = list(args.sshhosts)
    command = "cat " + proc_path
    outcomes = await asyncio.gather(
        *(run_client(host, command) for host in hosts),
        return_exceptions=True)
    # gather() returns results in the order of its awaitables, so zip()
    # pairs every host with its own result.
    for host, outcome in zip(hosts, outcomes):
        results[host] = _conntrack_value(outcome)


async def ssh_conntrack_check() -> None:
    """Store each host's nf_conntrack_max reading in ``result_dict``.

    Hosts that cannot be reached or return no usable output are stored as 0.
    """
    await _collect_proc_value("/proc/sys/net/netfilter/nf_conntrack_max",
                              result_dict)


async def ssh_currentcon_check() -> None:
    """Store each host's nf_conntrack_count reading in ``result_dict2``.

    Hosts that cannot be reached or return no usable output are stored as 0.
    """
    await _collect_proc_value("/proc/sys/net/netfilter/nf_conntrack_count",
                              result_dict2)
# Script body: collect the values, fill the gauges, print the metrics.
# Attach both gauges to the dedicated registry used for the output below.
registry.register(nf_conntrack_max)
registry.register(current_connections)
# asyncio.run() creates a fresh event loop for each call and closes it when
# the coroutine finishes; the previous new_event_loop().run_until_complete()
# calls left both loops unclosed.
asyncio.run(ssh_conntrack_check())
asyncio.run(ssh_currentcon_check())
# One labelled sample per host for each of the two gauges.
for host, state in result_dict.items():
    nf_conntrack_max.labels(host).set(state)
for host, state in result_dict2.items():
    current_connections.labels(host).set(state)
# generate_latest() returns bytes; decode so print() emits plain text.
print(prom.generate_latest(registry).decode("utf-8"))