-
Notifications
You must be signed in to change notification settings - Fork 31
/
user.py
322 lines (262 loc) · 10.1 KB
/
user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import logging
import sys
from django.contrib import auth
from django.contrib.auth import get_user_model
from django.contrib.auth.forms import UserCreationForm
from django.contrib.auth.models import Group, Permission
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.http import HttpRequest
# Module-level logger used by all helpers in this file; named after the module.
log = logging.getLogger(__name__)
def create_user(
    username,
    password=None,
    email="",
    is_staff=False,
    is_superuser=False,
    encrypted_password=None,
    groups=None,
    update_existing=False,
):
    """
    Create a user account (or update an existing one) and return the saved instance.

    A new account is built through ``UserCreationForm`` so that the username
    gets validated. If only ``encrypted_password`` is given, a temporary plain
    text password is fed to the form and replaced by the hash afterwards.

    :param username: login name of the account
    :param password: plain text password
    :param email: email address stored on the account
    :param is_staff: value for the ``is_staff`` flag
    :param is_superuser: value for the ``is_superuser`` flag
    :param encrypted_password: hashed password (e.g. cloned from another user);
        used instead of ``password`` when both are given
    :param groups: list of user group names the account is added to
    :param update_existing: if True, an existing account with this username
        is updated instead of creating a new one
    :return: the saved user instance
    :raises RuntimeError: if no password of any kind is given or a group name is unknown
    :raises ValidationError: if the creation form rejects the given data
    """
    if password is None and encrypted_password is None:
        raise RuntimeError("'password' or 'encrypted_password' needed.")

    user_model = get_user_model()

    account = None
    is_new = False
    if update_existing:
        try:
            account = user_model.objects.get(username=username)
        except user_model.DoesNotExist:
            # Nothing to update: fall through and create the account below.
            pass

    if account is None:
        # The form only needs *some* password to run its validation;
        # the encrypted password is applied after the form is done.
        form_password = "A temp password!" if password is None else password
        creation_form = UserCreationForm(
            {"username": username, "password1": form_password, "password2": form_password}
        )
        if not creation_form.is_valid():
            raise ValidationError(f"{creation_form.errors}")
        account = creation_form.save(commit=False)
        is_new = True

    account.email = email
    account.is_staff = is_staff
    account.is_superuser = is_superuser

    if encrypted_password:
        log.critical("Set encrypted user password, ok.")
        account.password = encrypted_password
    elif password:
        account.set_password(password)
    else:
        # Only reachable with falsy, non-None values (e.g. empty strings).
        raise RuntimeError

    account.save()

    for group_name in groups or ():
        try:
            matching_group = Group.objects.get(name=group_name)
        except Group.DoesNotExist:
            raise RuntimeError(f'User group "{group_name}" not found!')
        account.groups.add(matching_group)

    if is_new:
        log.debug(f"Create new user {account!r} account")
    else:
        log.debug(f"Update existing {account!r} account")

    return account
def get_super_user():
    """
    Return the first user that has the ``is_superuser`` flag set.

    :return: the first 'superuser' of the query result
    :raises RuntimeError: if no superuser can be fetched
    """
    user_model = get_user_model()
    superusers = user_model.objects.filter(is_superuser=True)
    try:
        return superusers[0]
    except (user_model.DoesNotExist, IndexError):
        log.error("Can't get a superuser. Please create first!")
        raise RuntimeError("No superuser")
def get_or_create_user(username, group, encrypted_password):
    """
    Fetch the user with the given username, creating it if it does not exist.

    The returned account is a staff member without superuser rights whose
    groups are set to exactly ``[group]``. The encrypted password is only
    passed on when the account is created; an existing account is not given
    a new password here.

    :param username: login name to look up or create
    :param group: the user group the account is put into
    :param encrypted_password: hashed password used for a newly created account
    :return: tuple ``(user, created)``
    """
    user_model = get_user_model()
    try:
        account = user_model.objects.get(username=username)
    except user_model.DoesNotExist:
        created = True
        account = create_user(
            username=username,
            encrypted_password=encrypted_password,
            groups=(group,),
            is_staff=True,
            is_superuser=False,
        )
    else:
        created = False

    # Enforce group membership and flags on the account.
    account.groups.set([group])
    account.is_staff = True
    account.is_superuser = False
    account.save()
    return account, created
