#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Checks that an LDAP server is reachable and prints the result as
# Prometheus metrics (text exposition format on stdout).
# Redirect the output to a .prom file so the textfile collector of
# prometheus-node-exporter can pick it up.
# With --ssh the probe is instead run on one or more remote hosts over SSH.
# Matching hosts with results:
# 1. hosts in a list
# 2. results in a list
# 3. hosts and results in a dict
# 4. make sure each host is paired with its own result.
import argparse
import ast
import asyncio
import base64
import pickle as rick
import sys

import asyncssh as ssh
import ldap3
import prometheus_client as prom
# initialize variables
# Shared state written by the SSH probe: host name -> 1 (command produced
# output, clean stderr) or 0 (anything else).
result_dict = dict()
# Nothing else in this file reads or appends to this list; kept so the
# module keeps exposing the same names.
result_List = list()
# initialize custom Argument
def list_of_strings(arg):
    """argparse ``type`` callable: split a comma-separated value into a list.

    No whitespace is stripped, so ``"a, b"`` gives ``['a', ' b']`` and an
    empty string gives ``['']`` (plain ``str.split`` semantics).
    """
    separator = ','
    parts = arg.split(separator)
    return parts
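# parse arguments
parser = argparse.ArgumentParser(description="Checks ldap connection")
parser.add_argument("-S",
                    "--server",
                    help="The ldap server",
                    default="localhost")
# type=int: without it argparse hands the port over as a str when it is given
# on the command line (only the default 389 was an int), and ldap3.Server()
# expects an integer port.
parser.add_argument("-p",
                    "--port",
                    help="The ldap port",
                    type=int,
                    default=389)
parser.add_argument("-b",
                    "--base",
                    help="The ldap base",
                    )
parser.add_argument("-u",
                    "--user",
                    help="The ldap user",
                    )
# NOTE(security): a password passed on the command line is readable by every
# local user through the process list (ps, /proc/<pid>/cmdline) and may end
# up in shell history; prefer --secret for anything beyond a quick test.
parser.add_argument("-P",
                    "--password",
                    help="The ldap password",
                    )
parser.add_argument("--ssh",
                    help="Connect via ssh and do the tests",
                    default=False,
                    action="store_true")
parser.add_argument("--sshuser",
                    help="The ssh user",
                    default="root")
# Same process-list caveat as --password.
parser.add_argument("--sshpassword",
                    help="The ssh password",
                    default=None)
# Comma-separated; list_of_strings turns it into a list of host names.
parser.add_argument("--sshhosts",
                    help="The ssh host",
                    type=list_of_strings)
# type=int for the same reason as --port (asyncssh.connect takes an int port).
parser.add_argument("--sshport",
                    help="The ssh port",
                    type=int,
                    default=22)
parser.add_argument("--key",
                    help="The ssh key")
parser.add_argument("-s", "--secret",
                    help="The secrets-file",
                    type=str)
# Module-level side effect: reads sys.argv and exits with a usage message on
# invalid input. The rest of the file reads `args` directly.
args = parser.parse_args()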
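# read secrets
if args.secret:
    # File format: a pickle whose payload is the base64 encoding of a Python
    # literal that dict() accepts, e.g. "{'server': 'ldap1', 'port': 636, ...}".
    #
    # SECURITY: pickle.load() executes whatever the pickle stream tells it to,
    # so this file is equivalent to code: it must be owned by, and writable
    # only for, the user running this check (chmod 600) and must never be
    # fetched from an untrusted source. The decoded payload used to be passed
    # to eval(), which executed it a second time; ast.literal_eval() accepts
    # literals only and raises ValueError/SyntaxError for anything else.
    with open(args.secret, "rb") as secret_input:
        secret = rick.load(secret_input)
    decoded = base64.b64decode(secret).decode("utf-8")
    secret = dict(ast.literal_eval(decoded))
    # Every value from the file overrides the corresponding command-line
    # option; a missing key is a KeyError (the file must be complete).
    # Note that "sshhosts" is used as-is: it has to be a list of host names,
    # the comma-splitting of --sshhosts is not applied here.
    args.server = secret["server"]
    args.port = secret["port"]
    args.base = secret["base"]
    args.user = secret["user"]
    args.password = secret["password"]
    args.sshuser = secret["sshuser"]
    args.sshpassword = secret["sshpassword"]
    args.sshhosts = secret["sshhosts"]
    args.sshport = secret["sshport"]
    args.key = secret["key"]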
# initialize prometheus metrics
# A private registry so that generate_latest() at the end only emits the
# gauges of this script. The Gauge objects are created without a registry
# argument, i.e. they also land in prometheus_client's default REGISTRY
# (library default), exactly as before.
registry = prom.CollectorRegistry()
# 1 = bind + search against the LDAP server succeeded, 0 = anything failed.
ldap_check = prom.Gauge(name='ldap_check',
                        documentation='Checks if ldap is reachable',
                        labelnames=['server'])
# Same, but probed from a remote host over SSH; one sample per host.
ldap_check2 = prom.Gauge(name='ldap_check2',
                         documentation='Checks if ldap is reachable via ssh',
                         labelnames=['server', 'host'])
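# SSH client used by --ssh mode: runs one command on one host.
async def run_client(host: str,
                     command: str) -> "ssh.SSHCompletedProcess":
    """Open an SSH session to *host* as ``args.sshuser`` and run *command*.

    Authentication uses the client key file ``args.key`` when it is set,
    otherwise ``args.sshpassword``.  Returns asyncssh's completed-process
    object (``.stdout``, ``.stderr``, ``.returncode``).

    Raises:
        ValueError: neither ``--key`` nor ``--sshpassword`` is configured
            (previously this silently returned None).
        Any connection/authentication error raised by asyncssh is propagated
            to the caller.
    """
    connect_kwargs = {
        "host": host,
        "username": args.sshuser,
        "port": args.sshport,
        # SECURITY: known_hosts=None disables host-key verification, so a
        # MITM on the path to *host* can capture the SSH password and forge
        # the command output (and thus the metric). Kept for compatibility
        # with deployments whose known_hosts is not populated; point this at
        # a known_hosts file, or drop the argument to use ~/.ssh/known_hosts,
        # before trusting the results of this check.
        "known_hosts": None,
    }
    if args.key:
        connect_kwargs["client_keys"] = args.key
    elif args.sshpassword is not None:
        connect_kwargs["password"] = args.sshpassword
    else:
        raise ValueError("no SSH credential configured: pass --key or --sshpassword")
    async with ssh.connect(**connect_kwargs) as conn:
        return await conn.run(command)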
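async def ssh_ldap_check() -> None:
    """Run the remote LDAP probe on every host in ``args.sshhosts``.

    Fills the module-level ``result_dict`` with ``host -> 1`` when the remote
    command produced output and nothing on stderr, ``host -> 0`` otherwise.
    A host that cannot be reached or authenticated (``run_client`` raised,
    or returned nothing) is recorded as 0 instead of aborting the whole
    check, so one dead host no longer hides the state of the others.
    Hosts are probed one after the other; ``args.sshhosts`` being None is
    treated as an empty list.
    """
    # grep exits 1 with empty stdout when nothing matches, which ends up in
    # the "0" branch below.
    command = "spauthcli lsgroupsremote | grep white"
    for host in args.sshhosts or []:
        # return_exceptions=True hands the exception object back as the
        # result instead of raising, so the loop continues with the next host.
        [outcome] = await asyncio.gather(run_client(host, command),
                                         return_exceptions=True)
        if outcome is None or isinstance(outcome, BaseException):
            result_dict[host] = 0
        elif outcome.stderr == "" and outcome.stdout != "":
            result_dict[host] = 1
        else:
            result_dict[host] = 0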
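# Run the selected probe and print the metrics on stdout.
if not args.ssh:
    try:
        registry.register(ldap_check)
        # NOTE(security): plain LDAP (use_ssl is not set, no StartTLS), so the
        # bind password crosses the network in cleartext; use LDAPS/StartTLS
        # unless the server is on a trusted link.
        server = ldap3.Server(args.server,
                              # int(): the port may still arrive as a str
                              # (command line or secrets file).
                              port=int(args.port),
                              get_info=ldap3.ALL)
        conn = ldap3.Connection(server,
                                args.user,
                                args.password,
                                auto_bind=True)  # raises on bind failure
        # The search result is deliberately ignored: a successful bind plus
        # a search round-trip is what "reachable" means here.
        conn.search(args.base,
                    '(memberOf=*)')
        conn.unbind()
        ldap_check.labels(args.server).set(1)
    except Exception as exc:
        # Best-effort check: any failure means "not reachable". The reason
        # goes to stderr so operators can tell a typo from an outage while
        # stdout stays clean for the .prom file.
        print("ldap_check: {}: {}".format(type(exc).__name__, exc),
              file=sys.stderr)
        ldap_check.labels(args.server).set(0)
else:
    registry.register(ldap_check2)
    # Own event loop for the SSH probes; closed afterwards so no loop is
    # leaked for the remainder of the process.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(ssh_ldap_check())
    finally:
        loop.close()
    for host, state in result_dict.items():
        ldap_check2.labels(args.server, host).set(state)
print(prom.generate_latest(registry).decode("utf-8"))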