-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth_server.py
148 lines (115 loc) · 4.76 KB
/
auth_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import sys
from concurrent import futures
import secrets
import grpc
import auth_pb2
import auth_pb2_grpc
import threading
TOKEN_SIZE = 32
class User:
"""
Class that represents a user in the system.
"""
def __init__(self, secret, password, permissions):
self.secret = secret
self.password = password
self.permissions = permissions
def __str__(self) -> str:
return f"User(password={self.password}, permissions={self.permissions}, secret={int.from_bytes(self.secret, byteorder='big')})"
def __repr__(self) -> str:
return self.__str__()
class AuthServer(auth_pb2_grpc.AuthServicer):
def __init__(self, stop_event, port, admin_password):
self.stop_event = stop_event # Event to stop the thread
self.port = port
self.initialize(admin_password)
print("AuthServer initialized")
def initialize(self, admin_password):
""" Initialize the server with the super user registered. """
admin_secret = secrets.token_bytes(TOKEN_SIZE)
# Dictionary of users
self.users = {"super": User(admin_secret, admin_password, "SP")}
# Dictionary of secrets and permissions
self.authentications = {admin_secret: "SP"}
def get_user_password(self, username):
return self.users[username].password
def get_user_permissions(self, username):
return self.users[username].permissions
def get_user_secret(self, username):
if username not in self.users:
return None
return self.users[username].secret
def set_user_secret(self, username, secret):
if username not in self.users:
return -1
self.users[username].secret = secret
return 0
def set_user_password(self, username, password):
if username not in self.users:
return -1
self.users[username].password = password
return 0
def set_user_permissions(self, username, permissions):
if username not in self.users:
return -1
self.users[username].permissions = permissions
return 0
def Authenticate(self, request, context):
username = request.identifier
password = request.password
# Check if user exists and password is correct
if username not in self.users or self.get_user_password(username) != password:
return auth_pb2.AuthenticateReply(status=-1, secret=b"\0" * TOKEN_SIZE)
# Generate new secret and update user's previous secret
new_secret = secrets.token_bytes(TOKEN_SIZE)
# Update authentications
old_secret = self.get_user_secret(username)
if old_secret in self.authentications:
permissions = self.authentications.pop(old_secret)
else:
permissions = self.get_user_permissions(username)
self.authentications[new_secret] = permissions
# Update user's secret
self.set_user_secret(username, new_secret)
return auth_pb2.AuthenticateReply(status=0, secret=new_secret)
def CreateUser(self, request, context):
username = request.identifier
password = request.password
permissions = request.permissions
secret = request.secret
# Check if secret is different from the super user's secret
if secret != self.users['super'].secret:
return auth_pb2.CreateUserReply(status=-1)
# Check if user already exists
if username in self.users:
return auth_pb2.CreateUserReply(status=-2)
# Create the new user
self.users[username] = User(secrets.token_bytes(
TOKEN_SIZE), password, permissions)
return auth_pb2.CreateUserReply(status=0)
def VerifyAccess(self, request, context):
secret = request.secret
# Check if secret is authenticated
if secret not in self.authentications:
return auth_pb2.VerifyAccessReply(permissions="NE")
return auth_pb2.VerifyAccessReply(permissions=self.authentications.get(secret))
def FinishExecution(self, request, context):
# Terminate the server
self.stop_event.set()
return auth_pb2.FinishExecutionReply(users=len(self.users))
if __name__ == "__main__":
# Input arguments
port = int(sys.argv[1])
admin_password = int(sys.argv[2])
stop_event = threading.Event()
# Create server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
auth_pb2_grpc.add_AuthServicer_to_server(
AuthServer(stop_event, port, admin_password), server)
server.add_insecure_port(f"[::]:{port}")
# Start server
server.start()
print(f"AuthServer started, listening on port {port}")
# Wait for stop event
stop_event.wait()
server.stop()