rss_scrapers.py
"""
Type: Job
Developer: Vignesh
Description: Parallel Kafka consumers that scrape content from RSS URLs and publish it to a Kafka topic
"""
### scraper
import feedparser
from bs4 import BeautifulSoup as bs
import requests
###
import json
import pandas as pd
###
import datetime
from dateutil.parser import parse
from time import mktime
### multithreading
import threading
### kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import TopicPartition
### mongodb
from pymongo import MongoClient
from bson.objectid import ObjectId
### custom module
from rss_parser import RSS_Parser
with open('./config.json') as f:
    config = json.load(f)
### mongodb connection
mg_host = config['mongodb']['host']
mg_port = config['mongodb']['port']
mg_client = MongoClient(host=mg_host, port=mg_port)
### error log db & collection
error_log_db = config['rss_scrapers']['error_log_db']
error_log_collection = config['rss_scrapers']['error_log_collection']
### rss_links db & collection
rss_links_db = config['rss_scrapers']['rss_links_db']
rss_links_collection = config['rss_scrapers']['rss_links_collection']
### kafka connection info
bootstrap_servers = config['kafka']['bootstrap_servers']
consumer_topic = config['rss_scrapers']['consumer_topic']
group_id = config['rss_scrapers']['group_id']
threads = config['rss_scrapers']['threads']
producer_topic = config['rss_scrapers']['producer_topic']
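### expected layout of config.json (illustrative sketch; only the key names are taken from
### the reads above, the host/port/topic/thread values below are placeholders)
# {
#     "mongodb": {"host": "localhost", "port": 27017},
#     "kafka": {"bootstrap_servers": "localhost:9092"},
#     "rss_scrapers": {
#         "error_log_db": "rss", "error_log_collection": "error_log",
#         "rss_links_db": "rss", "rss_links_collection": "rss_links",
#         "consumer_topic": "rss_links", "producer_topic": "rss_news",
#         "group_id": "rss_scrapers", "threads": 4
#     }
# }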
### kafka producer object
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
### generating consumer objects
def get_consumer(host, topic, group_id):
    consumer = KafkaConsumer(bootstrap_servers=host, group_id=group_id)
    consumer.subscribe([topic])
    return consumer
consumers = {}
parsers = {}
for i in range(threads):
    consumers[i] = get_consumer(host=bootstrap_servers, topic=consumer_topic, group_id=group_id)
    parsers[i] = RSS_Parser()
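### each message on the consumer topic is expected to be a JSON document carrying the fields
### read in rss_parser() below; the values shown here are illustrative only
# {
#     "_id": "<mongodb ObjectId string>",
#     "vendor": "example-news",
#     "url": "https://example.com/feed.xml",
#     "pattern": "<vendor-specific parse pattern>",
#     "last_update": "2021-01-01T00:00:00+00:00"
# }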
### parser function; thread function
def rss_parser(consumer, parser):
    for msg in consumer:
        record = json.loads(msg.value)
        # normalise the stored last_update timestamp to UTC before comparing feed entries
        last_update = parse(record['last_update'])
        last_update = last_update.astimezone(datetime.timezone.utc)
        news_record, latest_update = parser.parse(record['url'], '', last_update, record['pattern'])
        if news_record:
            if news_record[0] == 'Reject':
                log = {
                    "type": "Reject",
                    "vendor": record['vendor'],
                    "url": record['url']
                }
                mg_client[error_log_db][error_log_collection].insert_one(log)
                print(log)
            elif news_record[0] == 'Exception':
                log = {
                    "type": "Exception",
                    "vendor": record['vendor'],
                    "url": record['url'],
                    "error_msg": str(news_record[1])
                }
                mg_client[error_log_db][error_log_collection].insert_one(log)
                print(log)
            else:
                # publish every parsed item to the producer topic
                for news in news_record:
                    print(news)
                    news['published'] = str(news['published'])
                    news['vendor'] = record['vendor']
                    producer.send(producer_topic, bytes(json.dumps(news), 'utf-8'))
                # advance the feed's last_update marker only after a successful parse
                mg_client[rss_links_db][rss_links_collection].update_one({"_id": ObjectId(record["_id"])}, {"$set": {"last_update": str(latest_update)}})
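### each record published to producer_topic is the parsed item returned by RSS_Parser.parse(),
### with 'published' stringified and 'vendor' added; the remaining fields depend on RSS_Parser
### and are shown here only as an assumed example
# {
#     "title": "Example headline",
#     "link": "https://example.com/article",
#     "published": "2021-01-01 00:00:00+00:00",
#     "vendor": "example-news"
# }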
### starting threads
jobs = {}
for i in range(threads):
    jobs[i] = threading.Thread(target=rss_parser, args=(consumers[i], parsers[i],))
    jobs[i].start()
    print("Thread: " + str(i) + " Started")
for i in range(threads):
    jobs[i].join()
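### optional clean-up sketch (commented out, not part of the original job): kafka-python
### producers and consumers expose flush()/close(), so a graceful shutdown once the worker
### threads return could look like this
# producer.flush()
# producer.close()
# for i in range(threads):
#     consumers[i].close()
# mg_client.close()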