diff --git a/README.md b/README.md index 314398d..96ab4bf 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ My Blog Using Sanic * 数据库: TortoiseORM/aiomysql * 缓存: aiomcache * KV数据库: aioredis -* 分布式任务管理器: aiotasks +* 任务队列: arq 其他aio扩展: Sanic-Auth、Sanic-wtf、sanic-session、aiotask-context、asyncblink、sanic-sentry、sanic-jwt、aiosmtplib diff --git a/models/comment.py b/models/comment.py index de0af3b..4dd215f 100644 --- a/models/comment.py +++ b/models/comment.py @@ -1,13 +1,16 @@ import mistune from tortoise import fields from tortoise.query_utils import Q +from arq import create_pool +from config import REDIS_URL from .base import BaseModel from .mc import cache, clear_mc from .user import GithubUser from .consts import K_COMMENT, ONE_HOUR from .react import ReactMixin, ReactItem from .signals import comment_reacted +from .utils import RedisSettings markdown = mistune.Markdown() MC_KEY_COMMENT_LIST = 'comment:%s:comment_list' @@ -64,8 +67,8 @@ async def add_comment(self, user_id, content, ref_id=0): obj = await Comment.create(github_id=user_id, post_id=self.id, ref_id=ref_id) await obj.set_content(content) - from tasks import mention_users - await mention_users.delay(self.id, content, user_id) + redis = await create_pool(RedisSettings.from_url(REDIS_URL)) + await redis.enqueue_job('mention_users', self.id, content, user_id) return obj async def del_comment(self, user_id, comment_id): diff --git a/models/utils.py b/models/utils.py index 9ed3a0b..3f45111 100644 --- a/models/utils.py +++ b/models/utils.py @@ -1,4 +1,8 @@ import math +from dataclasses import dataclass + +from sqlalchemy.engine.url import _parse_rfc1738_args +from arq.connections import RedisSettings as _RedisSettings def trunc_utf8(string, num, etc='...'): @@ -125,3 +129,11 @@ def iter_pages(self, left_edge=2, left_current=2, yield None yield num last = num + + +@dataclass +class RedisSettings(_RedisSettings): + @classmethod + def from_url(cls, db_url): + url = _parse_rfc1738_args(db_url) + return cls(url.host, url.port, url.database, url.password, 1, 5, 1) diff --git a/requirements.txt b/requirements.txt index 6b43d64..edb8e2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,7 +19,8 @@ sanic-sentry==0.1.7 sanic-jwt==1.2.1 attrs==19.1.0 aiosmtplib==1.0.4 +dataclasses==0.6;python_version == '3.6' contextvars==2.4;python_version == '3.6' -e git+https://github.com/dongweiming/aiomcache.git@dev2#egg=aiomcache -e git+https://github.com/dongweiming/tortoise-orm.git@dev#egg=tortoise --e git+https://github.com/cr0hn/aiotasks.git@42b2395c12e26626d5262d6af1e472b4e5e10509#egg=aiotasks +-e git+https://github.com/samuelcolvin/arq.git@v0.16a4#egg=arq diff --git a/srv/templates/supervisord.conf b/srv/templates/supervisord.conf index 1945222..5573566 100644 --- a/srv/templates/supervisord.conf +++ b/srv/templates/supervisord.conf @@ -9,10 +9,10 @@ startsecs = 5 startretries = 3 redirect_stderr=true -[program:{{ app_name }}_aiotask] -command = /home/{{ ansible_ssh_user }}/{{ app_name }}/venv/bin/aiotasks -vvvv worker -A tasks.py +[program:{{ app_name }}_arq] +command = /home/{{ ansible_ssh_user }}/{{ app_name }}/venv/bin/arq tasks.WorkerSettings directory = /home/{{ ansible_ssh_user }}/{{ app_name }} -stdout_logfile = /home/{{ ansible_ssh_user }}/{{ app_name }}/supervisor_{{ app_name }}_aiotask.log +stdout_logfile = /home/{{ ansible_ssh_user }}/{{ app_name }}/supervisor_{{ app_name }}_arq.log user = {{ ansible_ssh_user }} autostart = true autorestart = true diff --git a/tasks.py b/tasks.py index 7e4f7e9..4f0e3f9 100644 --- a/tasks.py +++ b/tasks.py @@ -6,7 +6,8 @@ import aiosmtplib from mako.template import Template from mako.lookup import TemplateLookup -from aiotasks import build_manager +from arq import create_pool +from arq.connections import RedisSettings from ext import init_db from models.blog import Post @@ -14,7 +15,6 @@ from config import (MAIL_SERVER, MAIL_PORT, MAIL_USERNAME, MAIL_PASSWORD, REDIS_URL, SITE_TITLE, BLOG_URL) -manager = build_manager(REDIS_URL) CAN_SEND = all((MAIL_SERVER, MAIL_USERNAME, MAIL_PASSWORD)) @@ -27,7 +27,6 @@ async def _deco(*args, **kwargs): return _deco -@manager.task() async def send_email(subject, html, send_to): if not CAN_SEND: return @@ -47,9 +46,8 @@ async def send_email(subject, html, send_to): await smtp.quit() -@manager.task() @with_context -async def mention_users(post_id, content, author_id): +async def mention_users(ctx, post_id, content, author_id): post = await Post.cache(post_id) if not post: return @@ -59,9 +57,15 @@ async def mention_users(post_id, content, author_id): if not email: continue subject = EMAIL_SUBJECT.format(title=post.title) - lookup = TemplateLookup(directories=['templates']) + lookup = TemplateLookup(directories=['templates'], + input_encoding='utf-8', + output_encoding='utf-8') template = lookup.get_template('email/mention.html') html = template.render(username=user.username, site_url=BLOG_URL, post=post, site_name=SITE_TITLE) - await send_email.delay(subject, html, email) + await send_email(subject, html.decode(), email) + + +class WorkerSettings: + functions = [mention_users]