-
Notifications
You must be signed in to change notification settings - Fork 16
/
auth.py
267 lines (201 loc) · 8.26 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
from __future__ import annotations
import io
from copy import copy
from hashlib import sha1
import logging
from dataclasses import dataclass
from typing import Optional, Dict, AsyncGenerator, Union, Tuple, Sequence
from mysql_mimic.types import read_str_null
from mysql_mimic import utils
logger = logging.getLogger(__name__)
# Many authentication plugins don't need to send any sort of challenge/nonce.
FILLER = b"0" * 20 + b"\x00"
@dataclass
class Forbidden:
msg: Optional[str] = None
@dataclass
class Success:
authenticated_as: str
@dataclass
class User:
name: str
auth_string: Optional[str] = None
auth_plugin: Optional[str] = None
# For plugins that support a primary and secondary password
# This is helpful for zero-downtime password rotation
old_auth_string: Optional[str] = None
@dataclass
class AuthInfo:
username: str
data: bytes
user: User
connect_attrs: Dict[str, str]
client_plugin_name: Optional[str]
handshake_auth_data: Optional[bytes]
handshake_plugin_name: str
def copy(self, data: bytes) -> AuthInfo:
new = copy(self)
new.data = data
return new
Decision = Union[Success, Forbidden, bytes]
AuthState = AsyncGenerator[Decision, AuthInfo]
class AuthPlugin:
"""
Abstract base class for authentication plugins.
"""
name = ""
client_plugin_name: Optional[str] = None # None means any
async def auth(self, auth_info: Optional[AuthInfo] = None) -> AuthState:
"""
Create an async generator that drives the authentication lifecycle.
This should either yield `bytes`, in which case an AuthMoreData packet is sent to the client,
or a `Success` or `Forbidden` instance, in which case authentication is complete.
Args:
auth_info: This is None if authentication is starting from the optimistic handshake.
"""
yield Forbidden()
async def start(
self, auth_info: Optional[AuthInfo] = None
) -> Tuple[Decision, AuthState]:
state = self.auth(auth_info)
data = await state.__anext__()
return data, state
class AbstractClearPasswordAuthPlugin(AuthPlugin):
"""
Abstract class for implementing the server-side of the standard client plugin "mysql_clear_password".
"""
name = "abstract_mysql_clear_password"
client_plugin_name = "mysql_clear_password"
async def auth(self, auth_info: Optional[AuthInfo] = None) -> AuthState:
if not auth_info:
auth_info = yield FILLER
r = io.BytesIO(auth_info.data)
password = read_str_null(r).decode()
authenticated_as = await self.check(auth_info.username, password)
if authenticated_as is not None:
yield Success(authenticated_as)
else:
yield Forbidden()
async def check(self, username: str, password: str) -> Optional[str]:
return username
class NativePasswordAuthPlugin(AuthPlugin):
"""
Standard plugin that uses a password hashing method.
The client hashed the password using a nonce provided by the server, so the
password can't be snooped on the network.
Furthermore, thanks to some clever hashing techniques, knowing the hash stored in the
user database isn't enough to authenticate as that user.
"""
name = "mysql_native_password"
client_plugin_name = "mysql_native_password"
async def auth(self, auth_info: Optional[AuthInfo] = None) -> AuthState:
if (
auth_info
and auth_info.handshake_plugin_name == self.name
and auth_info.handshake_auth_data
):
# mysql_native_password can reuse the nonce from the initial handshake
nonce = auth_info.handshake_auth_data.rstrip(b"\x00")
else:
nonce = utils.nonce(20)
# Some clients expect a null terminating byte
auth_info = yield nonce + b"\x00"
user = auth_info.user
if self.password_matches(user=user, scramble=auth_info.data, nonce=nonce):
yield Success(user.name)
else:
yield Forbidden()
def empty_password_quickpath(self, user: User, scramble: bytes) -> bool:
return not scramble and not user.auth_string
def password_matches(self, user: User, scramble: bytes, nonce: bytes) -> bool:
return (
self.empty_password_quickpath(user, scramble)
or self.verify_scramble(user.auth_string, scramble, nonce)
or self.verify_scramble(user.old_auth_string, scramble, nonce)
)
def verify_scramble(
self, auth_string: Optional[str], scramble: bytes, nonce: bytes
) -> bool:
# From docs,
# response.data should be:
# SHA1(password) XOR SHA1("20-bytes random data from server" <concat> SHA1(SHA1(password)))
# auth_string should be:
# SHA1(SHA1(password))
try:
sha1_sha1_password = bytes.fromhex(auth_string or "")
sha1_sha1_with_nonce = sha1(nonce + sha1_sha1_password).digest()
rcvd_sha1_password = utils.xor(scramble, sha1_sha1_with_nonce)
return sha1(rcvd_sha1_password).digest() == sha1_sha1_password
except Exception: # pylint: disable=broad-except
logger.info("Invalid scramble")
return False
@classmethod
def create_auth_string(cls, password: str) -> str:
return sha1(sha1(password.encode("utf-8")).digest()).hexdigest()
class KerberosAuthPlugin(AuthPlugin):
"""
This plugin implements the Generic Security Service Application Program Interface (GSS-API) by way of the Kerberos
mechanism as described in RFC1964(https://www.rfc-editor.org/rfc/rfc1964.html).
"""
name = "authentication_kerberos"
client_plugin_name = "authentication_kerberos_client"
def __init__(self, service: str, realm: str) -> None:
self.service = service
self.realm = realm
async def auth(self, auth_info: Optional[AuthInfo] = None) -> AuthState:
import gssapi
from gssapi.exceptions import GSSError
# Fast authentication not supported
if not auth_info:
yield b""
auth_info = (
yield len(self.service).to_bytes(2, "little")
+ self.service.encode("utf-8")
+ len(self.realm).to_bytes(2, "little")
+ self.realm.encode("utf-8")
)
server_creds = gssapi.Credentials(
usage="accept", name=gssapi.Name(f"{self.service}@{self.realm}")
)
server_ctx = gssapi.SecurityContext(usage="accept", creds=server_creds)
try:
server_ctx.step(auth_info.data)
except GSSError as e:
yield Forbidden(str(e))
username = str(server_ctx.initiator_name).split("@", 1)[0]
if auth_info.username and auth_info.username != username:
yield Forbidden("Given username different than kerberos client")
yield Success(username)
class NoLoginAuthPlugin(AuthPlugin):
"""
Standard plugin that prevents all clients from direct login.
This is useful for user accounts that can only be accessed by proxy authentication.
"""
name = "mysql_no_login"
async def auth(self, auth_info: Optional[AuthInfo] = None) -> AuthState:
if not auth_info:
_ = yield FILLER
yield Forbidden()
class IdentityProvider:
"""
Abstract base class for an identity provider.
An identity provider tells the server with authentication plugins to make
available to clients and how to retrieve users.
"""
def get_plugins(self) -> Sequence[AuthPlugin]:
return [NativePasswordAuthPlugin(), NoLoginAuthPlugin()]
async def get_user(self, username: str) -> Optional[User]:
return None
def get_default_plugin(self) -> AuthPlugin:
return self.get_plugins()[0]
def get_plugin(self, name: str) -> Optional[AuthPlugin]:
try:
return next(p for p in self.get_plugins() if p.name == name)
except StopIteration:
return None
class SimpleIdentityProvider(IdentityProvider):
"""
Simple identity provider implementation that naively accepts whatever username a client provides.
"""
async def get_user(self, username: str) -> Optional[User]:
return User(name=username, auth_plugin=NativePasswordAuthPlugin.name)