def get_or_create_group(groupname, permissions):
    """
    Get or create a user group and sync its permissions with the given ones.

    Permissions currently attached to the group but missing from
    ``permissions`` are removed, then all given permissions are added.
    Progress is reported via ``print()``.

    :param groupname: name of the user group
    :param permissions: collection of permissions for the group; it must
        support ``len()``, iteration and ``in``
        (presumably ``Permission`` instances - verify against callers)
    :return: tuple ``(group, created)``
    :raises AssertionError: if the group does not hold exactly
        ``len(permissions)`` permissions afterwards
    """
    group, created = Group.objects.get_or_create(name=groupname)

    for permission in group.permissions.all():
        if permission not in permissions:
            print("remove permission:", permission)
            group.permissions.remove(permission)

    print(f"Add {len(permissions):d} permissions to {groupname!r}")
    # One call for all permissions instead of one add() per permission.
    group.permissions.add(*permissions)

    existing_permission_count = group.permissions.all().count()
    print(f"Group {groupname} has {existing_permission_count:d} permissions")
    if len(permissions) != existing_permission_count:
        # Raised explicitly: a plain ``assert`` is stripped under ``python -O``.
        raise AssertionError(
            f"Wrong permission count: {existing_permission_count:d} != {len(permissions):d}"
        )
    return group, created
def get_or_create_user_and_group(username, groupname, permissions, encrypted_password):
    """
    Set up a user group with permissions and a user that belongs to it:

    * create the user group if it does not exist
    * add the given permissions to the user group
    * create the user if it does not exist
    * put the user into the user group

    e.g.:
        from django_tools.permissions import get_filtered_permissions
        from django_tools.unittest_utils.user import get_or_create_user_and_group

        superuser = User.objects.filter(is_superuser=True, is_active=True)[0]
        encrypted_password = superuser.password

        test_user = get_or_create_user_and_group(
            username="testuser",
            groupname="testgroup",
            permissions=get_filtered_permissions(
                exclude_app_labels=("auth", "sites"),
                exclude_models=(),
                exclude_permissions=(),
            ),
            encrypted_password=encrypted_password
        )

    :return: the user instance
    """
    log.info("Create user group %s and add %i permissions...", groupname, len(permissions))
    group, group_created = get_or_create_group(groupname, permissions)
    log.debug(
        "User group '%s' created." if group_created else "Use existing user group '%s', ok.",
        groupname,
    )

    log.info("Create user '%s'...", username)
    user, user_created = get_or_create_user(username, group, encrypted_password)
    log.debug(
        "User '%s' created." if user_created else "User '%s' already exists, ok.",
        username,
    )

    return user
