-
Notifications
You must be signed in to change notification settings - Fork 3
/
rss.py
268 lines (246 loc) · 7.86 KB
/
rss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import calendar
import time
import requests
import feedparser
from typing import Dict
import data.sqlite
import api.gocqhttp
from handlers.message import Message
__db_name = 'commands.rss.db'
__rss_group: Dict[str, set] = {}
__rss_private: Dict[str, set] = {}
__last_feed: Dict[str, str] = {}
__err_list: Dict[str, float] = {}
__last_sync: bool
def __send():
"""
发送推送信息
"""
global __err_list
for url, lst in __rss_group.items():
msg = __request(url)
if msg:
for uid in lst:
api.gocqhttp.send_group_msg(uid, msg)
for url, lst in __rss_private.items():
msg = __request(url)
if msg:
for uid in lst:
api.gocqhttp.send_private_msg(uid, msg)
# 错误列表
tm_now = time.time()
for url, tm in __err_list.items():
if tm_now - tm > 1 * 24 * 3600:
msg = f'{url} 已经超过一天不能正常推送消息,请考虑删除它 (ᗜ˰ᗜ)'
for uid in __rss_group.get(url, []):
api.gocqhttp.send_group_msg(uid, msg)
for uid in __rss_private.get(url, []):
api.gocqhttp.send_private_msg(uid, msg)
__err_list[url] = tm_now
def __request(url: str) -> str:
"""
获取新推送 不能正常获取推送的加入 __err_list
:param url: url
:return: 推送信息
"""
global __last_feed, __err_list
tm_now = time.time()
ans = []
last_feed = False
last_title = ''
if url in __last_feed.keys():
last_feed = True
last_title = __last_feed[url]
try:
res = requests.get(url, timeout=10)
if res.status_code == 200:
content = feedparser.parse(str(res.content, "utf-8"))
else:
return ''
# 获取推送
entries = content.get('entries')
feed = content.get('feed')
# 没有入口
if entries is None or feed is None or len(entries) == 0:
if url not in __err_list.keys():
__err_list[url] = tm_now
return ''
elif url in __err_list.keys():
# 可以正常推送,如果在列表中则删除
__err_list.pop(url)
# 新推送
for row in entries:
title = row['title']
update_time = calendar.timegm(row['published_parsed'])
if last_feed and last_title == title:
break
# 半小时以内可以接受
if tm_now - update_time < 1800.0:
ans.append(f'{title}\n{row.get("author", "")}\n{row.get("link", "")}')
else:
break
if len(ans) > 0:
__last_feed[url] = entries[0]['title']
msg = f'来自 {feed.get("title", "")}'
for s in ans:
msg += '\n' + s
return msg
else:
return ''
except:
if url not in __err_list.keys():
__err_list[url] = tm_now
return ''
def __db_exec(sql: str, sql_value: tuple) -> bool:
conn = data.sqlite.sqlite_open_db(__db_name)
try:
cur = conn.cursor()
cur.execute(sql, sql_value)
cur.close()
except:
data.sqlite.sqlite_close_db(conn)
return False
data.sqlite.sqlite_close_db(conn)
return True
def __add(msg_type: str, qid: int, url: str) -> bool:
sql = 'INSERT INTO rss(type, id, url) VALUES(?, ?, ?);'
sql_value = (msg_type, qid, url)
if msg_type == 'group':
if url not in __rss_group.keys():
__rss_group[url] = set()
# 已经存在
if qid in __rss_group[url]:
return True
__rss_group[url].add(qid)
return __db_exec(sql, sql_value)
elif msg_type == 'private':
if url not in __rss_private.keys():
__rss_private[url] = set()
# 已经存在
if qid in __rss_private[url]:
return True
__rss_private[url].add(qid)
return __db_exec(sql, sql_value)
else:
return False
def __get(msg_type: str, qid: int) -> str:
url_lst = []
if msg_type == 'group':
for url, lst in __rss_group.items():
if qid in lst:
url_lst.append(url)
elif msg_type == 'private':
for url, lst in __rss_private.items():
if qid in lst:
url_lst.append(url)
else:
return ''
if url_lst:
ans = '订阅列表:'
else:
ans = '小白这里没有你的记录诶'
for i in range(len(url_lst)):
ans += f'\n{i+1} {url_lst[i]}'
return ans
def __del(msg_type: str, qid: int, index: int) -> bool:
sql = 'DELETE FROM rss WHERE type=? AND id=? AND url=?;'
if index <= 0:
return False
if msg_type == 'group':
for url, lst in __rss_group.items():
if qid in lst:
index -= 1
if index == 0:
lst.remove(qid)
return __db_exec(sql, (msg_type, qid, url))
elif msg_type == 'private':
for url, lst in __rss_private.items():
if qid in lst:
index -= 1
if index == 0:
lst.remove(qid)
return __db_exec(sql, (msg_type, qid, url))
else:
return False
def __err(msg_type: str, qid: int) -> str:
if msg_type == 'group':
search_dict = __rss_group
elif msg_type == 'private':
search_dict = __rss_private
else:
return ''
ans = []
for url in __err_list.keys():
if qid in search_dict.get(url, []):
ans.append(url)
if len(ans) == 0:
msg = '正常'
else:
msg = '出错列表:'
for url in ans:
msg += f'\n{url}'
return msg
def config():
global __rss_private, __rss_group, __last_sync
conn = data.sqlite.sqlite_open_db(__db_name)
rss_group: Dict[str, set] = {}
rss_private: Dict[str, set] = {}
cur = conn.cursor()
cur.execute('CREATE TABLE IF NOT EXISTS rss(type text, id int, url text);')
for row in cur.execute('SELECT type, url, id FROM rss;'):
if row[0] == 'group':
if row[1] not in rss_group.keys():
rss_group[row[1]] = set()
rss_group[row[1]].add(row[2])
elif row[0] == 'private':
if row[1] not in rss_private.keys():
rss_private[row[1]] = set()
rss_private[row[1]].add(row[2])
cur.close()
data.sqlite.sqlite_close_db(conn)
__rss_private = rss_private
__rss_group = rss_group
__last_sync = time.time()
def run(message: Message):
if time.time() - __last_sync > 50*60.0:
config()
help_msg = 'rss 推送\n' \
'用法:\n' \
' rss list\n' \
' rss add <link>\n' \
' rss del <index>\n' \
' rss status'
msg_list = message.message.split()
msg_len = len(msg_list)
if message.message_type == 'group':
qid = message.group_id
elif message.message_type == 'private':
qid = message.user_id
else:
return ''
ans = help_msg
if msg_len == 2:
if msg_list[1] == 'list':
return __get(message.message_type, qid)
elif msg_list[1] == 'status':
return __err(message.message_type, qid)
elif msg_list[1] == 'send':
__send()
return ''
elif msg_len == 3:
if msg_list[1] == 'add':
if __add(message.message_type, qid, msg_list[2]):
ans = f'添加成功 {msg_list[2]}'
else:
ans = f'添加失败'
elif msg_list[1] == 'del':
try:
index = int(msg_list[2])
except:
ans = 'del 需要一个数字下标'
else:
if __del(message.message_type, qid, index):
ans = '删除成功'
else:
ans = '删除失败'
return ans