-
-
Notifications
You must be signed in to change notification settings - Fork 32k
/
Copy pathauth.py
126 lines (102 loc) · 4.07 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Script to manage users for the Home Assistant auth provider."""
import argparse
import asyncio
from collections.abc import Sequence
import logging
import os
from typing import TYPE_CHECKING
from homeassistant import runner
from homeassistant.auth import auth_manager_from_config
from homeassistant.auth.providers import homeassistant as hass_auth
from homeassistant.config import get_default_config_dir
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
# mypy: allow-untyped-calls, allow-untyped-defs
def run(args: Sequence[str] | None) -> None:
"""Handle Home Assistant auth provider script."""
parser = argparse.ArgumentParser(description="Manage Home Assistant users")
parser.add_argument("--script", choices=["auth"])
parser.add_argument(
"-c",
"--config",
default=get_default_config_dir(),
help="Directory that contains the Home Assistant configuration",
)
subparsers = parser.add_subparsers(dest="func")
subparsers.required = True
parser_list = subparsers.add_parser("list")
parser_list.set_defaults(func=list_users)
parser_add = subparsers.add_parser("add")
parser_add.add_argument("username", type=str)
parser_add.add_argument("password", type=str)
parser_add.set_defaults(func=add_user)
parser_validate_login = subparsers.add_parser("validate")
parser_validate_login.add_argument("username", type=str)
parser_validate_login.add_argument("password", type=str)
parser_validate_login.set_defaults(func=validate_login)
parser_change_pw = subparsers.add_parser("change_password")
parser_change_pw.add_argument("username", type=str)
parser_change_pw.add_argument("new_password", type=str)
parser_change_pw.set_defaults(func=change_password)
asyncio.set_event_loop_policy(runner.HassEventLoopPolicy(False))
asyncio.run(run_command(parser.parse_args(args)))
async def run_command(args: argparse.Namespace) -> None:
"""Run the command."""
hass = HomeAssistant(os.path.join(os.getcwd(), args.config))
await asyncio.gather(dr.async_load(hass), er.async_load(hass))
hass.auth = await auth_manager_from_config(hass, [{"type": "homeassistant"}], [])
provider = hass.auth.auth_providers[0]
await provider.async_initialize()
await args.func(hass, provider, args)
# Triggers save on used storage helpers with delay (core auth)
logging.getLogger("homeassistant.core").setLevel(logging.WARNING)
await hass.async_stop()
async def list_users(
hass: HomeAssistant, provider: hass_auth.HassAuthProvider, args: argparse.Namespace
) -> None:
"""List the users."""
count = 0
if TYPE_CHECKING:
assert provider.data
for user in provider.data.users:
count += 1
print(user["username"])
print()
print("Total users:", count)
async def add_user(
hass: HomeAssistant, provider: hass_auth.HassAuthProvider, args: argparse.Namespace
) -> None:
"""Create a user."""
if TYPE_CHECKING:
assert provider.data
try:
provider.data.add_auth(args.username, args.password)
except hass_auth.InvalidUser:
print("Username already exists!")
return
# Save username/password
await provider.data.async_save()
print("Auth created")
async def validate_login(
hass: HomeAssistant, provider: hass_auth.HassAuthProvider, args: argparse.Namespace
) -> None:
"""Validate a login."""
if TYPE_CHECKING:
assert provider.data
try:
provider.data.validate_login(args.username, args.password)
print("Auth valid")
except hass_auth.InvalidAuth:
print("Auth invalid")
async def change_password(
hass: HomeAssistant, provider: hass_auth.HassAuthProvider, args: argparse.Namespace
) -> None:
"""Change password."""
if TYPE_CHECKING:
assert provider.data
try:
provider.data.change_password(args.username, args.new_password)
await provider.data.async_save()
print("Password changed")
except hass_auth.InvalidUser:
print("User not found")