-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcrawler.py
106 lines (80 loc) · 2.77 KB
/
crawler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import sys
from queue import Empty, Queue
from threading import Event, Lock, Thread
from urllib.parse import urlparse
from urllib.request import urlopen

from a_parser import AParser
class Crawler:
    """Simple multithreaded same-site crawler.

    Starting from a seed URI, worker threads fetch pages, extract
    ``<a href>`` targets via AParser, and schedule those that belong to
    the same site.  Crawled URIs are recorded (together with their
    Last-Modified header, when present) in the ``uris`` mapping.
    """

    # How long a worker waits on the queue before re-checking run_event.
    _POLL_TIMEOUT = 0.1

    def __init__(self, uri, workers=4):
        """Set up crawl state.

        :param uri: absolute seed URI (scheme://host[/path])
        :param workers: number of fetcher threads
        """
        pu = urlparse(uri)
        self.__uris = {}  # uri -> Last-Modified header value (or '')
        self.scheme = pu.scheme
        self.netloc = pu.netloc
        # Internationalized hostnames must be punycode for urlopen().
        self.netloc_ascii = pu.netloc.encode('idna').decode('ascii')
        self.root = '%s://%s' % (self.scheme, self.netloc_ascii)
        self.start_uri = '%s%s' % (self.root, pu.path or '/')
        self.queue = Queue()
        self.lock = Lock()  # guards self.__uris
        self.run_event = Event()
        self.threads = [
            Thread(target=self.__worker, daemon=True) for _ in range(workers)
        ]

    def crawl(self):
        """Start crawl process"""
        self.__schedule(self.start_uri)
        self.run_event.set()
        try:
            for t in self.threads:
                t.start()
            self.queue.join()
        except KeyboardInterrupt:
            self.queue.queue.clear()
            self.run_event.clear()
            for t in self.threads:
                t.join()
            print('Interrupted.', file=sys.stderr)
            raise
        else:
            # All work done: signal workers to exit instead of polling
            # an empty queue until interpreter shutdown.
            self.run_event.clear()

    @property
    def uris(self):
        """Mapping of crawled URI -> Last-Modified header value."""
        # TODO: deny mutations (e.g. wrap in types.MappingProxyType)
        return self.__uris

    def has(self, uri):
        """Check if url has in crawled urlset"""
        with self.lock:
            return uri in self.__uris

    def is_our(self, uri):
        """Return True if *uri* belongs to the crawled site.

        Scheme-relative references (``//host/path``) are ours only when
        the host matches ours; a plain ``startswith('/')`` test would
        wrongly accept any external ``//other.host/...`` link.
        """
        if uri.startswith('//'):
            host = urlparse('%s:%s' % (self.scheme, uri)).netloc
            return host in (self.netloc, self.netloc_ascii)
        return uri.startswith(self.start_uri) or uri.startswith('/')

    def normalize(self, uri):
        """Expand scheme-relative and root-relative URIs to absolute."""
        if uri.startswith('//'):
            uri = '%s:%s' % (self.scheme, uri)
        elif uri.startswith('/'):
            uri = '%s%s' % (self.root, uri)
        # TODO: deal with ./target/path and target/path
        return uri

    def __schedule_crawl(self, html):
        """Parse *html* and schedule every new same-site link once."""
        parser = AParser()
        parser.feed(html)
        unique_uris = set(parser.hrefs)
        our_unique_uris = filter(self.is_our, unique_uris)
        for new_uri in map(self.normalize, our_unique_uris):
            if not self.has(new_uri):
                self.__schedule(new_uri)

    def __worker(self):
        """Thread body: fetch queued URIs until run_event is cleared."""
        while self.run_event.is_set():
            # Poll with a timeout: a bare blocking get() would never
            # observe run_event being cleared, so t.join() in crawl()
            # could hang forever after an interrupt.
            try:
                uri = self.queue.get(timeout=self._POLL_TIMEOUT)
            except Empty:
                continue
            try:
                self.__process(uri)
            except Exception as e:
                # Best effort: log the failure and keep crawling.
                print('[E]', uri, repr(e), file=sys.stderr)
            finally:
                # In finally so join() accounting survives any raise.
                self.queue.task_done()

    def __process(self, uri):
        """Fetch *uri*, record it, print it, and schedule its links."""
        with urlopen(uri) as response:
            self.__add(uri, response.headers.get('Last-Modified'))
            print(uri)
            self.__schedule_crawl(response.read().decode())

    def __add(self, uri, data = ''):
        """Thread-safely record *uri* with optional metadata *data*."""
        with self.lock:
            self.__uris[uri] = data

    def __schedule(self, uri):
        """Mark *uri* as seen and enqueue it for fetching."""
        self.__add(uri)
        self.queue.put_nowait(uri)