diff --git a/datasette_acl/__init__.py b/datasette_acl/__init__.py index 7aeb69b..8c100e1 100644 --- a/datasette_acl/__init__.py +++ b/datasette_acl/__init__.py @@ -149,8 +149,8 @@ def clear_expired(self): one_second_cache = OneSecondCache() -async def update_dynamic_groups(datasette, actor): - if one_second_cache.get(actor["id"]): +async def update_dynamic_groups(datasette, actor, skip_cache=False): + if (not skip_cache) and one_second_cache.get(actor["id"]): # Don't do this more than once a second per actor return one_second_cache.set(actor["id"], 1) diff --git a/tests/test_acl.py b/tests/test_acl.py index 9b5e52a..9d6d9d9 100644 --- a/tests/test_acl.py +++ b/tests/test_acl.py @@ -1,11 +1,75 @@ from datasette.app import Datasette +from datasette_acl import update_dynamic_groups import pytest @pytest.mark.asyncio -async def test_plugin_is_installed(): - datasette = Datasette(memory=True) - response = await datasette.client.get("/-/plugins.json") - assert response.status_code == 200 - installed_plugins = {p["name"] for p in response.json()} - assert "datasette-acl" in installed_plugins +async def test_update_dynamic_groups(): + datasette = Datasette( + config={ + "plugins": { + "datasette-acl": { + "dynamic-groups": { + "admin": {"is_admin": True}, + } + } + } + } + ) + await datasette.invoke_startup() + db = datasette.get_internal_database() + # Should have those tables + tables = await db.table_names() + assert { + "acl_resources", + "acl_actions", + "acl_groups", + "acl_actor_groups", + "acl", + }.issubset(tables) + # Group tables should start empty + assert (await db.execute("select count(*) from acl_groups")).single_value() == 0 + assert ( + await db.execute("select count(*) from acl_actor_groups") + ).single_value() == 0 + # An actor with is_admin: True should be added to the group + await update_dynamic_groups( + datasette, {"is_admin": True, "id": "admin"}, skip_cache=True + ) + assert [dict(r) for r in (await db.execute("select * from acl_groups")).rows] == [ + {"id": 1, "name": "admin"}, + ] + assert [ + dict(r) + for r in ( + await db.execute( + "select actor_id, (select name from acl_groups where id = group_id) as group_name from acl_actor_groups" + ) + ).rows + ] == [ + {"actor_id": "admin", "group_name": "admin"}, + ] + # If that user changes they should drop from the group + await update_dynamic_groups( + datasette, {"is_admin": False, "id": "admin"}, skip_cache=True + ) + assert [ + dict(r) + for r in ( + await db.execute( + "select actor_id, (select name from acl_groups where id = group_id) as group_name from acl_actor_groups" + ) + ).rows + ] == [] + # Groups that are not dynamic should not be modified + await db.execute_write("insert into acl_groups (id, name) values (2, 'static')") + await db.execute_write( + "insert into acl_actor_groups (actor_id, group_id) values ('admin', 2)" + ) + await update_dynamic_groups( + datasette, {"is_admin": False, "id": "admin"}, skip_cache=True + ) + assert [dict(r) for r in (await db.execute("select * from acl_groups")).rows] == [ + {"id": 1, "name": "admin"}, + {"id": 2, "name": "static"}, + ]