forked from viczem/ansible-keepass
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kpsock.py
207 lines (179 loc) · 7.12 KB
/
kpsock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import socket
import argparse
import tempfile
import logging
from logging import config as logging_config
from getpass import getpass
from pykeepass import PyKeePass
from pykeepass.exceptions import CredentialsError
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError
def main(kdbx, psw, kdbx_key, sock_fpath, ttl=60):
log = logging.getLogger('ansible_keepass')
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind(sock_fpath)
s.listen(1)
s.settimeout(ttl)
os.chmod(sock_fpath, 0o600)
log.info('Open ansible-keepass socket. TTL={}sec'.format(ttl))
with PyKeePass(kdbx, psw, kdbx_key) as kp:
log.info('%s decrypted' % kdbx)
while True:
log.debug('Wait a client connection')
conn, addr = s.accept()
log.debug('Client connected')
with conn:
conn.settimeout(ttl)
while True:
data = conn.recv(1024).decode()
if not data:
break
msg = json.loads(data)
if not isinstance(msg, dict):
raise ValueError('wrong message format')
if 'attr' not in msg or 'path' not in msg:
raise ValueError('wrong message properties')
path = msg['path'].strip('/')
attr = msg['attr']
log.debug("attr: %s in path: %s" % (attr, path))
entr = kp.find_entries_by_path(path, first=True)
if entr is None:
conn.send(
_msg('error',
'path %s is not found' % path))
log.error('path %s is not found' % path)
continue
if not hasattr(entr, attr):
conn.send(
_msg('error',
'attr %s is not found' % attr))
log.error('attr %s is not found' % attr)
continue
conn.send(_msg('ok', getattr(entr, attr)))
log.info('Fetch %s: %s', path, attr)
except CredentialsError:
log.error("%s failed to decrypt" % kdbx)
sys.exit(1)
except FileNotFoundError as e:
log.error(str(e))
sys.exit(1)
except json.JSONDecodeError as e:
log.error("JSONDecode: %s" % e)
sys.exit(1)
except ValueError as e:
log.error(str(e))
sys.exit(1)
except (KeyboardInterrupt, socket.timeout):
pass
finally:
log.info("Close ansible-keepass socket")
if os.path.exists(sock_fpath):
os.remove(sock_fpath)
def _msg(status, text):
return json.dumps({
'status': status,
'text': text
}).encode()
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(description=(
"Creating UNIX socket for response to requests "
"from the keepass lookup plugin. The database and password "
"are stay decrypted in memory while socket opened. "
"Format of a request in JSON with the properties: attr, path. "
"Response is JSON with properties: status, text."
), formatter_class=argparse.RawDescriptionHelpFormatter)
arg_parser.add_argument(
'kdbx', type=str, help="Path to .kdbx file")
arg_parser.add_argument(
'--key', type=str, nargs='?', default=None,
help="Path to a KeePass keyfile")
arg_parser.add_argument(
'--ttl', type=int, nargs='?', default=60, const=60,
help="Time-To-Live since past access in seconds. "
"Default is 1 minute")
arg_parser.add_argument(
'--log', type=str, nargs='?', default=None, const='',
help="Path to log file. If empty string the log file will be "
"created in the same directory as .kdbx file with suffix .log")
arg_parser.add_argument(
'--log-level', type=str, nargs='?', default='INFO', choices=(
'CRITICAL',
'ERROR',
'WARNING',
'INFO',
'DEBUG',
))
args = arg_parser.parse_args()
kdbx_fpath = os.path.realpath(os.path.expanduser(args.kdbx))
if not os.path.exists(kdbx_fpath):
sys.stderr.write("KeePass file %s does not exist" % kdbx_fpath)
sys.exit(1)
kdbx_key_fpath = None
if args.key:
kdbx_key_fpath = os.path.realpath(os.path.expanduser(args.key))
if not os.path.exists(kdbx_key_fpath):
sys.stderr.write("--key %s does not exist" % kdbx_key_fpath)
sys.exit(1)
# - predictable socket path for use in ansible plugin
# - tempdir for prevent error AF_UNIX path too long
# - only one socket can be opened
tempdir = tempfile.gettempdir()
if not os.access(tempdir, os.W_OK):
sys.stderr.write("You have no write permissions to %s" % tempdir)
sys.exit(1)
sock_file_path = "%s/ansible-keepass.sock" % tempdir
if os.path.exists(sock_file_path):
sys.stderr.write("kpsock is already opened. If you sure that kpsock "
"closed, run: rm %s" % sock_file_path)
sys.exit(1)
password = getpass("Password: ")
if isinstance(password, bytes):
password = password.decode(sys.stdin.encoding)
dict_config = {
'version': 1,
'formatters': {
'default': {
'format': "%(asctime)s [%(levelname)-5.5s] %(message)s ",
'datefmt': "%Y-%m-%d %H:%M:%S",
}
},
'handlers': {
'stdout': {
'class': 'logging.StreamHandler',
'formatter': 'default',
},
},
'loggers': {
'ansible_keepass': {
'propagate': False,
'level': args.log_level,
'handlers': [
'stdout'
]
}
}
}
if args.log is not None:
if args.log == '':
log_fpath = kdbx_fpath + ".log"
else:
log_fpath = os.path.realpath(os.path.expanduser(args.log))
if not os.access(os.path.dirname(log_fpath), os.W_OK | os.X_OK):
sys.stderr.write("--log %s permission error" % log_fpath)
sys.exit(1)
dict_config['handlers']['file'] = {
'class': 'logging.FileHandler',
'filename': log_fpath,
'formatter': 'default',
}
dict_config['loggers']['ansible_keepass']['handlers'].append('file')
logging_config.dictConfig(dict_config)
main(kdbx_fpath, password, kdbx_key_fpath, sock_file_path, args.ttl)