potomiel-ssh.py
#!/usr/bin/env python
from binascii import hexlify
from os import system, path
from paramiko.py3compat import b, u, decodebytes
from rich import pretty, inspect, print as pp
import base64
import json
import os
import paramiko
import socket
import sys
import sys
import threading
import traceback
import uuid
# Capture settings: `proto` names the sub-directory of dump/ that attempt
# records are written to; `port` is the TCP port the listener binds.
proto = "ssh"
port = 50022

# Install rich's pretty printer for interactive/repr output.
pretty.install()
def usage():
    """Print the one-line invocation hint and terminate with exit status 42."""
    script_name = sys.argv[0]
    print(f"Usage: python {script_name}")
    exit(42)
# The script takes no command-line arguments; anything else gets the usage
# message (usage() does not return).
argc = len(sys.argv)
if argc != 1:
    usage()

# Server host key.  The relative filename is resolved against the current
# working directory, so the script must be started from the directory that
# holds rsa-srv.key; the file is a private key and should be owner-readable
# only.
host_key = paramiko.RSAKey(filename="rsa-srv.key")

# paramiko's get_fingerprint() yields the MD5 fingerprint of the public part,
# shown here as lowercase hex.
host_key_fingerprint = u(hexlify(host_key.get_fingerprint()))
print("Read key: " + host_key_fingerprint)
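class Server(paramiko.ServerInterface):
    """SSH server interface that records every auth attempt and rejects it.

    Each password or public-key attempt is written as one JSON file under
    ``dump/<proto>/`` and answered with ``AUTH_FAILED``; no code path in this
    class returns ``AUTH_SUCCESSFUL``.  The records contain whatever the
    remote peer sent, including clear-text passwords, so the dump directory
    is sensitive data.

    NOTE(review): one file is created per attempt with no cap, and a record
    carries neither a timestamp nor the peer address.  Disk quota/rotation and
    correlation with network logs have to happen outside this class.
    """

    def __init__(self):
        # Set when a peer requests a shell; nothing visible in this file
        # waits on it.
        self.event = threading.Event()

    def _record_attempt(self, record):
        """Write one captured attempt to a new JSON file readable by the owner only.

        The directory is created on demand so a missing ``dump/<proto>/`` does
        not make the auth callback raise and lose the record.  ``mode`` only
        affects a directory created here; an existing one keeps its
        permissions.
        """
        os.makedirs(f"dump/{proto}", mode=0o700, exist_ok=True)
        target = f"dump/{proto}/{uuid.uuid4().hex}.json"
        # O_EXCL: never write through a pre-existing file or symlink.
        # 0o600: captured passwords must not be readable by other local users.
        fd = os.open(target, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, "w") as f:
            f.write(json.dumps(record, indent=2))

    def check_channel_request(self, kind, chanid):
        """Allow only ``session`` channels; refuse every other kind."""
        if kind == "session":
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

    def check_auth_password(self, username, password):
        """Record the offered username/password pair, then refuse it."""
        # paramiko can hand over the raw bytes when the password is not valid
        # UTF-8.  json.dumps raises TypeError on bytes, which would leave the
        # attempt unrecorded, so keep the bytes visible as backslash escapes.
        if isinstance(password, bytes):
            password = password.decode("utf-8", errors="backslashreplace")
        self._record_attempt({"username": username, "password": password})
        return paramiko.AUTH_FAILED

    def check_auth_publickey(self, username, key):
        """Record the username and the offered key's fingerprint, then refuse it."""
        # Only the hex fingerprint reaches the terminal; peer-controlled
        # strings such as the username go to the JSON record, where
        # json.dumps escapes them.
        fingerprint = u(hexlify(key.get_fingerprint()))
        print("Auth attempt with key: " + fingerprint)
        self._record_attempt({"username": username, "key": fingerprint})
        return paramiko.AUTH_FAILED

    def get_allowed_auths(self, username):
        """Advertise password and public-key auth for every username."""
        return "password,publickey"

    def check_channel_shell_request(self, channel):
        """Note that a shell was requested and refuse it."""
        self.event.set()
        return False

    def check_channel_pty_request(
        self, channel, term, width, height, pixelwidth, pixelheight, modes
    ):
        """Refuse every pty request."""
        return False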
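# Bind the listener.  The address is loopback-only, so remote peers reach this
# server only through whatever forwards traffic to 127.0.0.1:<port>.
try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", port))
except Exception as e:
    print("*** Bind failed: " + str(e))
    traceback.print_exc()
    # Stop here.  Falling through would reach the accept loop with either an
    # undefined `sock` or an unbound socket, and listen() on an unbound TCP
    # socket is implicitly bound by common platforms to an ephemeral port on
    # every interface, which would defeat the loopback-only bind above.
    sys.exit(1)

# Connections are served one at a time; each can hold the listener for up to
# the 20 s channel wait below.
while True:
    try:
        sock.listen(100)
        print("Listening for connection ...")
        # NOTE(review): with a loopback bind, `addr` is presumably the
        # forwarding hop rather than the original peer; confirm before
        # relying on it for attribution.
        client, addr = sock.accept()
    except Exception as e:
        print("*** Listen/accept failed: " + str(e))
        traceback.print_exc()
        # No usable client socket this round; do not reuse a stale one.
        continue

    print("Got a connection!")

    t = None
    try:
        t = paramiko.Transport(client)
        t.load_server_moduli()
        t.add_server_key(host_key)
        server = Server()
        t.start_server(server=server)

        # wait for auth
        chan = t.accept(20)
        if chan is None:
            print("*** No channel.")
    except Exception as e:
        print("*** Caught exception: " + str(e.__class__) + ": " + str(e))
        traceback.print_exc()
    finally:
        # Release the connection on every path so that peers cannot pile up
        # open transports and sockets.  If the Transport was never created,
        # close the raw client socket instead.
        try:
            if t is not None:
                t.close()
            else:
                client.close()
        except Exception:
            # Best-effort cleanup; a failed close must not stop the listener.
            pass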