Skip to content

Commit

Permalink
Move SlackUsers & SlackChannels cache to database (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovv authored Jul 17, 2020
1 parent 32ae326 commit 51ab5b1
Show file tree
Hide file tree
Showing 12 changed files with 333 additions and 160 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""add SlackChannels & SlackUsers
Revision ID: 2ba5cc3efce8
Revises: 3df042324a1f
Create Date: 2020-07-13 13:12:47.561813+00:00
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "2ba5cc3efce8"
down_revision = "3df042324a1f"
branch_labels = None
depends_on = None


def upgrade():
    """Create the ``slack_channels`` and ``slack_users`` tables with their indexes."""
    # Slack channel cache table.
    channel_columns = [
        sa.Column("id", sa.Text(), nullable=False),
        sa.Column("name", sa.Text(), nullable=True),
        sa.Column("created", sa.DateTime(timezone=True), nullable=True),
        sa.Column("archived", sa.Boolean(), nullable=True),
        sa.Column("members", sa.Integer(), nullable=True),
        sa.Column("topic", sa.Text(), nullable=True),
        sa.Column("purpose", sa.Text(), nullable=True),
    ]
    op.create_table(
        "slack_channels",
        *channel_columns,
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name"),
    )
    for index_name, indexed_columns in (
        ("ix_slack_channels_id", ["id"]),
        ("ix_slack_channels_name", ["name"]),
    ):
        op.create_index(index_name, "slack_channels", indexed_columns, unique=False)

    # Slack user cache table.
    user_columns = [
        sa.Column("id", sa.Text(), nullable=False),
        sa.Column("deleted", sa.Boolean(), nullable=True),
        sa.Column("admin", sa.Boolean(), nullable=True),
        sa.Column("bot", sa.Boolean(), nullable=True),
        sa.Column("timezone", sa.Text(), nullable=True),
        sa.Column("first_seen", sa.DateTime(timezone=True), nullable=True),
    ]
    op.create_table("slack_users", *user_columns, sa.PrimaryKeyConstraint("id"))
    for index_name, indexed_columns in (
        ("ix_slack_users_admin", ["id", "admin"]),
        ("ix_slack_users_id", ["id"]),
        ("ix_slack_users_timezone", ["id", "timezone"]),
    ):
        op.create_index(index_name, "slack_users", indexed_columns, unique=False)


def downgrade():
    """Drop the Slack cache tables, removing each table's indexes before the table."""
    # Listed in the exact order the objects must be dropped.
    teardown_plan = (
        (
            "slack_users",
            ("ix_slack_users_timezone", "ix_slack_users_id", "ix_slack_users_admin"),
        ),
        (
            "slack_channels",
            ("ix_slack_channels_name", "ix_slack_channels_id"),
        ),
    )
    for table, index_names in teardown_plan:
        for index_name in index_names:
            op.drop_index(index_name, table_name=table)
        op.drop_table(table)
13 changes: 11 additions & 2 deletions pyslackersweb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

from .contexts import apscheduler, client_session, redis_pool, postgresql_pool, slack_client
from .contexts import (
apscheduler,
client_session,
redis_pool,
postgresql_pool,
slack_client,
background_jobs,
)
from .middleware import request_context_middleware
from . import settings, website, sirbot

Expand Down Expand Up @@ -58,7 +65,9 @@ async def app_factory(*args) -> web.Application: # pylint: disable=unused-argum
SLACK_TOKEN=settings.SLACK_TOKEN,
)

app.cleanup_ctx.extend([apscheduler, client_session, redis_pool, postgresql_pool, slack_client])
app.cleanup_ctx.extend(
[apscheduler, client_session, redis_pool, postgresql_pool, slack_client, background_jobs]
)

app.router.add_get("/", index)

Expand Down
53 changes: 51 additions & 2 deletions pyslackersweb/contexts.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
from typing import AsyncGenerator

import json
import datetime
import logging

from typing import AsyncGenerator

import aioredis
import asyncpgsa
import asyncpg.pool
import sqlalchemy as sa

from asyncpgsa.connection import get_dialect
from aiohttp import ClientSession, web
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from slack.io.aiohttp import SlackAPI
from sqlalchemy import select

from pyslackersweb.util.log import ContextAwareLoggerAdapter
from . import tasks, models


logger = ContextAwareLoggerAdapter(logging.getLogger(__name__))


def _register_in_app(app: web.Application, name: str, item) -> None:
Expand Down Expand Up @@ -58,3 +69,41 @@ async def slack_client(app: web.Application) -> AsyncGenerator[None, None]:
_register_in_app(app, "slack_client_legacy", slack_legacy)

yield


