Add code and unit tests from pmxbot
jaraco committed Sep 10, 2016
1 parent 0792a08 commit 6591698
Showing 2 changed files with 246 additions and 0 deletions.
172 changes: 172 additions & 0 deletions pmxbot/rss.py
@@ -0,0 +1,172 @@
# vim:ts=4:sw=4:noexpandtab
# c-basic-indent: 4; tab-width: 4; indent-tabs-mode: true;

import socket
import re
import datetime
import logging

import feedparser
from jaraco import timing

import pmxbot
from . import core
from . import storage

log = logging.getLogger(__name__)


class FeedHistory(set):
"""
A database-backed set of feed entries that have been seen before.
"""
def __init__(self, db_uri=None):
super().__init__()
self.store = FeedparserDB.from_URI(db_uri)
timer = timing.Stopwatch()
self.update(self.store.get_seen_feeds())
log.info("Loaded feed history in %s", timer.split())
storage.SelectableStorage._finalizers.append(self.__finalize)

def __finalize(self):
del self.store

@classmethod
def get_entry_id(cls, entry, url):
if 'id' in entry:
id = entry['id']
elif 'link' in entry:
id = entry['link']
elif 'title' in entry:
id = entry['title']
else:
raise ValueError("need id, link, or title field")

		# Special-case for Google News: the entry link wraps the real
		# article URL in a ?url= query parameter, so prefer that as the id
		# (illustrative: ...url?sa=t&url=http://example.com/story&ct=ga
		# yields http://example.com/story).
		if 'google.com' in url.lower():
			GNEWS_RE = re.compile(r'[?&]url=(.+?)(?:&|$)', re.IGNORECASE)
			try:
				id = GNEWS_RE.findall(entry['link'])[0]
			except Exception:
				pass

return id

def add_seen_feed(self, entry, url):
"""
Update the database with the new feedparser entry.
Return True if it was a new feed and was added.
"""
try:
id = self.get_entry_id(entry, url)
except ValueError:
log.exception("Unrecognized entry in feed from %s: %s", url, entry)
return False

if id in self:
return False
self.add(id)
try:
self.store.add_entries([id])
except Exception:
log.exception("Unable to add seen feed")
return False
return True


class RSSFeeds(FeedHistory):
"""
Plugin for feedparser support.
Config values:
- feed_interval: minutes between feed checks
- feeds: list of dicts, each with name, channel, linkurl, and url
"""

def __init__(self):
super().__init__()
self.feed_interval = pmxbot.config.get('feed_interval', 15)
self.feeds = pmxbot.config.get('feeds', [])
for feed in self.feeds:
core.execdelay(
name='feedparser',
channel=feed['channel'],
howlong=datetime.timedelta(minutes=self.feed_interval),
args=[feed],
repeat=True,
)(self.parse_feed)

def parse_feed(self, client, event, feed):
"""
Parse RSS feeds and spit out new articles at
regular intervals in the relevant channels.
"""
		socket.setdefaulttimeout(20)
		try:
			resp = feedparser.parse(feed['url'])
		except Exception:
			log.exception("Error retrieving feed %s", feed['url'])
			return

outputs = [
self.format_entry(entry)
for entry in resp['entries']
if self.add_seen_feed(entry, feed['url'])
]

if not outputs:
return

txt = 'News from %s %s : %s' % (feed['name'], feed['linkurl'], ' || '.join(outputs[:10]))
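		# yielding NoLog first asks pmxbot not to record this announcement
		# in its message logs (pmxbot output convention)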
yield core.NoLog
yield txt

@staticmethod
def format_entry(entry):
"""
Format the entry suitable for output (add the author if suitable).
"""
needs_author = ' by ' not in entry['title'] and 'author' in entry
template = '{title} by {author}' if needs_author else '{title}'
return template.format(**entry)


class FeedparserDB(storage.SelectableStorage):
pass
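# FeedparserDB.from_URI (inherited from SelectableStorage) selects one
# of the backends below based on the URI scheme (sqlite vs. mongodb).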


class SQLiteFeedparserDB(FeedparserDB, storage.SQLiteStorage):
def init_tables(self):
self.db.execute("CREATE TABLE IF NOT EXISTS feed_seen (key varchar)")
self.db.execute('CREATE INDEX IF NOT EXISTS ix_feed_seen_key ON feed_seen (key)')
self.db.commit()

def get_seen_feeds(self):
return [row[0] for row in self.db.execute('select key from feed_seen')]

def add_entries(self, entries):
self.db.executemany('INSERT INTO feed_seen (key) values (?)', [(x,) for x in entries])
self.db.commit()

def clear(self):
"Clear all entries"
self.db.execute('DELETE FROM feed_seen')

export_all = get_seen_feeds


class MongoDBFeedparserDB(FeedparserDB, storage.MongoDBStorage):
collection_name = 'feed history'

def get_seen_feeds(self):
return [row['key'] for row in self.db.find()]

def add_entries(self, entries):
for entry in entries:
self.db.insert(dict(key=entry))

def import_(self, item):
self.add_entries([item])

def clear(self):
"Clear all entries"
self.db.remove()
74 changes: 74 additions & 0 deletions tests/test_rss.py
@@ -0,0 +1,74 @@
import urllib.parse

import pytest
import feedparser

import pmxbot.rss

sample_feed_entries = [
{'id': '1234'},
{'link': 'http://example.com/2012/12/23'},
{'title': 'A great blog'},
]
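# each sample exercises one branch of FeedHistory.get_entry_id
# (id, then link, then title)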


@pytest.fixture()
def history(db_uri, request):
history = pmxbot.rss.FeedHistory(db_uri)
request.addfinalizer(history.store.clear)
request.addfinalizer(history._FeedHistory__finalize)
return history


class TestFeedHistory:

@pytest.mark.parametrize('entry', sample_feed_entries)
def test_add_seen_feed(self, history, entry):
"""
Each entry should be added only once, return True when it's added
and return False each subsequent time.
"""
added = history.add_seen_feed(entry, 'http://example.com')
assert added is True
added = history.add_seen_feed(entry, 'http://example.com')
assert added is False
assert len(history) == 1

	def test_add_seen_feed_no_identifier(self, history):
		"""
		If an entry can't be identified, the failure should be logged
		and add_seen_feed should simply return False.
		"""
		entry = {'foo': 'bar'}  # an entry with no id/link/title

assert not history.add_seen_feed(entry, 'http://example.com')

def test_feeds_loaded(self, history):
"""
Feeds saved in one history should be already present when loaded
subsequently in a new history object.
"""
entry = {'id': '1234'}
history.add_seen_feed(entry, 'http://example.com')
assert len(history) == 1

# now create a new history object
orig_uri = history.store.uri
new_history = pmxbot.rss.FeedHistory(orig_uri)
assert len(new_history) == 1
assert new_history.add_seen_feed(entry, 'http://example.com') is False


@pytest.has_internet
def test_format_entry():
	site = 'https://github.com'
path = '/yougov/pmxbot/commits/master.atom'
feed_url = urllib.parse.urljoin(site, path)
res = feedparser.parse(feed_url)
entry = res['entries'][0]
pmxbot.rss.RSSFeeds.format_entry(entry)


def test_format_entry_unicode():
pmxbot.rss.RSSFeeds.format_entry(dict(title='\u2013'))
