-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBlacklist.py
230 lines (195 loc) · 9.16 KB
/
Blacklist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""Blacklist.py - a implementation of black/whitelists"""
import logging
from threading import Lock
import re
import utilitymethods
import Actions
import DataBase
from DataExtractors import IdentificationExtractor
class BlacklistEnums:
    """Integer status codes describing a channel's list membership."""

    # The channel is on neither list.
    NotFound = 0
    # The channel is on the blacklist.
    Blacklisted = 1
    # The channel is on the whitelist.
    Whitelisted = 2
class Blacklist(object):
    """ Also a whitelist, but who's counting

    Tracks the blacklist/whitelist status of channels. URLs are resolved to
    channel ids through the supplied extractor, and every database entry is
    keyed by ``(channel_id, self.domains[0])``.

    @type data IdentificationExtractor
    """

    # Channel id sentinel: entries carrying it are never looked up or stored.
    _PRIVATE_ID = "PRIVATE"

    def __init__(self, data_extractor, database_file):
        """Bind the list to an extractor and a database file.

        :param data_extractor: the IdentificationExtractor used to resolve
            urls; its ``name`` and ``domains`` are copied onto this object
        :param database_file: handed to DataBase.DataBaseWrapper on each access
        """
        assert isinstance(data_extractor, IdentificationExtractor), \
            "data_extractor must be an IdentificationExtractor"
        self.name = data_extractor.name
        self.data = data_extractor
        self.domains = self.data.domains
        self.file = database_file
        # make sure database is created: open the wrapper once and do nothing
        with DataBase.DataBaseWrapper(self.file):
            pass

    def __to_entries(self, ids):
        """Pair every channel id with this list's first domain."""
        domain = self.domains[0]
        return [(channel_id, domain) for channel_id in ids]

    def __is_resolved(self, channel):
        """True when a channel_id() result is usable and not private."""
        return bool(channel) and channel[0] != self._PRIVATE_ID

    def check_blacklist(self, urls=None, ids=None):
        """ This method tells you whether each in a list of submission is on a
        blacklisted channel. Either ids or urls must be specified; when both
        are given only ``urls`` is used.

        :param urls: the urls to check (a lone string counts as one url)
        :param ids: the channel ids to check (a lone string counts as one id)
        :return: a list of BlacklistEnums values, one per input, in input
            order. Inputs that are skipped (domain rejected by check_domain,
            unresolvable url, private channel) stay BlacklistEnums.NotFound.
            NOTE(review): when neither argument is supplied (or both are
            empty) a single BlacklistEnums.NotFound is returned rather than a
            list; kept for backward compatibility.
        """
        if not urls and not ids:
            logging.warning("No url or id specified")
            return BlacklistEnums.NotFound

        if urls:
            if isinstance(urls, str):
                # list("http://...") would split the url into characters
                urls = [urls]
            elif not isinstance(urls, list):
                urls = list(urls)
            results = [BlacklistEnums.NotFound] * len(urls)
            # (position in results, channel id) for every url worth a lookup
            lookups = []
            for index, url in enumerate(urls):
                if not self.check_domain(url):
                    continue
                channel = self.data.channel_id(url)
                # A falsy channel_id() result means the url is unresolvable
                # (same rule as the add/remove url helpers); skip it instead
                # of subscripting it.
                if self.__is_resolved(channel):
                    lookups.append((index, channel[0]))
        else:
            if isinstance(ids, str):
                ids = [ids]
            elif not isinstance(ids, list):
                ids = list(ids)
            results = [BlacklistEnums.NotFound] * len(ids)
            lookups = [(index, channel_id)
                       for index, channel_id in enumerate(ids)
                       if channel_id != self._PRIVATE_ID]

        with DataBase.DataBaseWrapper(self.file, False) as db:
            statuses = db.get_blacklist(
                self.__to_entries([channel_id for _, channel_id in lookups]))
        # statuses is positionally aligned with lookups; scatter back to the
        # slots of the original inputs
        for position, (index, _) in enumerate(lookups):
            results[index] = statuses[position]
        return results

    def check_domain(self, url):
        """
        Checks whether a domain is valid for this extractor

        :param url: the url whose domain is tested
        :return: True when the domain extracted from ``url`` starts or ends
            with one of ``self.domains``; False otherwise, including when no
            domain could be extracted
        """
        domain = utilitymethods.domain_extractor(url)
        if not domain:
            return False
        return any(domain.startswith(d) or domain.endswith(d)
                   for d in self.domains)

    def add_blacklist_urls(self, urls, added_by):
        """ adds channels to the blacklist

        :param urls: a url (or list of urls) of links corresponding to channels
        :param added_by: forwarded to the database when the status is set
        :return: tuple ``(rejected_urls, failed_ids, added_ids)``
        """
        return self.__add_channels_url(urls, BlacklistEnums.Blacklisted, added_by)

    def add_whitelist_urls(self, urls, added_by):
        """ adds channels to the whitelist

        :param urls: a url (or list of urls) of links corresponding to channels
        :param added_by: forwarded to the database when the status is set
        :return: tuple ``(rejected_urls, failed_ids, added_ids)``
        """
        return self.__add_channels_url(urls, BlacklistEnums.Whitelisted, added_by)

    def add_blacklist(self, ids, added_by):
        """ adds channels to the blacklist

        :param ids: ids of channels (a non-list value is treated as one id)
        :param added_by: forwarded to the database when the status is set
        :return: the ids whose status could not be set
        """
        if not isinstance(ids, list):
            ids = [ids]
        return self.__add_channels(ids, BlacklistEnums.Blacklisted, added_by)

    def add_whitelist(self, ids, added_by):
        """ adds channels to the whitelist

        :param ids: ids of channels (a non-list value is treated as one id)
        :param added_by: forwarded to the database when the status is set
        :return: the ids whose status could not be set
        """
        if not isinstance(ids, list):
            ids = [ids]
        return self.__add_channels(ids, BlacklistEnums.Whitelisted, added_by)

    def __split_on_condition(self, seq, condition):
        """Partition ``seq`` into (items passing condition, items failing)."""
        passed, failed = [], []
        for item in seq:
            (passed if condition(item) else failed).append(item)
        return passed, failed

    def __split_on_condition_altlist(self, seq, condition, altlist):
        """Like __split_on_condition, but a failing item is represented by the
        element of ``altlist`` at the same position instead of by itself."""
        passed, failed = [], []
        for i, item in enumerate(seq):
            if condition(item):
                passed.append(item)
            else:
                failed.append(altlist[i])
        return passed, failed

    def __add_channels(self, ids, value, added_by):
        """Sets the list status of channels, creating missing ones first

        :param ids: a list of ids corresponding to channels to add
        :param value: the BlacklistEnums status to store
        :param added_by: forwarded to db.set_blacklist
        :return: the ids whose status is not ``value`` afterwards; all of
            ``ids`` when the existence query returns None or nothing
        """
        if not ids:
            # nothing to do; do not touch the database for an empty request
            return []
        entries = self.__to_entries(ids)
        with DataBase.DataBaseWrapper(self.file, False) as db:
            # first find list of channels that exist already
            existant_channels = db.channel_exists(entries)
            if existant_channels is None or not len(existant_channels):
                return ids
            update_list = []
            add_list = []
            queued = set()
            for i, channel_exists in enumerate(existant_channels):
                entry = entries[i]
                # update_list stays positionally aligned with ids so the
                # final verification can be mapped back by index
                update_list.append(entry)
                # add if not existant, and no duplicates
                if not channel_exists and entry not in queued:
                    queued.add(entry)
                    add_list.append(entry)
            if add_list:
                db.add_channels(add_list)
            # add and update channels
            if update_list:
                db.set_blacklist(update_list, value, added_by)
            # finally check for invalid entries
            set_correct = db.check_blacklist(update_list, value)
        return [ids[i] for i, ok in enumerate(set_correct) if not ok]

    def remove_blacklist_urls(self, urls, added_by):
        """Removes channels from blacklist by URL

        :return: tuple ``(rejected_urls, unresolvable_urls + failed_ids,
            removed_ids)``
        """
        return self.__remove_channels_url(urls, BlacklistEnums.Blacklisted, added_by)

    def remove_blacklist(self, ids, added_by):
        """Removes channels from blacklist by ID

        :return: a list of ids not valid or not found"""
        return self.__remove_channels(ids, BlacklistEnums.Blacklisted, added_by)

    def remove_whitelist_urls(self, urls, added_by):
        """Removes channels from whitelist by URL

        :return: tuple ``(rejected_urls, unresolvable_urls + failed_ids,
            removed_ids)``
        """
        return self.__remove_channels_url(urls, BlacklistEnums.Whitelisted, added_by)

    def remove_whitelist(self, ids, added_by):
        """Removes channels from whitelist by ID

        :return: a list of ids not valid or not found"""
        return self.__remove_channels(ids, BlacklistEnums.Whitelisted, added_by)

    def __add_channels_url(self, urls, value, added_by):
        """Resolve urls to channel ids and set their status to ``value``.

        :return: tuple ``(rejected_urls, failed_ids, added_ids)`` where
            rejected_urls are urls with a foreign domain or no usable channel
            id, failed_ids are ids __add_channels could not set, and
            added_ids are the remaining resolved ids
        """
        if not isinstance(urls, list):
            urls = [urls]
        # check that the domain is being added
        my_urls, invalid_urls = self.__split_on_condition(urls, self.check_domain)
        # get ids
        channels = [self.data.channel_id(url) for url in my_urls]
        resolved, unresolvable_urls = self.__split_on_condition_altlist(
            channels, self.__is_resolved, my_urls)
        channel_ids = [channel[0] for channel in resolved]
        failed_ids = self.__add_channels(channel_ids, value, added_by)
        failed = set(failed_ids)
        added_ids = [cid for cid in channel_ids if cid not in failed]
        return (invalid_urls + unresolvable_urls, failed_ids, added_ids)

    def __remove_channels_url(self, urls, value, added_by):
        """Resolve urls to channel ids and clear their list status.

        :return: tuple ``(rejected_urls, unresolvable_urls + failed_ids,
            removed_ids)``.
            NOTE(review): unlike __add_channels_url, unresolvable urls are
            reported in the second element together with failed ids; kept
            as-is because callers may rely on the layout.
        """
        if not isinstance(urls, list):
            urls = [urls]
        # check that the domain is one of ours
        my_urls, invalid_urls = self.__split_on_condition(urls, self.check_domain)
        # get ids
        channels = [self.data.channel_id(url) for url in my_urls]
        resolved, unresolvable_urls = self.__split_on_condition_altlist(
            channels, self.__is_resolved, my_urls)
        channel_ids = [channel[0] for channel in resolved]
        failed_ids = self.__remove_channels(channel_ids, value, added_by)
        failed = set(failed_ids)
        removed_ids = [cid for cid in channel_ids if cid not in failed]
        return (invalid_urls, unresolvable_urls + failed_ids, removed_ids)

    def __remove_channels(self, ids, value, added_by):
        """Reset the status of existing channels to BlacklistEnums.NotFound.

        NOTE(review): ``value`` is accepted but never consulted, so the
        status is cleared regardless of which list the channel is on.

        :param ids: channel ids (a non-list value is treated as one id)
        :param added_by: forwarded to db.set_blacklist
        :return: ids that do not exist in the database plus ids whose status
            is not NotFound after the update
        """
        if not isinstance(ids, list):
            ids = [ids]
        if not ids:
            # nothing to do; do not touch the database for an empty request
            return []
        invalid_ids = []
        valid_ids = []
        # transform
        entries = self.__to_entries(ids)
        # update database
        with DataBase.DataBaseWrapper(self.file, False) as db:
            existant_channels = db.channel_exists(entries)
            if existant_channels is None:
                # same convention as __add_channels: no answer from the
                # existence query means every id failed
                return list(ids)
            update_list = []
            queued = set()
            for i, channel_exists in enumerate(existant_channels):
                entry = entries[i]
                # update if channel exists and not a duplicate
                if entry in queued:
                    continue
                if channel_exists:
                    queued.add(entry)
                    update_list.append(entry)
                    valid_ids.append(ids[i])
                else:
                    invalid_ids.append(ids[i])
            if update_list:
                db.set_blacklist(update_list, BlacklistEnums.NotFound, added_by)
                # check that they were removed correctly
                set_correct = db.check_blacklist(update_list, BlacklistEnums.NotFound)
                # set_correct lines up with update_list, whose ids live in
                # valid_ids -- not with the original ids list
                invalid_ids += [valid_ids[i]
                                for i, ok in enumerate(set_correct) if not ok]
        return invalid_ids

    def get_blacklisted_channels(self, filt):
        """returns the blacklisted channels whose id matches this filter"""
        return self.__get_channels(filt, BlacklistEnums.Blacklisted)

    def get_whitelisted_channels(self, filt):
        """returns the whitelisted channels whose id matches this filter"""
        return self.__get_channels(filt, BlacklistEnums.Whitelisted)

    def __get_channels(self, filter, value):
        """Query the database for channels of the first domain that have
        status ``value``, passing ``filter`` through as the id filter."""
        with DataBase.DataBaseWrapper(self.file, False) as db:
            return db.get_channels(blacklist=value, domain=self.domains[0],
                                   id_filter=filter)