main.py
# coding:utf-8
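"""Multi-process crawler entry point.

A single URL-crawler process collects search-result URLs for five
categories (lawfirm, lawyer, court, judge, corporate) on
legalminer.com and pushes them onto URL_QUEUE; CONTENT_CRAWLER_NUM
content-crawler processes consume that queue, each running
CONTENT_CRAWLER_THREAD_NUM extractor threads that parse the pages and
store the results through a MySQL connection. The main process prints
the active children and queue size every minute until all children
have exited.
"""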
from multiprocessing import Process
from multiprocessing import Queue as PQueue
from Queue import Queue as TQueue
import time
import multiprocessing
from connection import downLoad
from lxml import etree
import urlPattern
from urlExtractor import Extractor
from urlExtractor import Transmit
from urlExtractor import PTTransmit
from urlExtractor import DataExtractor
from mySQLInstance import initDbConnection
URL_QUEUE = PQueue()  # inter-process queue: URL crawler -> content crawlers
LOG_QUEUE = PQueue()  # reserved for progress/log messages (see indicator)
CONTENT_CRAWLER_NUM = 4         # number of content-crawler processes
CONTENT_CRAWLER_THREAD_NUM = 4  # extractor threads per content-crawler process


def urlCrawler(urlQueue, logQueue):
    rooturl = {
        'lawfirm': 'http://www.legalminer.com/search/lawfirm?t=',
        'lawyer': 'http://www.legalminer.com/search/lawyer?t=',
        'court': 'http://www.legalminer.com/search/court?t=',
        'judge': 'http://www.legalminer.com/search/judge?t=',
        'corporate': 'http://www.legalminer.com/search/corporate?t='
    }
    # Fetch each root search page and read the total number of results from it.
    rootNum = {}
    for key, url in rooturl.items():
        content = downLoad(url)
        tree = etree.HTML(content)
        resultNumNode = tree.xpath(urlPattern.resultNumXpath)
        rootNum[key] = urlPattern.extractNum(resultNumNode[0].text)
    # Create one producer thread per link to extract its URLs.
    threadQueue = TQueue()
    ThreadLst = []
    for key, value in rootNum.items():
        extractor = Extractor()
        extractor.setResultNum(key, int(value), threadQueue)
        ThreadLst.append(extractor)
    # Transmit forwards the extracted URLs from the thread queue to the
    # shared inter-process queue consumed by the content crawlers.
    transmit = Transmit()
    transmit.setEndNum(len(rootNum), threadQueue, urlQueue, CONTENT_CRAWLER_NUM)
    ThreadLst.append(transmit)
    for e in ThreadLst:
        e.start()
    for e in ThreadLst:
        e.join()

def contentCrawler(urlQueue, logQueue):
    conn = initDbConnection()
    ThreadLst = []
    threadQueue = TQueue()
    # PTTransmit moves URLs from the inter-process queue into this
    # process's own thread queue for the extractor threads below.
    pttransmit = PTTransmit()
    pttransmit.setParams(urlQueue, threadQueue, CONTENT_CRAWLER_THREAD_NUM)
    ThreadLst.append(pttransmit)
    for i in range(CONTENT_CRAWLER_THREAD_NUM):
        dataExtractor = DataExtractor()
        dataExtractor.setParams(threadQueue, conn)
        ThreadLst.append(dataExtractor)
    for e in ThreadLst:
        e.start()
    for e in ThreadLst:
        e.join()
    # Each worker process owns its database connection; close it once
    # all extractor threads are done.
    conn.close()

def indicator(logQueue):
    # Placeholder: intended to report crawl progress read from logQueue.
    pass

if __name__ == '__main__':
    urlCrawlerProcess = Process(target=urlCrawler, args=(URL_QUEUE, LOG_QUEUE), name='URL_PROCESS')
    urlCrawlerProcess.start()
    for i in range(CONTENT_CRAWLER_NUM):
        contentCrawlerProcess = Process(target=contentCrawler, args=(URL_QUEUE, LOG_QUEUE),
                                        name='CONTENT_PROCESS-%d' % i)
        contentCrawlerProcess.start()
    # Report status every minute until every child process has exited.
    while True:
        print '********************** Active child processes **********************'
        for p in multiprocessing.active_children():
            print "child p.name: " + p.name + "\tp.pid: " + str(p.pid)
        print '********************** Length of the inter-process queue ***********'
        print URL_QUEUE.qsize()
        if len(multiprocessing.active_children()) == 0:
            print 'All tasks finished'
            break
        time.sleep(60)