diff --git a/scrapyd-deploy.py b/scrapyd-deploy.py new file mode 100644 index 0000000..09c611c --- /dev/null +++ b/scrapyd-deploy.py @@ -0,0 +1,294 @@ +#!/usr/bin/python2 +# coding=utf-8 + +import glob +import json +import netrc +import os +import shutil +import sys +import tempfile +import time +from optparse import OptionParser +from subprocess import Popen, PIPE, check_call + +import setuptools # not used in code but needed in runtime, don't remove! +from scrapy.utils.conf import get_config, closest_scrapy_cfg +from scrapy.utils.http import basic_auth_header +from scrapy.utils.project import inside_project +from scrapy.utils.python import retry_on_eintr +from six.moves.urllib.error import HTTPError, URLError +from six.moves.urllib.parse import urlparse, urljoin +from six.moves.urllib.request import (build_opener, install_opener, + HTTPRedirectHandler as UrllibHTTPRedirectHandler, + Request, urlopen) +from w3lib.form import encode_multipart + +_SETUP_PY_TEMPLATE = \ +"""# Automatically created by: scrapyd-deploy + +from setuptools import setup, find_packages + +setup( + name = 'project', + version = '1.0', + packages = find_packages(), + entry_points = {'scrapy': ['settings = %(settings)s']}, +) +""" + +def parse_opts(): + parser = OptionParser(usage="%prog [options] [ [target] | -l | -L ]", + description="Deploy Scrapy project to Scrapyd server") + parser.add_option("-p", "--project", + help="the project name in the target") + parser.add_option("-v", "--version", + help="the version to deploy. Defaults to current timestamp") + parser.add_option("-l", "--list-targets", action="store_true", \ + help="list available targets") + parser.add_option("-a", "--deploy-all-targets",action="store_true", help="deploy all targets") + parser.add_option("-d", "--debug", action="store_true", + help="debug mode (do not remove build dir)") + parser.add_option("-L", "--list-projects", metavar="TARGET", \ + help="list available projects on TARGET") + parser.add_option("--egg", metavar="FILE", + help="use the given egg, instead of building it") + parser.add_option("--build-egg", metavar="FILE", + help="only build the egg, don't deploy it") + return parser.parse_args() + +def main(): + opts, args = parse_opts() + exitcode = 0 + if not inside_project(): + _log("Error: no Scrapy project found in this location") + sys.exit(1) + + install_opener( + build_opener(HTTPRedirectHandler) + ) + + if opts.list_targets: + for name, target in _get_targets().items(): + print("%-20s %s" % (name, target['url'])) + return + + if opts.list_projects: + target = _get_target(opts.list_projects) + req = Request(_url(target, 'listprojects.json')) + _add_auth_header(req, target) + f = urlopen(req) + projects = json.loads(f.read())['projects'] + print(os.linesep.join(projects)) + return + + tmpdir = None + + if opts.build_egg: # build egg only + egg, tmpdir = _build_egg() + _log("Writing egg to %s" % opts.build_egg) + shutil.copyfile(egg, opts.build_egg) + elif opts.deploy_all_targets: + version = None + for name, target in _get_targets().items(): + if version is None: + version = _get_version(target, opts) + _build_egg_and_deploy_target(target, version, opts) + else: # buld egg and deploy + target_name = _get_target_name(args) + target = _get_target(target_name) + version = _get_version(target, opts) + exitcode, tmpdir = _build_egg_and_deploy_target(target, version, opts) + + if tmpdir: + if opts.debug: + _log("Output dir not removed: %s" % tmpdir) + else: + shutil.rmtree(tmpdir) + + sys.exit(exitcode) + +def _build_egg_and_deploy_target(target, version, opts): + exitcode = 0 + tmpdir = None + + project = _get_project(target, opts) + if opts.egg: + _log("Using egg: %s" % opts.egg) + egg = opts.egg + else: + _log("Packing version %s" % version) + egg, tmpdir = _build_egg() + if not _upload_egg(target, egg, project, version): + exitcode = 1 + return exitcode, tmpdir + +def _log(message): + sys.stderr.write(message + os.linesep) + +def _fail(message, code=1): + _log(message) + sys.exit(code) + +def _get_target_name(args): + if len(args) > 1: + raise _fail("Error: Too many arguments: %s" % ' '.join(args)) + elif args: + return args[0] + elif len(args) < 1: + return 'default' + +def _get_project(target, opts): + project = opts.project or target.get('project') + if not project: + raise _fail("Error: Missing project") + return project + +def _get_option(section, option, default=None): + cfg = get_config() + return cfg.get(section, option) if cfg.has_option(section, option) \ + else default + +def _get_targets(): + cfg = get_config() + baset = dict(cfg.items('deploy')) if cfg.has_section('deploy') else {} + targets = {} + if 'url' in baset: + targets['default'] = baset + for x in cfg.sections(): + if x.startswith('deploy:'): + t = baset.copy() + t.update(cfg.items(x)) + targets[x[7:]] = t + return targets + +def _get_target(name): + try: + return _get_targets()[name] + except KeyError: + raise _fail("Unknown target: %s" % name) + +def _url(target, action): + return urljoin(target['url'], action) + +def _get_version(target, opts): + version = opts.version or target.get('version') + if version == 'HG': + p = Popen(['hg', 'tip', '--template', '{rev}'], stdout=PIPE, universal_newlines=True) + d = 'r%s' % p.communicate()[0] + p = Popen(['hg', 'branch'], stdout=PIPE, universal_newlines=True) + b = p.communicate()[0].strip('\n') + return '%s-%s' % (d, b) + elif version == 'GIT': + p = Popen(['git', 'describe'], stdout=PIPE, universal_newlines=True) + d = p.communicate()[0].strip('\n') + if p.wait() != 0: + p = Popen(['git', 'rev-list', '--count', 'HEAD'], stdout=PIPE, universal_newlines=True) + d = 'r%s' % p.communicate()[0].strip('\n') + + p = Popen(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], stdout=PIPE, universal_newlines=True) + b = p.communicate()[0].strip('\n') + return '%s-%s' % (d, b) + elif version: + return version + else: + # 这里要转成分钟级别的,防止不同机器版本号不一样 + return str(int(time.time()) / 60 * 60) + "_" + time.strftime("%Y-%m-%dT%H:%M:00", time.localtime()) + # return str(int(time.time())) + +def _upload_egg(target, eggpath, project, version): + with open(eggpath, 'rb') as f: + eggdata = f.read() + data = { + 'project': project, + 'version': version, + 'egg': ('project.egg', eggdata), + } + body, boundary = encode_multipart(data) + url = _url(target, 'addversion.json') + headers = { + 'Content-Type': 'multipart/form-data; boundary=%s' % boundary, + 'Content-Length': str(len(body)), + } + req = Request(url, body, headers) + _add_auth_header(req, target) + _log('Deploying to project "%s" in %s' % (project, url)) + return _http_post(req) + +def _add_auth_header(request, target): + if 'username' in target: + u, p = target.get('username'), target.get('password', '') + request.add_header('Authorization', basic_auth_header(u, p)) + else: # try netrc + try: + host = urlparse(target['url']).hostname + a = netrc.netrc().authenticators(host) + request.add_header('Authorization', basic_auth_header(a[0], a[2])) + except (netrc.NetrcParseError, IOError, TypeError): + pass + +def _http_post(request): + try: + f = urlopen(request) + _log("Server response (%s):" % f.code) + print(f.read().decode('utf-8')) + return True + except HTTPError as e: + _log("Deploy failed (%s):" % e.code) + resp = e.read().decode('utf-8') + try: + d = json.loads(resp) + except ValueError: + print(resp) + else: + if "status" in d and "message" in d: + print("Status: %(status)s" % d) + print("Message:\n%(message)s" % d) + else: + print(json.dumps(d, indent=3)) + except URLError as e: + _log("Deploy failed: %s" % e) + +def _build_egg(): + closest = closest_scrapy_cfg() + os.chdir(os.path.dirname(closest)) + if not os.path.exists('setup.py'): + settings = get_config().get('settings', 'default') + _create_default_setup_py(settings=settings) + d = tempfile.mkdtemp(prefix="scrapydeploy-") + o = open(os.path.join(d, "stdout"), "wb") + e = open(os.path.join(d, "stderr"), "wb") + retry_on_eintr(check_call, [sys.executable, 'setup.py', 'clean', '-a', 'bdist_egg', '-d', d], stdout=o, stderr=e) + o.close() + e.close() + egg = glob.glob(os.path.join(d, '*.egg'))[0] + return egg, d + +def _create_default_setup_py(**kwargs): + with open('setup.py', 'w') as f: + f.write(_SETUP_PY_TEMPLATE % kwargs) + + +class HTTPRedirectHandler(UrllibHTTPRedirectHandler): + + def redirect_request(self, req, fp, code, msg, headers, newurl): + newurl = newurl.replace(' ', '%20') + if code in (301, 307): + return Request(newurl, + data=req.get_data(), + headers=req.headers, + origin_req_host=req.get_origin_req_host(), + unverifiable=True) + elif code in (302, 303): + newheaders = dict((k, v) for k, v in req.headers.items() + if k.lower() not in ("content-length", "content-type")) + return Request(newurl, + headers=newheaders, + origin_req_host=req.get_origin_req_host(), + unverifiable=True) + else: + raise HTTPError(req.get_full_url(), code, msg, headers, fp) + + +if __name__ == "__main__": + main() diff --git a/scrapydweb/run.py b/scrapydweb/run.py index cdb9c2b..72d21cc 100644 --- a/scrapydweb/run.py +++ b/scrapydweb/run.py @@ -14,6 +14,8 @@ from scrapydweb.vars import ROOT_DIR, SCRAPYDWEB_SETTINGS_PY, SCHEDULER_STATE_DICT, STATE_PAUSED, STATE_RUNNING from scrapydweb.utils.check_app_config import check_app_config +reload(sys) +sys.setdefaultencoding('utf8') logger = logging.getLogger(__name__) apscheduler_logger = logging.getLogger('apscheduler') diff --git a/scrapydweb/views/overview/tasks.py b/scrapydweb/views/overview/tasks.py index c934590..52a8d86 100644 --- a/scrapydweb/views/overview/tasks.py +++ b/scrapydweb/views/overview/tasks.py @@ -1,16 +1,16 @@ # coding: utf-8 -from datetime import datetime import json import logging import traceback +from datetime import datetime from flask import Blueprint, flash, render_template, request, send_file, url_for +from sqlalchemy import func +from ..baseview import BaseView from ...common import handle_metadata from ...models import Task, TaskResult, TaskJobResult, db from ...vars import SCHEDULER_STATE_DICT, STATE_PAUSED, STATE_RUNNING, TIMER_TASKS_HISTORY_LOG -from ..baseview import BaseView - apscheduler_logger = logging.getLogger('apscheduler') metadata = dict(per_page=handle_metadata().get('tasks_per_page', 100)) @@ -120,33 +120,51 @@ def remove_apscheduler_job_without_task(self): def process_tasks(self, tasks): with db.session.no_autoflush: # To avoid in place updating + # for task in tasks: # TypeError: 'Pagination' object is not iterable # tasks.item: list + task_id_list = map(lambda inner_task: inner_task.id, tasks.items) + if len(task_id_list) > 0: + task_result_list = db.session.query(TaskResult.id, TaskResult.task_id, func.sum(TaskResult.fail_count), + func.sum(TaskResult.pass_count), + func.count(TaskResult.id)).order_by(TaskResult.id.desc()).group_by( + TaskResult.task_id).filter(TaskResult.task_id.in_(task_id_list)).all() + else: + task_result_list = [] + # for task in tasks: # TypeError: 'Pagination' object is not iterable # tasks.item: list for index, task in enumerate(tasks.items, (tasks.page - 1) * tasks.per_page + 1): # Columns: Name | Prev run result | Task results + # task_result_list: (id, task_id, fail_count, pass_count, total_count) + task_results = filter(lambda inner_result: inner_result[1] == task.id, + task_result_list) # type: list + if task_results: + task_result = {'id': task_results[0][0], 'task_id': task_results[0][1], + 'fail_count': task_results[0][2], 'pass_count': task_results[0][3], + 'total_count': task_results[0][4]} + else: + task_result = None task.index = index task.name = task.name or '' task.timezone = task.timezone or self.scheduler.timezone task.create_time = self.remove_microsecond(task.create_time) task.update_time = self.remove_microsecond(task.update_time) - task_results = TaskResult.query.filter_by(task_id=task.id).order_by(TaskResult.id.desc()) - task.run_times = task_results.count() + task.run_times = task_result['total_count'] if task_result else 0 task.url_task_results = url_for('tasks', node=self.node, task_id=task.id) if task.run_times > 0: - task.fail_times = sum([int(t.fail_count > 0) for t in task_results]) - latest_task_result = task_results[0] - if latest_task_result.fail_count == 0 and latest_task_result.pass_count == 1: - task_job_result = TaskJobResult.query.filter_by(task_result_id=latest_task_result.id).order_by( - TaskJobResult.id.desc()).first() - task.prev_run_result = task_job_result.result[-19:] # task_N_2019-01-01T00_00_01 - task.url_prev_run_result = url_for('log', node=task_job_result.node, opt='stats', - project=task.project, spider=task.spider, - job=task_job_result.result) - else: - # 'FAIL 0, PASS 0' if execute_task() has not finished - task.prev_run_result = 'FAIL %s, PASS %s' % (latest_task_result.fail_count, - latest_task_result.pass_count) - task.url_prev_run_result = url_for('tasks', node=self.node, - task_id=task.id, task_result_id=latest_task_result.id) + task.fail_times = task_result['fail_count'] if task_result else 0 + if task_result: + if task_result['fail_count'] == 0 and task_result['pass_count'] == 1: + task_job_result = TaskJobResult.query.filter_by(task_result_id=task_result['id']).order_by( + TaskJobResult.id.desc()).first() + task.prev_run_result = task_job_result.result[-19:] # task_N_2019-01-01T00_00_01 + task.url_prev_run_result = url_for('log', node=task_job_result.node, opt='stats', + project=task.project, spider=task.spider, + job=task_job_result.result) + else: + # 'FAIL 0, PASS 0' if execute_task() has not finished + task.prev_run_result = 'FAIL %s, PASS %s' % (task_result['fail_count'], + task_result['pass_count']) + task.url_prev_run_result = url_for('tasks', node=self.node, + task_id=task.id, task_result_id=task_result['id']) else: task.fail_times = 0 task.prev_run_result = self.NA