async def background_jobs(app: web.Application) -> AsyncGenerator[None, None]:
    """Register the hourly Slack cache synchronisation jobs on the app scheduler.

    Two cron jobs are added: ``tasks.sync_slack_users`` at minute 0 and
    ``tasks.sync_slack_channels`` at minute 15. When the corresponding table
    holds no rows, the first run is brought forward to one minute from now so
    the cache gets populated quickly.

    Reads ``app["scheduler"]``, ``app["pg"]`` and ``app["slack_client"]``; used
    as an aiohttp cleanup context, it yields once and does nothing on teardown.
    """
    scheduler = app["scheduler"]
    pg: asyncpg.pool.Pool = app["pg"]
    slack_client_: SlackAPI = app["slack_client"]

    # (job function, cron minute, column probed to detect an empty table)
    job_specs = (
        (tasks.sync_slack_users, 0, models.SlackUsers.c.id),
        (tasks.sync_slack_channels, 15, models.SlackChannels.c.id),
    )

    for job, minute, id_column in job_specs:
        # Only pass ``next_run_time`` when an early first run is wanted.
        # APScheduler treats an explicit ``next_run_time=None`` as "add the job
        # paused", which would stop the job from ever running on a populated
        # database; omitting the argument lets the cron trigger schedule it.
        schedule_overrides = {}
        if await _is_empty_table(pg, id_column):
            schedule_overrides["next_run_time"] = datetime.datetime.now() + datetime.timedelta(
                minutes=1
            )

        scheduler.add_job(
            job,
            "cron",
            minute=minute,
            args=(slack_client_, pg),
            **schedule_overrides,
        )

    yield


async def _is_empty_table(pg: asyncpg.pool.Pool, column: sa.Column) -> bool:
    """Return ``True`` when fetching a single value of ``column`` yields ``None``."""
    probe = select([column]).limit(1)
    async with pg.acquire() as connection:
        first_value = await connection.fetchval(probe)
    return first_value is None
29 changes: 29 additions & 0 deletions pyslackersweb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,33 @@ class Source(Enum):
nullable=False,
),
sa.Index("ix_domain_blocked", "domain", "blocked"),
sa.Index("ix_domains_domain", "domain"),
)

# Slack channel cache table: one row per channel, keyed by the channel id.
SlackChannels = sa.Table(
"slack_channels",
metadata,
# Primary key of the row.
sa.Column("id", sa.Text, primary_key=True),
# Channel names carry a unique constraint.
sa.Column("name", sa.Text, unique=True),
# Timezone-aware timestamp column.
sa.Column("created", sa.DateTime(timezone=True)),
sa.Column("archived", sa.Boolean),
# Integer column (presumably the member count; verify against the writer).
sa.Column("members", sa.Integer),
sa.Column("topic", sa.Text),
sa.Column("purpose", sa.Text),
# NOTE(review): "id" is already the primary key and "name" is already unique,
# so these two explicit indexes look redundant; confirm before removing, as a
# schema migration would be needed as well.
sa.Index("ix_slack_channels_id", "id"),
sa.Index("ix_slack_channels_name", "name"),
)

# Slack user cache table: one row per user, keyed by the user id.
SlackUsers = sa.Table(
"slack_users",
metadata,
# Primary key of the row.
sa.Column("id", sa.Text, primary_key=True),
sa.Column("deleted", sa.Boolean),
sa.Column("admin", sa.Boolean),
sa.Column("bot", sa.Boolean),
sa.Column("timezone", sa.Text),
# Defaults to the time the row is first inserted.
# NOTE(review): datetime.now presumably returns a naive local timestamp while
# the column is timezone-aware; confirm this is intended.
sa.Column("first_seen", sa.DateTime(timezone=True), default=datetime.now),
# NOTE(review): "id" is already the primary key, so this index looks redundant;
# confirm before removing.
sa.Index("ix_slack_users_id", "id"),
# Composite indexes on (id, admin) and (id, timezone).
sa.Index("ix_slack_users_admin", "id", "admin"),
sa.Index("ix_slack_users_timezone", "id", "timezone"),
)
62 changes: 62 additions & 0 deletions pyslackersweb/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
import logging
import datetime

import slack
import asyncpg.pool

from slack.io.abc import SlackAPI
from sqlalchemy.dialects.postgresql import insert as pg_insert

from pyslackersweb import models
from pyslackersweb.util.log import ContextAwareLoggerAdapter


logger = ContextAwareLoggerAdapter(logging.getLogger(__name__))


async def sync_slack_users(slack_client: SlackAPI, pg: asyncpg.pool.Pool) -> None:
    """Upsert every user returned by Slack's ``users.list`` into ``slack_users``.

    Each user is inserted by id; on conflict the ``deleted``, ``admin``,
    ``bot`` and ``timezone`` columns are overwritten with the latest values.

    Cancellation and any other error are logged and swallowed, so a failed
    refresh never propagates to the caller.

    Args:
        slack_client: Slack API client used to iterate over the user list.
        pg: Connection pool from which a single connection is acquired for
            the whole refresh.
    """
    logger.debug("Refreshing slack users cache.")
    try:
        async with pg.acquire() as conn:
            async for user in slack_client.iter(slack.methods.USERS_LIST, minimum_time=3):
                # Some user objects (e.g. deleted accounts) omit optional
                # fields. Use tolerant lookups so one sparse record does not
                # raise KeyError and abort the entire refresh: missing flags
                # are stored as False and a missing timezone as NULL.
                values = {
                    "deleted": user.get("deleted", False),
                    "admin": user.get("is_admin", False),
                    "bot": user.get("is_bot", False),
                    "timezone": user.get("tz"),
                }
                await conn.execute(
                    pg_insert(models.SlackUsers)
                    .values(id=user["id"], **values)
                    .on_conflict_do_update(index_elements=[models.SlackUsers.c.id], set_=values)
                )
    except asyncio.CancelledError:
        logger.debug("Slack users cache refresh canceled")
    except Exception:  # pylint: disable=broad-except
        logger.exception("Error refreshing slack users cache")


