-
Notifications
You must be signed in to change notification settings - Fork 8
/
Stream.py
186 lines (150 loc) · 6.47 KB
/
Stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import tweepy
import threading
import logging
from tweepy.models import Status
from tweepy.utils import import_simplejson, urlencode_noplus
import json
import re
json = import_simplejson()
class Stream(object):
    """Twitter stream collector bound to one set of OAuth credentials.

    The credentials are verified at construction time. Tweets received by
    :meth:`run` are inserted into a ``TweetsBuffer`` that callers read via
    :meth:`getTweetsBuffer`.
    """

    def __init__(self, consumer_key, consumer_secret,
                 key, secret, name):
        """Build the OAuth handler and verify the credentials.

        :param consumer_key: application consumer key.
        :param consumer_secret: application consumer secret.
        :param key: user access token.
        :param secret: user access token secret.
        :param name: label for this account, used in messages.
        :raises SystemExit: with status 1 when ``verify_credentials`` fails.
        """
        self.auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(key, secret)
        self.tweetsBuffer = TweetsBuffer()
        self.name = name
        self.logger = logging.getLogger('TwitterCollector')
        # Fail fast: nothing useful can be collected with rejected credentials.
        if not tweepy.API(self.auth).verify_credentials():
            message = "Invalid credentials for user: %s.\nExiting..." % (self.name,)
            print(message)
            self.logger.error(message)
            # Non-zero status so callers/scripts see a failure; raising
            # SystemExit directly avoids depending on the `site` exit() builtin.
            raise SystemExit(1)

    def run(self, users_list=None):
        """Start streaming the selected accounts in a background thread.

        :param users_list: result of :meth:`getUserList` (dicts whose
            ``'users'`` items expose ``.id``), or ``None`` to follow the ids
            returned by ``friends_ids()`` for the authenticated user.
        :returns: the started ``threading.Thread`` running ``streamer.filter``,
            or ``None`` if setting up the stream raised (the error is logged).
        """
        listener = StreamListener()
        listener.init(self.tweetsBuffer)
        try:
            streamer = tweepy.Stream(auth=self.auth,
                                     listener=listener,
                                     timeout=3000000000,
                                     include_entities=1,
                                     include_rts=1)
            if users_list is None:
                follow_ids = tweepy.API(self.auth).friends_ids()
            else:
                follow_ids = self._member_ids(users_list)
            # An account may belong to several lists; follow it only once.
            follow_ids = list(set(follow_ids))
            stream_thread = threading.Thread(target=streamer.filter,
                                             args=(follow_ids,))
            stream_thread.start()
            return stream_thread
        except Exception as e:
            # Best effort as before (no exception reaches the caller), but the
            # failure is now also logged with its traceback.
            print(e)
            self.logger.exception("Failed to start stream for user: %s", self.name)
            return None

    @staticmethod
    def _member_ids(users_list):
        """Return the ``.id`` of every member of every list (duplicates kept)."""
        return [user.id for entry in users_list for user in entry['users']]

    def getTweetsBuffer(self):
        """Return the buffer that streamed tweets are inserted into."""
        return self.tweetsBuffer

    def getUserList(self, lists):
        """Resolve Twitter lists into their members.

        :param lists: iterable of dicts with ``'owner'`` and ``'slug'`` keys,
            or ``None``.
        :returns: ``None`` when *lists* is ``None``; otherwise one dict per
            input list with keys ``'owner'``, ``'slug'`` and ``'users'`` (every
            member yielded by ``tweepy.Cursor(api.list_members, ...)``).
        """
        if lists is None:
            return None
        api = tweepy.API(self.auth)
        users_list = []
        for entry in lists:
            members = tweepy.Cursor(
                api.list_members,
                entry['owner'],
                entry['slug']
            ).items()
            users_list.append({
                'owner': entry['owner'],
                'slug': entry['slug'],
                'users': list(members),
            })
        return users_list
