Poll jobs by comparing both priority and insert_time #344

Closed · wants to merge 2 commits
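
A minimal sketch of the ordering the diffs and the updated test below implement: jobs are compared by priority first, and equal priorities fall back to the earlier insert_time. The projects, spiders, and priorities mirror the test case; the insert_time values are illustrative only.

```python
# Hedged sketch of the intended job ordering (not code from the PR).
# Each job is (project, spider, priority, insert_time); insert_time values are made up.
jobs = [
    ('mybot2', 'spider2', 0, 100),
    ('mybot1', 'spider2', 0.0, 200),
    ('mybot1', 'spider1', -1, 300),
    ('mybot1', 'spider3', 1.0, 400),
]

# Higher priority first; on ties, the job inserted earlier wins.
expected_order = sorted(jobs, key=lambda j: (-j[2], j[3]))

print([(j[0], j[1]) for j in expected_order])
# [('mybot1', 'spider3'), ('mybot2', 'spider2'), ('mybot1', 'spider2'), ('mybot1', 'spider1')]
```

This matches the first/second/third/fourth expectations annotated in the updated test_poller.py.
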
37 changes: 33 additions & 4 deletions scrapyd/poller.py
@@ -12,17 +12,35 @@ def __init__(self, config):
         self.config = config
         self.update_projects()
         self.dq = DeferredQueue()
+        # For backward compatibility with custom SqliteSpiderQueue and JsonSqlitePriorityQueue
+        # TODO: remove it and add method get_project_with_highest_priority in ISpiderQueue in 1.4
+        self.support_comparing_priorities = None
 
     @inlineCallbacks
     def poll(self):
         if not self.dq.waiting:
             return
-        for p, q in iteritems(self.queues):
-            c = yield maybeDeferred(q.count)
-            if c:
+
+        if self.support_comparing_priorities is None:
+            self.test_comparing_priorities()
+
+        project_with_highest_priority = None
+        if self.support_comparing_priorities:
+            for p, q in iteritems(self.queues):
+                project_with_highest_priority = q.get_project_with_highest_priority()
+                break
+            if project_with_highest_priority:
+                q = self.queues[project_with_highest_priority]
                 msg = yield maybeDeferred(q.pop)
                 if msg is not None:  # In case of a concurrently accessed queue
-                    returnValue(self.dq.put(self._message(msg, p)))
+                    returnValue(self.dq.put(self._message(msg, project_with_highest_priority)))
+        if not self.support_comparing_priorities or not project_with_highest_priority:
+            for p, q in iteritems(self.queues):
+                c = yield maybeDeferred(q.count)
+                if c:
+                    msg = yield maybeDeferred(q.pop)
+                    if msg is not None:  # In case of a concurrently accessed queue
+                        returnValue(self.dq.put(self._message(msg, p)))
 
     def next(self):
         return self.dq.get()
@@ -35,3 +53,14 @@ def _message(self, queue_msg, project):
         d['_project'] = project
         d['_spider'] = d.pop('name')
         return d
+
+    def test_comparing_priorities(self):
+        for p, q in iteritems(self.queues):
+            try:
+                getattr(q, 'get_project_with_highest_priority')
+                getattr(q.q, 'project_priority_map')
+            except AttributeError:
+                self.support_comparing_priorities = False
+            else:
+                self.support_comparing_priorities = True
+            return
9 changes: 8 additions & 1 deletion scrapyd/spiderqueue.py
@@ -7,7 +7,7 @@
 @implementer(ISpiderQueue)
 class SqliteSpiderQueue(object):
 
-    def __init__(self, database=None, table='spider_queue'):
+    def __init__(self, database=None, table='spider_queue_with_triggers'):
         self.q = JsonSqlitePriorityQueue(database, table)
 
     def add(self, name, priority=0.0, **spider_args):
@@ -29,3 +29,10 @@ def remove(self, func):

     def clear(self):
         self.q.clear()
+
+    def get_project_with_highest_priority(self):
+        if self.q.project_priority_map:
+            return sorted(self.q.project_priority_map,
+                          key=lambda x: self.q.project_priority_map[x], reverse=True)[0]
+        else:
+            return None
34 changes: 32 additions & 2 deletions scrapyd/sqlite.py
@@ -1,5 +1,6 @@
 import sqlite3
 import json
+import os
 try:
     from collections.abc import MutableMapping
 except ImportError:
@@ -82,19 +83,28 @@ class JsonSqlitePriorityQueue(object):
"""SQLite priority queue. It relies on SQLite concurrency support for
providing atomic inter-process operations.
"""
project_priority_map = {}

Member commented:
This is like the "master table" solution I was talking about, actually implemented as a singleton to share state between all instances of the queue class and save us a lot of IO and CPU cycles, right?

@my8100 (Collaborator, Author) replied on Jul 3, 2019:
It's fast to find out which queue to pop in poll(), since project_priority_map is a dict like:

{'project1': (0.0, -1561646348), 'project2': (1.0, -1561646349)}

So there's no need to introduce an actual "master table".

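As a minimal illustration (not part of the diff) of how a map in that shape resolves to the next project to poll: the timestamps below are the example values from the comment above, and max() over the stored (priority, -insert_time) tuples is equivalent to the sorted(...)[0] call added in scrapyd/spiderqueue.py.

```python
# Illustrative sketch only; the values are the example ones from the comment,
# not real data. Each value is (priority, -insert_time), so a plain tuple
# comparison prefers higher priority first and, on ties, the larger
# -insert_time, i.e. the job that was inserted earlier.
project_priority_map = {
    'project1': (0.0, -1561646348),
    'project2': (1.0, -1561646349),
}

def pick_next_project(priority_map):
    """Return the project whose queue should be popped next, or None if empty."""
    if not priority_map:
        return None
    # Equivalent to sorted(priority_map, key=priority_map.get, reverse=True)[0]
    return max(priority_map, key=priority_map.get)

print(pick_next_project(project_priority_map))  # 'project2' (priority 1.0 beats 0.0)
```

For equal priorities the tuples differ only in -insert_time, so the project whose pending job was inserted earlier wins, which is what the updated test_poller.py asserts.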

def __init__(self, database=None, table="queue"):
self.database = database or ':memory:'
self.table = table
if database:
dbname = os.path.split(database)[-1]
self.project = os.path.splitext(dbname)[0]
else:
self.project = self.database
# about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
self.conn = sqlite3.connect(self.database, check_same_thread=False)
q = "create table if not exists %s (id integer primary key, " \
"priority real key, message blob)" % table
"priority real key, message blob, insert_time TIMESTAMP)" % table
self.conn.execute(q)
self.create_triggers()
self.update_project_priority_map()

def put(self, message, priority=0.0):
args = (priority, self.encode(message))
q = "insert into %s (priority, message) values (?,?)" % self.table
q = "insert into %s (priority, message, insert_time) values (?,?, CURRENT_TIMESTAMP)" \
% self.table
self.conn.execute(q, args)
self.conn.commit()

@@ -131,6 +141,26 @@ def clear(self):
self.conn.execute("delete from %s" % self.table)
self.conn.commit()

def create_triggers(self):
self.conn.create_function("update_project_priority_map", 0, self.update_project_priority_map)
for action in ['INSERT', 'UPDATE', 'DELETE']:
name = 'trigger_on_%s' % action.lower()
self.conn.execute("""
CREATE TRIGGER IF NOT EXISTS %s AFTER %s ON %s
BEGIN
SELECT update_project_priority_map();
END;
""" % (name, action, self.table))

def update_project_priority_map(self):
q = "select priority, strftime('%%s', insert_time) from %s order by priority desc limit 1" \
% self.table
result = self.conn.execute(q).fetchone()
if result is None:
self.project_priority_map.pop(self.project, None)
else:
self.project_priority_map[self.project] = (result[0], -int(result[-1]))

def __len__(self):
q = "select count(*) from %s" % self.table
return self.conn.execute(q).fetchone()[0]
39 changes: 25 additions & 14 deletions scrapyd/tests/test_poller.py
@@ -1,4 +1,5 @@
 import os
+import time
 
 from twisted.trial import unittest
 from twisted.internet.defer import Deferred
@@ -28,30 +29,40 @@ def test_interface(self):
         verifyObject(IPoller, self.poller)
 
     def test_poll_next(self):
-        cfg = {'mybot1': 'spider1',
-               'mybot2': 'spider2'}
-        priority = 0
-        for prj, spd in cfg.items():
+        cfg = [('mybot2', 'spider2', 0),    # second
+               ('mybot1', 'spider2', 0.0),  # third
+               ('mybot1', 'spider1', -1),   # fourth
+               ('mybot1', 'spider3', 1.0)]  # first
+        for prj, spd, priority in cfg:
             self.queues[prj].add(spd, priority)
+            if prj == 'mybot2':
+                time.sleep(1.5)  # ensure different timestamp
 
         d1 = self.poller.next()
         d2 = self.poller.next()
+        d3 = self.poller.next()
+        d4 = self.poller.next()
+        d5 = self.poller.next()
         self.failUnless(isinstance(d1, Deferred))
         self.failIf(hasattr(d1, 'result'))
 
-        # poll once
+        # first poll
         self.poller.poll()
         self.failUnless(hasattr(d1, 'result') and getattr(d1, 'called', False))
+        self.assertEqual(d1.result, {'_project': 'mybot1', '_spider': 'spider3'})
 
-        # which project got run: project1 or project2?
-        self.failUnless(d1.result.get('_project'))
-        prj = d1.result['_project']
-        self.failUnlessEqual(d1.result['_spider'], cfg.pop(prj))
+        # second poll
+        self.poller.poll()
+        self.assertEqual(d2.result, {'_project': 'mybot2', '_spider': 'spider2'})
 
-        self.queues[prj].pop()
+        # third poll
+        self.poller.poll()
+        self.assertEqual(d3.result, {'_project': 'mybot1', '_spider': 'spider2'})
 
-        # poll twice
-        # check that the other project's spider got to run
+        # fourth poll
+        self.poller.poll()
+        self.assertEqual(d4.result, {'_project': 'mybot1', '_spider': 'spider1'})
+
+        # final poll
         self.poller.poll()
-        prj, spd = cfg.popitem()
-        self.failUnlessEqual(d2.result, {'_project': prj, '_spider': spd})
+        self.failIf(hasattr(d5, 'result'))