async def sync_slack_channels(slack_client: SlackAPI, pg: asyncpg.pool.Pool) -> None:
    """Upsert every channel returned by Slack's ``conversations.list`` into ``slack_channels``.

    Each channel is inserted by id; on conflict the name, creation time,
    archived flag, member count, topic and purpose are overwritten.

    Cancellation and any other error are logged and swallowed, so a failed
    refresh never propagates to the caller.

    Args:
        slack_client: Slack API client used to iterate over the channel list.
        pg: Connection pool from which a single connection is acquired for
            the whole refresh.
    """
    logger.debug("Refreshing slack channels cache.")

    try:
        async with pg.acquire() as conn:
            async for channel in slack_client.iter(slack.methods.CONVERSATIONS_LIST):
                values = {
                    "name": channel["name"],
                    # "created" is a Unix epoch; build an aware UTC datetime so
                    # the value stored in the timezone-aware column does not
                    # depend on the server's local timezone or DST rules.
                    "created": datetime.datetime.fromtimestamp(
                        channel["created"], tz=datetime.timezone.utc
                    ),
                    "archived": channel["is_archived"],
                    "members": channel["num_members"],
                    "topic": channel["topic"]["value"],
                    "purpose": channel["purpose"]["value"],
                }
                await conn.execute(
                    pg_insert(models.SlackChannels)
                    .values(id=channel["id"], **values)
                    .on_conflict_do_update(index_elements=[models.SlackChannels.c.id], set_=values)
                )
    except asyncio.CancelledError:
        logger.debug("Slack channels cache refresh canceled")
    except Exception:  # pylint: disable=broad-except
        logger.exception("Error refreshing slack channels cache")
27 changes: 19 additions & 8 deletions pyslackersweb/website/contexts.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datetime

from typing import AsyncGenerator

from aiohttp import ClientSession, web
from aioredis.abc import AbcConnection
from slack.io.aiohttp import SlackAPI

from . import tasks

Expand All @@ -11,17 +12,27 @@ async def background_jobs(app: web.Application) -> AsyncGenerator[None, None]:
scheduler = app["scheduler"]
client_session: ClientSession = app["client_session"]
redis: AbcConnection = app["redis"]
slack_client_: SlackAPI = app["slack_client"]
pg = app["pg"]

# If redis is empty (new dev environments) run the task in one minute
next_run_time = None
if not await redis.exists(tasks.GITHUB_REPO_CACHE_KEY):
next_run_time = datetime.datetime.now() + datetime.timedelta(minutes=1)

scheduler.add_job(
tasks.sync_github_repositories, "cron", minute=30, args=(client_session, redis)
tasks.sync_github_repositories,
"cron",
minute=30,
args=(client_session, redis),
next_run_time=next_run_time,
)

scheduler.add_job(tasks.sync_slack_users, "cron", minute=0, args=(slack_client_, redis))

scheduler.add_job(tasks.sync_slack_channels, "cron", minute=15, args=(slack_client_, redis))

scheduler.add_job(tasks.sync_burner_domains, "cron", hour=0, args=(client_session, pg))
scheduler.add_job(
tasks.sync_burner_domains,
"cron",
hour=0,
args=(client_session, pg),
next_run_time=next_run_time,
)

yield
22 changes: 22 additions & 0 deletions pyslackersweb/website/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging

import asyncpg

from pyslackersweb.util.log import ContextAwareLoggerAdapter


logger = ContextAwareLoggerAdapter(logging.getLogger(__name__))


async def get_user_count(conn: asyncpg.connection.Connection) -> int:
    """Return the number of ids stored in the ``slack_users`` table."""
    user_total = await conn.fetchval("SELECT count(id) FROM slack_users")
    return user_total


async def get_timezones(conn: asyncpg.connection.Connection) -> dict:
    """Return a mapping of timezone name to the number of users in that timezone.

    Rows whose timezone is NULL are left out of the result.
    """
    grouped = await conn.fetch("SELECT timezone, count(id) FROM slack_users GROUP BY timezone")
    return {
        record["timezone"]: record["count"]
        for record in grouped
        if record["timezone"] is not None
    }
Loading

0 comments on commit 51ab5b1

Please sign in to comment.