class TestUserMixin:
    """
    Test case mixin that creates a fixed set of test users before every test
    and offers helpers to look them up, log them in and grant permissions.

    Important: Test user will only be created, if django TestCase is used!
    e.g:
        from django.test.testcases import TestCase

        class FooBarTestCase(TestUserMixin, TestCase):
            def test...
    """

    # usertype -> keyword arguments passed to create_user() for that account
    TEST_USERS = {
        "superuser": {
            "username": "superuser",
            "email": "superuser@example.org",
            "password": "superuser_password",
            "is_staff": True,
            "is_superuser": True,
        },
        "staff": {
            "username": "staff_test_user",
            "email": "staff_test_user@example.org",
            "password": "staff_test_user_password",
            "is_staff": True,
            "is_superuser": False,
        },
        "normal": {
            "username": "normal_test_user",
            "email": "normal_test_user@example.org",
            "password": "normal_test_user_password",
            "is_staff": False,
            "is_superuser": False,
        },
    }

    @classmethod
    def setUpClass(cls):
        """Store the active user model class on the test class."""
        super().setUpClass()
        cls.UserModel = auth.get_user_model()

    def setUp(self):
        """Create all test users before each test."""
        super().setUp()
        self.create_testusers()

    def create_testusers(self):
        """
        Create (or update) every user defined in self.TEST_USERS.
        """
        for user_data in self.TEST_USERS.values():
            user = create_user(update_existing=True, **user_data)
            log.debug(f"Test user {user} created.")

    def get_userdata(self, usertype):
        """
        Return the userdata dict from self.TEST_USERS for the given usertype.

        :raises KeyError: if the usertype is unknown
        :raises AssertionError: if the matching user does not exist in the database
        """
        try:
            userdata = self.TEST_USERS[usertype]
        except KeyError as err:
            existing_usertypes = ", ".join(self.TEST_USERS)
            # Raise a plain KeyError carrying the message; the lookup error is
            # kept as the chained cause.
            raise KeyError(
                f"Wrong usetype {err}! Existing usertypes are: {existing_usertypes}"
            ) from err

        if not self.UserModel.objects.filter(username=userdata["username"]).exists():
            raise AssertionError('Test users not created!')

        return userdata

    def _get_user(self, usertype):
        """Return the User model instance for the given usertype."""
        test_user = self.get_userdata(usertype)
        return self.UserModel.objects.get(username=test_user["username"])

    def login(self, usertype):
        """
        Login the user defined in self.TEST_USERS with the test client.

        :return: User model instance of the logged in user
        """
        test_user = self.get_userdata(usertype)
        username = test_user["username"]

        count = self.UserModel.objects.all().count()
        self.assertNotEqual(count, 0, "You have to call self.create_testusers() first!")

        # Some auth backends needs request object (e.g.: django-axes)
        request = HttpRequest()
        user = self.UserModel.objects.get(username=username)
        request.user = user

        ok = self.client.login(request=request, username=test_user["username"], password=test_user["password"])
        self.assertTrue(ok, f'Can\'t login test user "{usertype}"!')
        return self._get_user(usertype)

    def add_user_permissions(self, user, permissions):
        """
        Add permissions to the given user instance and check that the user has them.

        :param user: the user instance that receives the permissions
        :param permissions: list or tuple of "<app_label>.<codename>" strings,
            e.g.: ("AppLabel.add_Modelname", "auth.change_user")
        :raises AssertionError: if ``permissions`` is not a list or tuple
        """
        if not isinstance(permissions, (list, tuple)):
            # Raised explicitly: a plain ``assert`` is stripped under ``python -O``
            # and e.g. a single string would then be iterated char by char.
            raise AssertionError(
                f"permissions must be a list or tuple, not {type(permissions).__name__}"
            )

        for permission in permissions:
            # permission, e.g: blog.add_blogentry
            app_label, permission_codename = permission.split(".", 1)
            # The model name is everything after the first "_" of the codename.
            model_name = permission_codename.split("_", 1)[1]
            try:
                content_type = ContentType.objects.get(app_label=app_label, model=model_name)
            except ContentType.DoesNotExist as err:
                raise ContentType.DoesNotExist(
                    f'Can\'t get ContentType for app "{app_label}" and model "{model_name}": {err}'
                ) from err

            perm = Permission.objects.get(content_type=content_type, codename=permission_codename)
            user.user_permissions.add(perm)
            user.save()

        self.assertTrue(user.has_perms(permissions))

    def refresh_user(self, user):
        """
        Return a fresh User model instance from DB.

        Note: Using "user.refresh_from_db()" will not help in every case!
        e.g.: Add new permission on user group and check the added one.
        """
        return self.UserModel.objects.get(pk=user.pk)