1
+ from db .models import User ,APIKey
2
+ from sqlalchemy .orm import session
3
+ import hashlib
4
+ from typing import Callable
5
+ from uuid import uuid4
6
+
7
+ def encryption (text :str ,method :Callable = hashlib .sha256 )-> str :
8
+ hashed_data = method (text .encode ()).hexdigest ()
9
+ return str (hashed_data )[:16 ]
10
+
11
+ # Create User
12
+ def add_user (session :session .Session ,user :User )-> bool :
13
+ try :
14
+ session .add (user )
15
+ # Create API Key
16
+ session .add (APIKey (
17
+ username = user .username ,
18
+ ))
19
+ session .commit ()
20
+ return True
21
+ except Exception as e :
22
+ session .rollback ()
23
+ return False
24
+ # User Read
25
+ def auth_user (session :session .Session ,user_id :str ,password :str )-> bool | str :
26
+ try :
27
+ # user_id 검사
28
+ user_id_hash = encryption (user_id )
29
+ q = session .query (User ).filter_by (userid = user_id_hash ).with_entities (User .password ,User .username ).one_or_none ()
30
+ if q is None :
31
+ raise Exception ("사용자 없음" )
32
+ # password 검사
33
+ password_hash = encryption (password )
34
+ if password_hash != q [0 ]:
35
+ raise Exception ("비밀번호 틀림" )
36
+ return q [1 ]
37
+
38
+ except Exception as e :
39
+ session .rollback ()
40
+ return False
41
+
42
+ # API Key get
43
+ def get_APIKey (session :session .Session ,username :str )-> str | bool :
44
+ try :
45
+ q = session .query (APIKey ).filter_by (username = username ).with_entities (APIKey .api_key ).one_or_none ()
46
+ if q is None :
47
+ raise Exception ("APIKey 없음" )
48
+ return q [0 ]
49
+ except Exception as e :
50
+ session .rollback ()
51
+ return False
52
+
53
+ # API Key auth
54
+ def auth_APIKey (session :session .Session ,api_key :str ,username :str )-> bool :
55
+ try :
56
+ q = session .query (APIKey ).filter_by (api_key = api_key ).with_entities (APIKey .username ).one_or_none ()
57
+ if q is None :
58
+ raise Exception ("올바르지 않은 APIKey" )
59
+ if q [0 ] != username :
60
+ raise Exception ("허락되지 않은 사용자" )
61
+ return True
62
+ except Exception as e :
63
+ session .rollback ()
64
+ return False
65
+
66
+ # Delete User and Key
67
+ def delete_user (session :session .Session ,username :str )-> bool :
68
+ try :
69
+ q = session .query (User ).filter_by (username = username ).one_or_none ()
70
+ if q is None :
71
+ raise Exception ("사용자 없음" )
72
+
73
+ api_key = session .query (APIKey ).filter_by (username = q .username ).one_or_none ()
74
+
75
+ if api_key is None :
76
+ raise Exception ("APIKey 없음" )
77
+
78
+ session .delete (q )
79
+ session .delete (api_key )
80
+ session .commit ()
81
+ return True
82
+
83
+
84
+ except Exception as e :
85
+ session .rollback ()
86
+ return False
87
+
88
+ def apiKey_update_all (session :session .Session )-> bool :
89
+ try :
90
+ # 모든 APIKey를 업데이트
91
+ q = session .query (APIKey ).all ()
92
+ if q == []:
93
+ raise Exception ("APIKey 없음" )
94
+ for api_key in q :
95
+ api_key .api_key = str (uuid4 ())
96
+
97
+ session .commit ()
98
+ return True
99
+
100
+ except Exception as e :
101
+ session .rollback ()
102
+ return False
103
+
104
+ if __name__ == "__main__" :
105
+ pass
106
+ from db .console import getSession
107
+
108
+ cur = getSession ()
109
+ # add_user(cur,User(
110
+ # username="admin1",
111
+ # userid=encryption("admin1",hashlib.sha256),
112
+ # password=encryption("qwer1234!@",hashlib.sha256)
113
+ # ))
114
+ print (auth_user (cur ,"admin1" ,"qwer1234!@" ))
115
+ # auth_APIKey(cur,"34e53889-f1f7-4415-ab87-6fac408a00ae","admin1")
116
+ # delete_user(cur,"admin1")
117
+ # apiKey_update_all(cur)
0 commit comments