This repository has been archived by the owner on Feb 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
import_users.py
executable file
·112 lines (88 loc) · 3.38 KB
/
import_users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python3.8
import json
import sys
from typing import List, Dict
import requests
import re
from pathlib import Path
BASE_URL = "https://log.bula21.ch/_/"
CREDENTIALS_FILE = Path(".credentials.json")
ROLLE_MOVE_CREW_ID = 5
def load_credentials():
if not CREDENTIALS_FILE.is_file():
with open(CREDENTIALS_FILE, "w") as f:
json.dump(dict(email="", password=""), f, indent=" ")
raise Exception(f"No credentials file found, please populate {CREDENTIALS_FILE}")
with open(CREDENTIALS_FILE, "r") as f:
credentials = json.load(f)
if not len(credentials["email"]) or not len(credentials["password"]):
raise Exception(f"Credentials in {CREDENTIALS_FILE} are empty")
return credentials
class API:
def __init__(self, base_url: str, email: str, password: str):
self._base_url = base_url.strip("/")
self._email = email
self._password = password
self._token = None
self._user = None
def login(self):
r = requests.post(
self._base_url + "/auth/authenticate",
json=dict(password=self._password, email=self._email, mode="jwt"),
)
assert r.status_code == 200
data = r.json()["data"]
self._token = data["token"]
self._user = data["user"]
print(f"Logged in as {self._user['email']}")
def get(self, url_path: str, **xx):
if "headers" not in xx:
xx["headers"] = dict()
xx["headers"]["Authorization"] = f"bearer {self._token}"
r = requests.get(self._base_url + url_path, **xx)
assert r.status_code == 200
return r.json()
def post(self, url_path: str, json_: Dict, **xx):
if "headers" not in xx:
xx["headers"] = dict()
xx["headers"]["Authorization"] = f"bearer {self._token}"
r = requests.post(self._base_url + url_path, json=json_, **xx)
assert r.status_code == 200
return r.json()
def load_emails(file: Path):
# we don't really read the CSV, we simple get all the emails we can find
with open(file, "r") as f:
emails = re.findall(r"[a-zA-Z._\-]+@bula21.ch", f.read())
emails = [e.lower() for e in emails]
return emails
def create_users(emails: List[str], api: API):
for email in emails:
first, _, last = email.partition("@")[0].partition(".")
first = first.capitalize()
last = last.replace(".", " ").capitalize()
user = dict(
status="active",
role=ROLLE_MOVE_CREW_ID,
first_name=first,
last_name=last,
email=email,
password=None,
company="Automatically Imported",
)
resp = api.post("/users", user)["data"]
print(
f"Created first_name={resp['first_name']} last_name={resp['last_name']} email={resp['email']}"
)
def main():
if len(sys.argv) != 2:
raise ValueError("Must pass CSV file to import as single argument!")
import_emails = set(load_emails(Path(sys.argv[1])))
creds = load_credentials()
api = API(BASE_URL, creds["email"], creds["password"])
api.login()
users = api.get("/users")["data"]
user_emails = set([user["email"].lower() for user in users])
emails_to_create = import_emails.difference(user_emails)
create_users(list(emails_to_create), api)
if __name__ == "__main__":
main()