-
Notifications
You must be signed in to change notification settings - Fork 5
/
twitter_stream_exporter.py
55 lines (42 loc) · 2.76 KB
/
twitter_stream_exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from sfmutils.exporter import BaseExporter
from twitter_stream_warc_iter import TwitterStreamWarcIter, TwitterStreamWarcIter2
from twitter_rest_exporter import BaseTwitterStatusTable, BaseTwitterTwoStatusTable, TwitterRestExporter2
import argparse
import sys
import logging
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Queue name passed to TwitterStreamExporter.main by the __main__ block
# (used when --twitter_version is anything other than '2').
QUEUE = "twitter_stream_exporter"
# Queue name passed to TwitterStreamExporter2.main by the __main__ block
# (used when --twitter_version is '2').
QUEUE2 = "twitter_stream_exporter2"
# Routing keys handed to TwitterStreamExporter.main together with QUEUE.
FILTER_ROUTING_KEY = "export.start.twitter.twitter_filter"
SAMPLE_ROUTING_KEY = "export.start.twitter.twitter_sample"
# Routing key handed to TwitterStreamExporter2.main together with QUEUE2.
FILTER_STREAM_ROUTING_KEY = "export.start.twitter2.twitter_filter_stream"
class TwitterStreamStatusTable(BaseTwitterStatusTable):
    """Status table that reads its records through TwitterStreamWarcIter.

    All behaviour comes from BaseTwitterStatusTable; this subclass only fixes
    the WARC iterator type handed to the base initializer.
    """

    def __init__(self, warc_paths, dedupe, item_date_start, item_date_end, seed_uids, segment_row_size=None):
        """Forward every argument to the base table, inserting the stream iterator type.

        The arguments are passed through positionally and unchanged;
        TwitterStreamWarcIter is inserted just before segment_row_size.
        """
        BaseTwitterStatusTable.__init__(
            self,
            warc_paths,
            dedupe,
            item_date_start,
            item_date_end,
            seed_uids,
            TwitterStreamWarcIter,
            segment_row_size,
        )
class TwitterStreamExporter(BaseExporter):
    """Exporter wired to TwitterStreamWarcIter and TwitterStreamStatusTable."""

    def __init__(self, api_base_url, working_path, mq_config=None, warc_base_path=None):
        """Log the start-up and initialise BaseExporter with the stream types.

        api_base_url and working_path are forwarded positionally; mq_config
        and warc_base_path are forwarded as keyword arguments.
        """
        log.info("Initing TwitterStreamExporter")
        BaseExporter.__init__(
            self,
            api_base_url,
            TwitterStreamWarcIter,
            TwitterStreamStatusTable,
            working_path,
            mq_config=mq_config,
            warc_base_path=warc_base_path,
        )
class TwitterStreamStatusTable2(BaseTwitterTwoStatusTable):
    """Status table that reads its records through TwitterStreamWarcIter2.

    All behaviour comes from BaseTwitterTwoStatusTable; this subclass only
    fixes the WARC iterator type handed to the base initializer.
    """

    def __init__(self, warc_paths, dedupe, item_date_start, item_date_end, seed_uids, segment_row_size=None):
        """Forward every argument to the base table, inserting the stream iterator type.

        The arguments are passed through positionally and unchanged;
        TwitterStreamWarcIter2 is inserted just before segment_row_size.
        """
        BaseTwitterTwoStatusTable.__init__(
            self,
            warc_paths,
            dedupe,
            item_date_start,
            item_date_end,
            seed_uids,
            TwitterStreamWarcIter2,
            segment_row_size,
        )
class TwitterStreamExporter2(TwitterRestExporter2):
    """Exporter wired to TwitterStreamWarcIter2 and TwitterStreamStatusTable2.

    Inherits from TwitterRestExporter2 but initialises itself through
    BaseExporter.__init__ directly.
    """

    def __init__(self, api_base_url, working_path, mq_config=None, warc_base_path=None):
        """Log the start-up and initialise BaseExporter with the stream types.

        api_base_url and working_path are forwarded positionally; mq_config
        and warc_base_path are forwarded as keyword arguments.
        """
        log.info("Initing TwitterStreamExporter2")
        # NOTE(review): TwitterRestExporter2.__init__ is intentionally not
        # called here; presumably so the stream iterator/table types can be
        # supplied instead of the REST ones -- confirm against
        # twitter_rest_exporter.
        BaseExporter.__init__(
            self,
            api_base_url,
            TwitterStreamWarcIter2,
            TwitterStreamStatusTable2,
            working_path,
            mq_config=mq_config,
            warc_base_path=warc_base_path,
        )
if __name__ == "__main__":
    # Parse only --twitter_version here; every other command-line argument is
    # left for the exporter's own main() to handle.
    version_parser = argparse.ArgumentParser()
    version_parser.add_argument("--twitter_version", default='2')
    # Strip the recognised argument from sys.argv before handing control to
    # the wrapped main(), following:
    # https://stackoverflow.com/questions/35733262/is-there-any-way-to-instruct-argparse-python-2-7-to-remove-found-arguments-fro
    known_args, remaining_argv = version_parser.parse_known_args()
    sys.argv = sys.argv[:1] + remaining_argv

    # Only the exact string '2' selects the v2 exporter; any other value
    # falls back to the original exporter.
    use_v2 = known_args.twitter_version == '2'
    if not use_v2:
        TwitterStreamExporter.main(
            TwitterStreamExporter,
            QUEUE,
            [FILTER_ROUTING_KEY, SAMPLE_ROUTING_KEY],
        )
    else:
        TwitterStreamExporter2.main(
            TwitterStreamExporter2,
            QUEUE2,
            [FILTER_STREAM_ROUTING_KEY],
        )