class StreamListener(tweepy.StreamListener):
    """tweepy listener that turns streamed statuses into buffer records.

    :meth:`init` must be called with a ``TweetsBuffer`` before the listener
    receives data; each record is ``{'tweet': {...}, 'user': {...},
    'raw_json': ...}``.
    """

    def init(self, tweetsBuffer):
        """Attach the buffer that parsed records are inserted into."""
        self.tweetsBuffer = tweetsBuffer

    def parse_status(self, status, retweet=False):
        """Flatten a tweepy ``Status`` into ``{'tweet': {...}, 'user': {...}}``.

        :param status: the status to convert.
        :param retweet: ``True`` when *status* is the original tweet embedded
            in a retweet; ``original_tweet_id`` is then always 0.
        :returns: dict with a ``'tweet'`` record and a ``'user'`` record.
        """
        coordinates = status.coordinates
        if coordinates is not None:
            # Twitter's `coordinates` is a GeoJSON Point, whose array is
            # ordered [longitude, latitude].
            geo_long = coordinates['coordinates'][0]
            geo_lat = coordinates['coordinates'][1]
        else:
            geo_lat = geo_long = 0

        # Only retweets carry `retweeted_status`. An original tweet can have
        # retweet_count > 0 without it, so retweet_count must not be used to
        # decide whether the attribute exists.
        retweeted = getattr(status, 'retweeted_status', None)
        author = status.user

        tweet = {
            'tweet_id': status.id,
            'tweet_text': status.text,
            'created_at': status.created_at,
            'geo_lat': geo_lat,
            'geo_long': geo_long,
            'user_id': author.id,
            'tweet_url': "http://twitter.com/" + author.id_str + "/status/" + status.id_str,
            'retweet_count': status.retweet_count,
            'original_tweet_id': (retweeted.id
                                  if not retweet and retweeted is not None
                                  else 0),
            'urls': status.entities['urls'],
            'hashtags': status.entities['hashtags'],
            'mentions': status.entities['user_mentions'],
        }
        user = {
            'user_id': author.id,
            'screen_name': author.screen_name,
            'name': author.name,
            'followers_count': author.followers_count,
            'friends_count': author.friends_count,
            'description': (author.description
                            if author.description is not None
                            else "N/A"),
            'image_url': author.profile_image_url,
            'location': (author.location
                         if author.location is not None
                         else "N/A"),
            'created_at': author.created_at,
        }
        return {'tweet': tweet, 'user': user}

    def on_data(self, data):
        """Dispatch one raw stream message to the matching handler.

        Message type is detected by a substring test on the raw text. The raw
        JSON text is forwarded to :meth:`on_status` alongside the parsed
        ``Status``. Returns ``False`` when the invoked handler returns
        ``False`` (presumably tweepy's signal to disconnect — confirm for the
        tweepy version in use); otherwise ``None``.
        """
        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, json.loads(data))
            if self.on_status(status, data) is False:
                return False
        elif 'delete' in data:
            delete = json.loads(data)['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(json.loads(data)['limit']['track']) is False:
                return False

    def on_status(self, status, rawJsonData):
        """Insert *status* and, for a retweet, the tweet it retweets.

        The status's record keeps *rawJsonData* under ``'raw_json'``; the
        embedded original gets ``None`` there. Errors are logged and swallowed
        (best effort), so a bad status never propagates out of :meth:`on_data`.
        """
        try:
            tweet = self.parse_status(status)
            tweet['raw_json'] = rawJsonData
            self.tweetsBuffer.insert(tweet)
            retweeted = getattr(status, 'retweeted_status', None)
            if retweeted is not None:
                original = self.parse_status(retweeted, True)
                original['raw_json'] = None
                self.tweetsBuffer.insert(original)
        except Exception:
            logging.getLogger('TwitterCollector').exception(
                "Failed to process status %s", getattr(status, 'id', None))
class TweetsBuffer(object):
    """Lock-protected buffer of parsed tweet records.

    ``StreamListener`` inserts records from the streaming thread started by
    ``Stream.run``; readers drain it with :meth:`pop`.
    """

    def __init__(self):
        # Per-instance storage. A class-level list would be shared by every
        # TweetsBuffer (mixing tweets between Streams) while each instance
        # guarded it with a different lock.
        self.tweetsBuffer = []
        self.lock = threading.Lock()

    def insert(self, tweet):
        """Append *tweet* to the buffer."""
        with self.lock:
            self.tweetsBuffer.append(tweet)

    def pop(self):
        """Remove and return the most recently inserted record, or ``None`` if empty.

        Order is LIFO (``list.pop()`` from the end), as in the original.
        """
        with self.lock:
            return self.tweetsBuffer.pop() if self.tweetsBuffer else None