-
Notifications
You must be signed in to change notification settings - Fork 308
Rebuild npm syncing #4438
Rebuild npm syncing #4438
Changes from 8 commits
61093d7
1290e12
83cdfda
dcec6cb
bad2311
1cdd131
1837e20
1a0cd7a
631dfc9
af41409
a997d38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
# -*- coding: utf-8 -*- | ||
from __future__ import absolute_import, division, print_function, unicode_literals | ||
|
||
import requests | ||
from aspen import log | ||
from couchdb import Database | ||
|
||
|
||
REGISTRY_URL = 'https://replicate.npmjs.com/' | ||
|
||
|
||
def get_last_seq(db):
    """Return the npm change-stream sequence number stored in our database.

    Issues a single-value query against ``worker_coordination`` through
    ``db.one`` and returns whatever that call produces.
    """
    query = 'SELECT npm_last_seq FROM worker_coordination'
    last_seq = db.one(query)
    return last_seq
|
||
|
||
def production_change_stream(seq):
    """Open the npm registry change stream, starting from ``seq``.

    :param seq: sequence number in the registry change stream to resume from
    :returns: the result of ``Database.changes`` for a continuous feed with
        full documents included
    """
    registry = Database(REGISTRY_URL)
    return registry.changes(
        feed='continuous',
        include_docs=True,
        since=seq,
    )
|
||
|
||
def process_doc(doc):
    """Return a smoothed-out doc, or None if it's not a package doc.

    A doc with no ``name`` key is probably a design doc, per:

        https://github.com/npm/registry/blob/aef8a275/docs/follower.md#clean-up

    :param doc: a dict-like document taken from the registry change stream
    :returns: ``None`` when ``doc`` has no ``name`` key; otherwise a dict with
        ``name``, ``description`` (``''`` when the key is absent) and
        ``emails`` (sorted, de-duplicated maintainer addresses)

    Maintainer entries whose ``email`` is missing, null or whitespace-only are
    skipped, and a null ``maintainers`` value is treated as an empty list.
    """
    if 'name' not in doc:
        return None

    emails = set()
    # ``or []`` covers a doc that carries ``"maintainers": null``.
    for maintainer in doc.get('maintainers') or []:
        email = maintainer.get('email')
        # Check truthiness before stripping: ``.get`` returns None when the
        # key is absent, and None has no ``.strip``.
        if email and email.strip():
            emails.add(email)

    return {
        'name': doc['name'],
        'description': doc.get('description', ''),
        'emails': sorted(emails),
    }
|
||
|
||
def consume_change_stream(change_stream, db): | ||
"""Given a function similar to :py:func:`production_change_stream` and a | ||
:py:class:`~GratipayDB`, read from the stream and write to the db. | ||
|
||
The npm registry is a CouchDB app, which means we get a change stream from | ||
it that allows us to follow registry updates in near-realtime. Our strategy | ||
here is to maintain open connections to both the registry and our own | ||
database, and write as we read. | ||
|
||
""" | ||
last_seq = get_last_seq(db) | ||
log("Picking up with npm sync at {}.".format(last_seq)) | ||
with db.get_connection() as connection: | ||
for change in change_stream(last_seq): | ||
if change.get('deleted'): | ||
# Hack to work around conflation of design docs and packages in updates | ||
op, doc = delete, {'name': change['id']} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I think this is a bit confusing. At the cost of a line or two more, I think this can be simplified.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Something along the lines of:
Raw version:
Before:
with db.get_connection() as connection:
for change in change_stream(last_seq):
if change.get('deleted'):
# Hack to work around conflation of design docs and packages in updates
op, doc = delete, {'name': change['id']}
else:
op, doc = upsert, change['doc']
processed = process_doc(doc)
if not processed:
continue
cursor = connection.cursor()
op(cursor, processed)
cursor.run('UPDATE worker_coordination SET npm_last_seq=%(seq)s', change)
connection.commit()
def delete(cursor, processed):
cursor.run("DELETE FROM packages WHERE package_manager='npm' AND name=%(name)s", processed)
def upsert(cursor, processed):
cursor.run('''
INSERT INTO packages
(package_manager, name, description, emails)
VALUES ('npm', %(name)s, %(description)s, %(emails)s)
ON CONFLICT (package_manager, name) DO UPDATE
SET description=%(description)s, emails=%(emails)s
''', processed)
After:
with db.get_connection() as connection:
for change in change_stream(last_seq):
cursor = connection.cursor()
if change.get('deleted'):
# Hack to work around conflation of design docs and packages in updates
delete(cursor, change['id'])
else:
upsert(cursor, process_doc(doc))
cursor.run('UPDATE worker_coordination SET npm_last_seq=%(seq)s', change)
connection.commit()
def delete(cursor, package_name):
cursor.run("DELETE FROM packages WHERE package_manager='npm' AND name=%s", package_name)
def upsert(cursor, processed_doc):
cursor.run('''
INSERT INTO packages
(package_manager, name, description, emails)
VALUES ('npm', %(name)s, %(description)s, %(emails)s)
ON CONFLICT (package_manager, name) DO UPDATE
SET description=%(description)s, emails=%(emails)s
''', processed_doc)
Only downside I see here is that we're doing a little bit more work (calling […]
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
That doesn't account for skipping docs with no `name` key.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Ah, yes 631dfc9 looks good |
||
else: | ||
op, doc = upsert, change['doc'] | ||
processed = process_doc(doc) | ||
if not processed: | ||
continue | ||
cursor = connection.cursor() | ||
op(cursor, processed) | ||
cursor.run('UPDATE worker_coordination SET npm_last_seq=%(seq)s', change) | ||
connection.commit() | ||
|
||
|
||
def delete(cursor, processed): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be clearer if we renamed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in 631dfc9. |
||
cursor.run("DELETE FROM packages WHERE package_manager='npm' AND name=%(name)s", processed) | ||
|
||
|
||
def upsert(cursor, processed):
    """Insert the npm package described by ``processed``, or update it.

    ``processed`` supplies the ``name``, ``description`` and ``emails`` query
    parameters. On a ``(package_manager, name)`` conflict the existing row's
    description and emails are overwritten with the new values.
    """
    statement = '''
        INSERT INTO packages
        (package_manager, name, description, emails)
        VALUES ('npm', %(name)s, %(description)s, %(emails)s)

        ON CONFLICT (package_manager, name) DO UPDATE
        SET description=%(description)s, emails=%(emails)s
    '''
    cursor.run(statement, processed)
|
||
|
||
def check(db, _print=print):
    """Report how far our npm sync lags behind the registry.

    Reads our last-synced sequence number from the database, fetches the
    registry's current ``update_seq`` from ``REGISTRY_URL``, and emits the
    difference as a ``count#npm-sync-lag=<n>`` line.

    :param db: database object exposing ``one`` (used via ``get_last_seq``)
    :param _print: callable used to emit the metric line; defaults to ``print``
    :raises requests.exceptions.Timeout: if the registry does not respond
        within the timeout
    """
    # Reuse the shared accessor rather than repeating its query here.
    ours = get_last_seq(db)
    # Bound the request: with no timeout, requests can block indefinitely
    # when the registry is unresponsive.
    response = requests.get(REGISTRY_URL, timeout=30)
    theirs = int(response.json()['update_seq'])
    _print("count#npm-sync-lag={}".format(theirs - ours))
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, if we're calling `get_last_seq` here anyway - might make sense to simplify the function definition of `consume_change_stream` to accept the stream directly, and not a function that has to be called with `seq` to return the stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in af41409.