# email_scraper.py
from io import BytesIO
import requests
import re
from itertools import islice
from typing import NamedTuple
from datetime import datetime
import dateutil.tz
from html2text import HTML2Text
from dateutil.parser import parse as parse_date
from playwright.async_api import async_playwright


def login(password: str, list_name: str = "hacksu") -> requests.Session:
    "This function creates a login session that can be used for later requests."
    session = requests.Session()
    login_url = f"https://listmail.cs.kent.edu/mailman/admin/{list_name}"
    session.post(login_url, {"adminpw": password}).raise_for_status()
    return session
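
# A minimal usage sketch (the password value below is hypothetical): the
# returned session carries the Mailman admin cookie, so the other functions
# in this module can fetch private archive pages through it, e.g.:
#
#     session = login("hunter2")              # hypothetical admin password
#     urls = get_recent_email_urls(session)   # reuses the authenticated session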


def get_recent_email_urls(session: requests.Session, list_name: str = "hacksu") -> list[str]:
    """
    This function retrieves the URLs of the emails that arrived during the
    previous two calendar months.

    It uses the most recent two calendar months instead of just the most
    recent one to avoid missing, e.g., an email that arrives in the last
    second of January when the first check is performed in February.
    """
    base_url = f"https://listmail.cs.kent.edu/mailman/private/{list_name}/"
    # find the (relative) URLs for the "threads" (each thread is a month of emails)
    archives_request = session.get(base_url)
    threads = list(re.finditer(r'href="(.+?/)thread\.html"', archives_request.text, re.IGNORECASE))
    emails = []
    # find the (relative) URLs for the emails in the first two threads and
    # concatenate them with the list's base URL and the thread's base URL
    for thread in threads[:2]:
        thread_request = session.get(base_url + thread.group(1))
        emails += [
            (base_url + thread.group(1) + match.group(1))
            for match in re.finditer(r'href="(\d+\.html)"', thread_request.text, re.IGNORECASE)
        ]
    return emails
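
# For reference, a Mailman ("pipermail") archive index typically links each
# month like href="2024-February/thread.html" (exact naming can vary by
# install), so thread.group(1) above would be "2024-February/", and the links
# inside a thread page look like href="012345.html", one per email.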


def email_url_to_id(url: str) -> int:
    "Extracts the numeric message ID from an email URL like '.../012345.html'."
    return int(re.search(r"(\d+)\.html", url).group(1))


def get_last_email_url(emails: list[str]) -> str:
    "Returns the URL of the email with the highest numerical ID."
    assert len(emails) > 0, "can't get last email of 0 emails"
    return max(emails, key=email_url_to_id)
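
# Illustrative example (hypothetical URLs): given
#   [".../000012.html", ".../000014.html", ".../000013.html"]
# get_last_email_url returns ".../000014.html", since 14 is the highest
# message ID, regardless of the input list's ordering.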


def deduplicate_string(string: str) -> str:
    """
    When there's no named sender for an email, the mailing list interface just
    shows the "from" address twice, which is annoying. This function makes
    sure a string isn't the same thing twice with a space in the middle.
    """
    half = len(string) // 2
    if (
        len(string) % 2 == 1
        and string[half] == " "  # the two halves must be joined by a space
        and string[:half] == string[half + 1:]
    ):
        return string[:half]
    return string
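
# Worked examples:
#   deduplicate_string("person@kent.edu person@kent.edu") -> "person@kent.edu"
#   deduplicate_string("Jane Doe person@kent.edu")        -> unchanged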


EmailMetadata = NamedTuple(
    "EmailMetadata",
    [("subject", str), ("from_address", str), ("timestamp", datetime), ("has_html", bool)]
)


def get_email_metadata(session: requests.Session, email_url: str) -> EmailMetadata:
    """
    This attempts to parse the somewhat messy HTML on the email page and
    returns basic information about the email.
    """
    email = session.get(email_url).text
    # convert the email page to formatted text
    parser = HTML2Text()
    parser.ignore_links = True
    parser.ignore_mailto_links = True
    parser.body_width = 1000000  # keep it from wrapping long lines
    text = parser.handle(email)
    # once we have the email page as formatted text, the subject, "from"
    # address, and timestamp are on the first three non-empty lines
    meta = list(
        islice(
            # we pass two arguments to islice:
            # a generator expression that yields stripped non-empty lines
            (l.replace("[Hacksu]", "").strip("#\n _")
                for l in text.splitlines() if len(l.strip())),
            # and the number of elements to retrieve from it
            3
        )
    )
    return EmailMetadata(
        subject=meta[0],
        from_address=deduplicate_string(meta[1]),
        timestamp=parse_date(
            timestr=meta[2],
            # we need to tell the parser what "EDT" and "EST" mean since they
            # appear in the date and time on the Mailman page
            tzinfos={
                "EDT": dateutil.tz.gettz("US/Eastern"),
                "EST": dateutil.tz.gettz("US/Eastern")
            }
        ),
        has_html=get_email_html(session, email_url) is not None
    )
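
# After html2text conversion, the first three non-empty lines of a Mailman
# message page usually look something like this (illustrative values):
#
#   Meeting tonight
#   Jane Doe jdoe at kent.edu
#   Mon Feb 5 19:00:00 EST 2024
#
# which become the subject, from_address, and timestamp fields above.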


def get_email_html(session: requests.Session, email_url: str) -> str | None:
    """
    This grabs the HTML "attachment" from Mailman that contains the actual
    rich text content of the email, converted to formatted text.
    """
    email = session.get(email_url).text
    # this is where my valiant attempt to avoid hardcoding too much information,
    # like the name of the mailing list, in the URL patterns breaks down
    html_url_pattern = (
        r"https://listmail\.cs\.kent\.edu/mailman/private/hacksu"
        r"/attachments/\d+/\w+/attachment(?:-0001)?\.htm"
    )
    match = re.search(html_url_pattern, email)
    if match:
        html_email_page = session.get(match.group(0)).text
        html_email_body = re.search(r"<tt>(.*)</tt>", html_email_page, flags=re.DOTALL).group(1)
        return HTML2Text().handle(html_email_body)
    else:
        return None


async def get_email_image(session: requests.Session, email_url: str) -> BytesIO:
    """
    Renders the email in a headless browser and returns a PNG screenshot of
    it as an in-memory file.
    """
    html = get_email_html(session, email_url)
    if html is None:
        html = session.get(email_url).text
    async with async_playwright() as p:
        browser = await p.firefox.launch()
        page = await browser.new_page()
        await page.set_viewport_size({"width": 800, "height": 800})
        await page.set_content(html)
        # resize the viewport to fit the rendered content, capped at 1200px
        height = min(
            1200,
            await page.evaluate('document.documentElement.offsetHeight')
        )
        await page.set_viewport_size({"width": 800, "height": height})
        screenshot = await page.screenshot()
        await browser.close()
        return BytesIO(screenshot)
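

# A minimal end-to-end sketch, assuming the list's admin password is provided
# in a MAILMAN_PASSWORD environment variable (an assumption of this demo, not
# a requirement of the module) and that Playwright's Firefox is installed:
if __name__ == "__main__":
    import asyncio
    import os

    async def demo():
        session = login(os.environ["MAILMAN_PASSWORD"])
        urls = get_recent_email_urls(session)
        latest = get_last_email_url(urls)
        print(get_email_metadata(session, latest))
        # render the newest email to a PNG for quick inspection
        image = await get_email_image(session, latest)
        with open("latest_email.png", "wb") as f:
            f.write(image.getbuffer())

    asyncio.run(demo())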