From 51ab5b1725ac1a868d2b11e4d48715b3c5ed3967 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Fri, 17 Jul 2020 06:29:14 +0200 Subject: [PATCH] Move SlackUsers & SlackChannels cache to database (#423) --- ...a5cc3efce8_add_slackchannels_slackusers.py | 60 +++++++++++++ pyslackersweb/__init__.py | 13 ++- pyslackersweb/contexts.py | 53 ++++++++++- pyslackersweb/models.py | 29 +++++++ pyslackersweb/tasks.py | 62 +++++++++++++ pyslackersweb/website/contexts.py | 27 ++++-- pyslackersweb/website/database.py | 22 +++++ pyslackersweb/website/tasks.py | 87 ------------------- pyslackersweb/website/views.py | 63 +++++++------- tests/conftest.py | 23 +++-- tests/test_pyslackersweb.py | 30 +++++++ tests/test_website.py | 24 ----- 12 files changed, 333 insertions(+), 160 deletions(-) create mode 100644 migrations/versions/2020-07-13T13-12-47Z_2ba5cc3efce8_add_slackchannels_slackusers.py create mode 100644 pyslackersweb/tasks.py create mode 100644 pyslackersweb/website/database.py create mode 100644 tests/test_pyslackersweb.py diff --git a/migrations/versions/2020-07-13T13-12-47Z_2ba5cc3efce8_add_slackchannels_slackusers.py b/migrations/versions/2020-07-13T13-12-47Z_2ba5cc3efce8_add_slackchannels_slackusers.py new file mode 100644 index 00000000..9ea95cca --- /dev/null +++ b/migrations/versions/2020-07-13T13-12-47Z_2ba5cc3efce8_add_slackchannels_slackusers.py @@ -0,0 +1,60 @@ +"""add SlackChannels & SlackUsers + +Revision ID: 2ba5cc3efce8 +Revises: 3df042324a1f +Create Date: 2020-07-13 13:12:47.561813+00:00 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "2ba5cc3efce8" +down_revision = "3df042324a1f" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "slack_channels", + sa.Column("id", sa.Text(), nullable=False), + sa.Column("name", sa.Text(), nullable=True), + sa.Column("created", sa.DateTime(timezone=True), nullable=True), + sa.Column("archived", sa.Boolean(), nullable=True), + sa.Column("members", sa.Integer(), nullable=True), + sa.Column("topic", sa.Text(), nullable=True), + sa.Column("purpose", sa.Text(), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("name"), + ) + op.create_index("ix_slack_channels_id", "slack_channels", ["id"], unique=False) + op.create_index("ix_slack_channels_name", "slack_channels", ["name"], unique=False) + op.create_table( + "slack_users", + sa.Column("id", sa.Text(), nullable=False), + sa.Column("deleted", sa.Boolean(), nullable=True), + sa.Column("admin", sa.Boolean(), nullable=True), + sa.Column("bot", sa.Boolean(), nullable=True), + sa.Column("timezone", sa.Text(), nullable=True), + sa.Column("first_seen", sa.DateTime(timezone=True), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index("ix_slack_users_admin", "slack_users", ["id", "admin"], unique=False) + op.create_index("ix_slack_users_id", "slack_users", ["id"], unique=False) + op.create_index("ix_slack_users_timezone", "slack_users", ["id", "timezone"], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index("ix_slack_users_timezone", table_name="slack_users") + op.drop_index("ix_slack_users_id", table_name="slack_users") + op.drop_index("ix_slack_users_admin", table_name="slack_users") + op.drop_table("slack_users") + op.drop_index("ix_slack_channels_name", table_name="slack_channels") + op.drop_index("ix_slack_channels_id", table_name="slack_channels") + op.drop_table("slack_channels") + # ### end Alembic commands ### diff --git a/pyslackersweb/__init__.py b/pyslackersweb/__init__.py index d15cbe8c..b46fdc08 100644 --- a/pyslackersweb/__init__.py +++ b/pyslackersweb/__init__.py @@ -9,7 +9,14 @@ from sentry_sdk.integrations.logging import LoggingIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration -from .contexts import apscheduler, client_session, redis_pool, postgresql_pool, slack_client +from .contexts import ( + apscheduler, + client_session, + redis_pool, + postgresql_pool, + slack_client, + background_jobs, +) from .middleware import request_context_middleware from . import settings, website, sirbot @@ -58,7 +65,9 @@ async def app_factory(*args) -> web.Application: # pylint: disable=unused-argum SLACK_TOKEN=settings.SLACK_TOKEN, ) - app.cleanup_ctx.extend([apscheduler, client_session, redis_pool, postgresql_pool, slack_client]) + app.cleanup_ctx.extend( + [apscheduler, client_session, redis_pool, postgresql_pool, slack_client, background_jobs] + ) app.router.add_get("/", index) diff --git a/pyslackersweb/contexts.py b/pyslackersweb/contexts.py index a4488dcd..9adaf480 100644 --- a/pyslackersweb/contexts.py +++ b/pyslackersweb/contexts.py @@ -1,14 +1,25 @@ -from typing import AsyncGenerator - import json +import datetime +import logging + +from typing import AsyncGenerator import aioredis import asyncpgsa +import asyncpg.pool +import sqlalchemy as sa from asyncpgsa.connection import get_dialect from aiohttp import ClientSession, web from apscheduler.schedulers.asyncio import AsyncIOScheduler from slack.io.aiohttp import SlackAPI +from sqlalchemy import select + +from pyslackersweb.util.log import ContextAwareLoggerAdapter +from . import tasks, models + + +logger = ContextAwareLoggerAdapter(logging.getLogger(__name__)) def _register_in_app(app: web.Application, name: str, item) -> None: @@ -58,3 +69,41 @@ async def slack_client(app: web.Application) -> AsyncGenerator[None, None]: _register_in_app(app, "slack_client_legacy", slack_legacy) yield + + +async def background_jobs(app: web.Application) -> AsyncGenerator[None, None]: + scheduler = app["scheduler"] + pg: asyncpg.pool.Pool = app["pg"] + slack_client_: SlackAPI = app["slack_client"] + + next_run_time = None + if await _is_empty_table(pg, models.SlackUsers.c.id): + next_run_time = datetime.datetime.now() + datetime.timedelta(minutes=1) + + scheduler.add_job( + tasks.sync_slack_users, + "cron", + minute=0, + args=(slack_client_, pg), + next_run_time=next_run_time, + ) + + next_run_time = None + if await _is_empty_table(pg, models.SlackChannels.c.id): + next_run_time = datetime.datetime.now() + datetime.timedelta(minutes=1) + + scheduler.add_job( + tasks.sync_slack_channels, + "cron", + minute=15, + args=(slack_client_, pg), + next_run_time=next_run_time, + ) + + yield + + +async def _is_empty_table(pg: asyncpg.pool.Pool, column: sa.Column) -> bool: + async with pg.acquire() as conn: + result = await conn.fetchval(select([column]).limit(1)) + return result is None diff --git a/pyslackersweb/models.py b/pyslackersweb/models.py index d73f5cd5..06588c03 100644 --- a/pyslackersweb/models.py +++ b/pyslackersweb/models.py @@ -29,4 +29,33 @@ class Source(Enum): nullable=False, ), sa.Index("ix_domain_blocked", "domain", "blocked"), + sa.Index("ix_domains_domain", "domain"), +) + +SlackChannels = sa.Table( + "slack_channels", + metadata, + sa.Column("id", sa.Text, primary_key=True), + sa.Column("name", sa.Text, unique=True), + sa.Column("created", sa.DateTime(timezone=True)), + sa.Column("archived", sa.Boolean), + sa.Column("members", sa.Integer), + sa.Column("topic", sa.Text), + sa.Column("purpose", sa.Text), + sa.Index("ix_slack_channels_id", "id"), + sa.Index("ix_slack_channels_name", "name"), +) + +SlackUsers = sa.Table( + "slack_users", + metadata, + sa.Column("id", sa.Text, primary_key=True), + sa.Column("deleted", sa.Boolean), + sa.Column("admin", sa.Boolean), + sa.Column("bot", sa.Boolean), + sa.Column("timezone", sa.Text), + sa.Column("first_seen", sa.DateTime(timezone=True), default=datetime.now), + sa.Index("ix_slack_users_id", "id"), + sa.Index("ix_slack_users_admin", "id", "admin"), + sa.Index("ix_slack_users_timezone", "id", "timezone"), ) diff --git a/pyslackersweb/tasks.py b/pyslackersweb/tasks.py new file mode 100644 index 00000000..d49d7d45 --- /dev/null +++ b/pyslackersweb/tasks.py @@ -0,0 +1,62 @@ +import asyncio +import logging +import datetime + +import slack +import asyncpg.pool + +from slack.io.abc import SlackAPI +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from pyslackersweb import models +from pyslackersweb.util.log import ContextAwareLoggerAdapter + + +logger = ContextAwareLoggerAdapter(logging.getLogger(__name__)) + + +async def sync_slack_users(slack_client: SlackAPI, pg: asyncpg.pool.Pool,) -> None: + logger.debug("Refreshing slack users cache.") + try: + async with pg.acquire() as conn: + async for user in slack_client.iter(slack.methods.USERS_LIST, minimum_time=3): + values = { + "deleted": user["deleted"], + "admin": user["is_admin"], + "bot": user["is_bot"], + "timezone": user["tz"], + } + await conn.execute( + pg_insert(models.SlackUsers) + .values(id=user["id"], **values) + .on_conflict_do_update(index_elements=[models.SlackUsers.c.id], set_=values) + ) + except asyncio.CancelledError: + logger.debug("Slack users cache refresh canceled") + except Exception: # pylint: disable=broad-except + logger.exception("Error refreshing slack users cache") + + +async def sync_slack_channels(slack_client: SlackAPI, pg: asyncpg.pool.Pool) -> None: + logger.debug("Refreshing slack channels cache.") + + try: + async with pg.acquire() as conn: + async for channel in slack_client.iter(slack.methods.CONVERSATIONS_LIST): + values = { + "name": channel["name"], + "created": datetime.datetime.fromtimestamp(channel["created"]), + "archived": channel["is_archived"], + "members": channel["num_members"], + "topic": channel["topic"]["value"], + "purpose": channel["purpose"]["value"], + } + await conn.execute( + pg_insert(models.SlackChannels) + .values(id=channel["id"], **values) + .on_conflict_do_update(index_elements=[models.SlackChannels.c.id], set_=values) + ) + except asyncio.CancelledError: + logger.debug("Slack channels cache refresh canceled") + except Exception: # pylint: disable=broad-except + logger.exception("Error refreshing slack channels cache") diff --git a/pyslackersweb/website/contexts.py b/pyslackersweb/website/contexts.py index b0881a78..db2bce49 100644 --- a/pyslackersweb/website/contexts.py +++ b/pyslackersweb/website/contexts.py @@ -1,8 +1,9 @@ +import datetime + from typing import AsyncGenerator from aiohttp import ClientSession, web from aioredis.abc import AbcConnection -from slack.io.aiohttp import SlackAPI from . import tasks @@ -11,17 +12,27 @@ async def background_jobs(app: web.Application) -> AsyncGenerator[None, None]: scheduler = app["scheduler"] client_session: ClientSession = app["client_session"] redis: AbcConnection = app["redis"] - slack_client_: SlackAPI = app["slack_client"] pg = app["pg"] + # If redis is empty (new dev environments) run the task in one minute + next_run_time = None + if not await redis.exists(tasks.GITHUB_REPO_CACHE_KEY): + next_run_time = datetime.datetime.now() + datetime.timedelta(minutes=1) + scheduler.add_job( - tasks.sync_github_repositories, "cron", minute=30, args=(client_session, redis) + tasks.sync_github_repositories, + "cron", + minute=30, + args=(client_session, redis), + next_run_time=next_run_time, ) - scheduler.add_job(tasks.sync_slack_users, "cron", minute=0, args=(slack_client_, redis)) - - scheduler.add_job(tasks.sync_slack_channels, "cron", minute=15, args=(slack_client_, redis)) - - scheduler.add_job(tasks.sync_burner_domains, "cron", hour=0, args=(client_session, pg)) + scheduler.add_job( + tasks.sync_burner_domains, + "cron", + hour=0, + args=(client_session, pg), + next_run_time=next_run_time, + ) yield diff --git a/pyslackersweb/website/database.py b/pyslackersweb/website/database.py new file mode 100644 index 00000000..b7112a7a --- /dev/null +++ b/pyslackersweb/website/database.py @@ -0,0 +1,22 @@ +import logging + +import asyncpg + +from pyslackersweb.util.log import ContextAwareLoggerAdapter + + +logger = ContextAwareLoggerAdapter(logging.getLogger(__name__)) + + +async def get_user_count(conn: asyncpg.connection.Connection) -> int: + return await conn.fetchval("SELECT count(id) FROM slack_users") + + +async def get_timezones(conn: asyncpg.connection.Connection) -> dict: + timezones = {} + rows = await conn.fetch("SELECT timezone, count(id) FROM slack_users GROUP BY timezone") + for row in rows: + if row["timezone"] is not None: + timezones[row["timezone"]] = row["count"] + + return timezones diff --git a/pyslackersweb/website/tasks.py b/pyslackersweb/website/tasks.py index 57056b6b..24f47ade 100644 --- a/pyslackersweb/website/tasks.py +++ b/pyslackersweb/website/tasks.py @@ -2,13 +2,10 @@ import dataclasses import json import logging -from collections import Counter from typing import Any, Dict, List -import slack from aiohttp import ClientSession from aioredis.abc import AbcConnection as RedisConnection -from slack.io.abc import SlackAPI from sqlalchemy.dialects.postgresql import insert as pg_insert from pyslackersweb.models import domains, Source @@ -19,12 +16,6 @@ GITHUB_REPO_CACHE_KEY = "github:repos" -SLACK_CHANNEL_CACHE_KEY = "slack:channels" - -SLACK_COUNT_CACHE_KEY = "slack:users:count" - -SLACK_TZ_CACHE_KEY = "slack:users:timezones" - @dataclasses.dataclass(frozen=True) # pylint: disable=too-few-public-methods class Repository: @@ -76,84 +67,6 @@ async def sync_github_repositories( return repositories -async def sync_slack_users( - slack_client: SlackAPI, - redis: RedisConnection, - *, - cache_key_tz: str = SLACK_TZ_CACHE_KEY, - cache_key_count: str = SLACK_COUNT_CACHE_KEY, -) -> Counter: - logger.debug("Refreshing slack users cache.") - - counter: Counter = Counter() - try: - async for user in slack_client.iter(slack.methods.USERS_LIST, minimum_time=3): - if user["deleted"] or user["is_bot"] or not user["tz"]: - continue - - counter[user["tz"]] += 1 - - logger.debug( - "Found %s users across %s timezones", sum(counter.values()), len(list(counter.keys())) - ) - - tx = redis.multi_exec() - tx.delete(cache_key_tz) - tx.hmset_dict(cache_key_tz, dict(counter.most_common(100))) - tx.set(cache_key_count, str(sum(counter.values()))) - await tx.execute() - - except asyncio.CancelledError: - logger.debug("Slack users cache refresh canceled") - except Exception: # pylint: disable=broad-except - logger.exception("Error refreshing slack users cache") - - return counter - - -@dataclasses.dataclass(frozen=True) # pylint: disable=too-few-public-methods -class Channel: - id: str - name: str - topic: str - purpose: str - members: int - - -async def sync_slack_channels( - slack_client: SlackAPI, redis: RedisConnection, *, cache_key: str = SLACK_CHANNEL_CACHE_KEY -) -> List[Channel]: - logger.debug("Refreshing slack channels cache.") - - channels = [] - try: - async for channel in slack_client.iter(slack.methods.CHANNELS_LIST): - channels.append( - Channel( - id=channel["id"], - name=channel["name"], - topic=channel["topic"]["value"], - purpose=channel["purpose"]["value"], - members=channel["num_members"], - ) - ) - - channels.sort(key=lambda c: c.name) - - logger.debug("Found %s slack channels", len(channels)) - - await redis.set( - cache_key, json.dumps([dataclasses.asdict(channel) for channel in channels]) - ) - - except asyncio.CancelledError: - logger.debug("Slack channels cache refresh canceled") - except Exception: # pylint: disable=broad-except - logger.exception("Error refreshing slack channels cache") - - return channels - - async def sync_burner_domains(session: ClientSession, pg) -> List[Dict[str, Any]]: logger.debug("Refreshing burner domain list") diff --git a/pyslackersweb/website/views.py b/pyslackersweb/website/views.py index 8939b12f..e7caba0a 100644 --- a/pyslackersweb/website/views.py +++ b/pyslackersweb/website/views.py @@ -12,9 +12,9 @@ from pyslackersweb.util.log import ContextAwareLoggerAdapter from pyslackersweb.models import domains, Source -from . import settings +from . import settings, database from .models import InviteSchema -from .tasks import GITHUB_REPO_CACHE_KEY, SLACK_COUNT_CACHE_KEY, SLACK_TZ_CACHE_KEY +from .tasks import GITHUB_REPO_CACHE_KEY logger = ContextAwareLoggerAdapter(logging.getLogger(__name__)) @@ -27,27 +27,29 @@ class Index(web.View): @template("index.html") async def get(self): redis = self.request.app["redis"] - - return { - "member_count": int((await redis.get(SLACK_COUNT_CACHE_KEY, encoding="utf-8")) or 0), - "projects": json.loads( - await redis.get(GITHUB_REPO_CACHE_KEY, encoding="utf-8") or "{}" - ), - "sponsors": [ - { - "image": self.request.app.router["static"].url_for( - filename="images/sponsor_platformsh.svg" - ), - "href": "https://platform.sh/?medium=referral&utm_campaign=sponsored_sites&utm_source=pyslackers", # pylint: disable=line-too-long - }, - { - "image": self.request.app.router["static"].url_for( - filename="images/sponsor_sentry.svg" - ), - "href": "https://sentry.io/?utm_source=referral&utm_content=pyslackers&utm_campaign=community", # pylint: disable=line-too-long - }, - ], - } + pg = self.request.app["pg"] + + async with pg.acquire() as conn: + return { + "member_count": await database.get_user_count(conn), + "projects": json.loads( + await redis.get(GITHUB_REPO_CACHE_KEY, encoding="utf-8") or "{}" + ), + "sponsors": [ + { + "image": self.request.app.router["static"].url_for( + filename="images/sponsor_platformsh.svg" + ), + "href": "https://platform.sh/?medium=referral&utm_campaign=sponsored_sites&utm_source=pyslackers", # pylint: disable=line-too-long + }, + { + "image": self.request.app.router["static"].url_for( + filename="images/sponsor_sentry.svg" + ), + "href": "https://sentry.io/?utm_source=referral&utm_content=pyslackers&utm_campaign=community", # pylint: disable=line-too-long + }, + ], + } @routes.view("/slack", name="slack") @@ -59,14 +61,13 @@ def pg(self): return self.request.app["pg"] async def shared_response(self): - redis = self.request.app["redis"] - - return { - "member_count": int((await redis.get(SLACK_COUNT_CACHE_KEY, encoding="utf-8")) or 0), - "member_timezones": await redis.hgetall(SLACK_TZ_CACHE_KEY, encoding="utf-8"), - "errors": {}, - "disable_invites": settings.DISABLE_INVITES, - } + async with self.pg.acquire() as conn: + return { + "member_count": await database.get_user_count(conn), + "member_timezones": await database.get_timezones(conn), + "errors": {}, + "disable_invites": settings.DISABLE_INVITES, + } async def allowed_email(self, email: str) -> bool: # this really should be in the schema validation, but it doesn't support async checks (yet). diff --git a/tests/conftest.py b/tests/conftest.py index c1e436cb..4e9b8cf8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,24 +4,35 @@ import pyslackersweb.website.tasks from sqlalchemy import delete -from pyslackersweb.models import domains pytest_plugins = ("slack.tests.plugin",) +@pytest.fixture() +async def slack_client_ctx(slack_client): + async def ctx(app): + pyslackersweb.contexts._register_in_app(app, "slack_client", slack_client) + pyslackersweb.contexts._register_in_app(app, "slack_client_legacy", slack_client) + yield + + return ctx + + @pytest.fixture -async def client(aiohttp_client, slack_client): +async def client(monkeypatch, aiohttp_client, slack_client_ctx): + + # Patch imported slack_client context in pyslackersweb/__init__.py with the fake slack client ctx + monkeypatch.setattr(pyslackersweb, "slack_client", slack_client_ctx) application = await pyslackersweb.app_factory() app_client = await aiohttp_client(application) app_client.app["scheduler"].shutdown() - pyslackersweb.contexts._register_in_app(app_client.app, "slack_client", slack_client) - pyslackersweb.contexts._register_in_app(app_client.app, "slack_client_legacy", slack_client) - yield app_client # cleanup database async with app_client.app["pg"].acquire() as conn: - await conn.fetch(delete(domains)) + await conn.fetch(delete(pyslackersweb.models.domains)) + await conn.fetch(delete(pyslackersweb.models.SlackUsers)) + await conn.fetch(delete(pyslackersweb.models.SlackChannels)) diff --git a/tests/test_pyslackersweb.py b/tests/test_pyslackersweb.py new file mode 100644 index 00000000..1e42b42d --- /dev/null +++ b/tests/test_pyslackersweb.py @@ -0,0 +1,30 @@ +import logging +import pytest + +from pyslackersweb import tasks, contexts, models + + +@pytest.mark.parametrize("slack_client", ({"body": ["users_iter", "users"]},), indirect=True) +async def test_task_sync_slack_users(client, caplog): + + assert await contexts._is_empty_table(client.app["pg"], models.SlackUsers.c.id) + + await tasks.sync_slack_users(client.app["slack_client"], client.app["pg"]) + + assert not (await contexts._is_empty_table(client.app["pg"], models.SlackUsers.c.id)) + + for record in caplog.records: + assert record.levelno <= logging.INFO + + +@pytest.mark.parametrize("slack_client", ({"body": ["channels_iter", "channels"]},), indirect=True) +async def test_task_sync_slack_channels(client, caplog): + + assert await contexts._is_empty_table(client.app["pg"], models.SlackChannels.c.id) + + await tasks.sync_slack_channels(client.app["slack_client"], client.app["pg"]) + + assert not (await contexts._is_empty_table(client.app["pg"], models.SlackChannels.c.id)) + + for record in caplog.records: + assert record.levelno <= logging.INFO diff --git a/tests/test_website.py b/tests/test_website.py index a7d39b90..b7410b50 100644 --- a/tests/test_website.py +++ b/tests/test_website.py @@ -155,27 +155,3 @@ async def test_task_sync_github_repositories(client, caplog): for record in caplog.records: assert record.levelno <= logging.INFO - - -@pytest.mark.parametrize("slack_client", ({"body": ["users_iter", "users"]},), indirect=True) -async def test_task_sync_slack_users(client, caplog): - - result = await tasks.sync_slack_users(client.app["slack_client"], client.app["redis"]) - - assert result - assert len(result) == 1 - assert result["America/Los_Angeles"] == 2 - - for record in caplog.records: - assert record.levelno <= logging.INFO - - -@pytest.mark.parametrize("slack_client", ({"body": ["channels_iter", "channels"]},), indirect=True) -async def test_task_sync_slack_channels(client, caplog): - - result = await tasks.sync_slack_channels(client.app["slack_client"], client.app["redis"]) - - assert result - - for record in caplog.records: - assert record.levelno <= logging.INFO