-
Notifications
You must be signed in to change notification settings - Fork 9
/
db.py
292 lines (251 loc) · 9.58 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import base64
import os
import sqlite3
# Path of the SQLite database file that XpCounter opens; "~" is expanded to
# the current user's home directory.
XP_DB_PATH = os.path.expanduser('~/.hoshino/ai_setu2.db')
class XpCounter:
    """SQLite-backed usage counter for keywords, per group and user.

    Rows live in the ``XP_NUM`` table, keyed by ``(GID, UID, KEYWORD)``, with
    the running total stored in ``NUM``.  Every operation opens its own
    connection through ``_connect`` and closes it before returning.
    """

    def __init__(self):
        """Create the database directory and the ``XP_NUM`` table if missing."""
        os.makedirs(os.path.dirname(XP_DB_PATH), exist_ok=True)
        self._create_table()

    def _connect(self):
        """Open a new connection to the database file at ``XP_DB_PATH``."""
        return sqlite3.connect(XP_DB_PATH)

    def _fetch(self, sql, params=(), one=False):
        """Run a read query with bound parameters and close the connection.

        Args:
            sql: Statement text using ``?`` placeholders.
            params: Values bound to the placeholders.
            one: When true return ``fetchone()``, otherwise ``fetchall()``.
        """
        conn = self._connect()
        try:
            cursor = conn.execute(sql, params)
            return cursor.fetchone() if one else cursor.fetchall()
        finally:
            conn.close()

    def _create_table(self):
        """Create the ``XP_NUM`` table when it does not exist yet.

        Raises:
            Exception: if the statement fails; the underlying error is
                attached as ``__cause__``.
        """
        try:
            conn = self._connect()
            try:
                # ``with conn`` commits or rolls back but does not close the
                # connection, so it is closed explicitly afterwards.
                with conn:
                    conn.execute('''CREATE TABLE IF NOT EXISTS XP_NUM
                        (GID INT NOT NULL,
                        UID INT NOT NULL,
                        KEYWORD TEXT NOT NULL,
                        NUM INT NOT NULL,
                        PRIMARY KEY(GID,UID,KEYWORD));''')
            finally:
                conn.close()
        except Exception as err:
            raise Exception('创建表发生错误') from err

    def _add_xp_num(self, gid, uid, keyword):
        """Increment the counter of ``keyword`` for ``(gid, uid)`` by one.

        The row is created with a count of 1 when it does not exist.  The
        update and the fallback insert run inside one transaction on a single
        connection, so the increment is not split into a separate read and
        write.

        Raises:
            Exception: if the update fails; the underlying error is attached
                as ``__cause__``.
        """
        key = (gid, uid, keyword)
        try:
            conn = self._connect()
            try:
                with conn:
                    touched = conn.execute(
                        "UPDATE XP_NUM SET NUM = NUM + 1 "
                        "WHERE GID=? AND UID=? AND KEYWORD=?",
                        key,
                    ).rowcount
                    if not touched:
                        # No existing row for this key: start counting at 1.
                        conn.execute(
                            "INSERT INTO XP_NUM (GID,UID,KEYWORD,NUM) "
                            "VALUES (?,?,?,1)",
                            key,
                        )
            finally:
                conn.close()
        except Exception as err:
            raise Exception('更新表发生错误') from err

    def _get_xp_num(self, gid, uid, keyword):
        """Return the stored count for the key, or 0 when no row exists.

        Raises:
            Exception: if the lookup fails; the underlying error is attached
                as ``__cause__``.
        """
        try:
            row = self._fetch(
                "SELECT NUM FROM XP_NUM WHERE GID=? AND UID=? AND KEYWORD=?",
                (gid, uid, keyword),
                one=True,
            )
            return 0 if row is None else row[0]
        except Exception as err:
            raise Exception('查找表发生错误') from err

    def _get_xp_list_group(self, gid, num):
        """Return up to ``num`` ``(KEYWORD, NUM)`` rows of a group, highest first.

        Returns an empty dict (not a list) when nothing matches; that
        sentinel is kept because existing callers may rely on it.
        """
        rows = self._fetch(
            "SELECT KEYWORD,NUM FROM XP_NUM WHERE GID=? "
            "ORDER BY NUM desc LIMIT ?",
            (gid, num),
        )
        return rows if rows else {}

    def _get_xp_list_personal(self, gid, uid, num):
        """Return up to ``num`` ``(KEYWORD, NUM)`` rows of one user in a group.

        Rows are ordered by ``NUM`` descending.  Returns an empty dict when
        nothing matches.
        """
        rows = self._fetch(
            "SELECT KEYWORD,NUM FROM XP_NUM WHERE GID=? AND UID=? "
            "ORDER BY NUM desc LIMIT ?",
            (gid, uid, num),
        )
        return rows if rows else {}

    def _get_xp_list_kwd_group(self, gid, num):
        """Return up to ``num`` one-column ``(KEYWORD,)`` rows of a group.

        Rows are ordered by ``NUM`` descending.  Returns an empty dict when
        nothing matches.
        """
        rows = self._fetch(
            "SELECT KEYWORD FROM XP_NUM WHERE GID=? "
            "ORDER BY NUM desc LIMIT ?",
            (gid, num),
        )
        return rows if rows else {}

    def _get_xp_list_kwd_personal(self, gid, uid, num):
        """Return up to ``num`` one-column ``(KEYWORD,)`` rows of one user.

        Rows are ordered by ``NUM`` descending.  Returns an empty dict when
        nothing matches.
        """
        rows = self._fetch(
            "SELECT KEYWORD FROM XP_NUM WHERE GID=? AND UID=? "
            "ORDER BY NUM desc LIMIT ?",
            (gid, uid, num),
        )
        return rows if rows else {}
def get_xp_list_group(gid, num=10):
    """Return a group's keyword ranking as ``(keyword, count)`` tuples.

    Args:
        gid: Group id whose rows are ranked.
        num: Maximum number of rows requested from the database.

    Returns:
        A list of ``(keyword, count)`` tuples ordered by count, highest
        first, or an empty list when the group has no rows.
    """
    rows = XpCounter()._get_xp_list_group(gid, num)
    if not rows:
        return []
    # A single stable descending sort yields the final ranking; the pairs are
    # rebuilt as plain tuples under local names that do not rebind ``num``.
    return sorted(
        ((keyword, count) for keyword, count in rows),
        key=lambda pair: pair[1],
        reverse=True,
    )
def get_xp_list_personal(gid, uid, num=10):
    """Return one user's keyword ranking as ``(keyword, count)`` tuples.

    Args:
        gid: Group id the user belongs to.
        uid: User id whose rows are ranked.
        num: Maximum number of rows requested from the database.

    Returns:
        A list of ``(keyword, count)`` tuples ordered by count, highest
        first, or an empty list when the user has no rows in that group.
    """
    rows = XpCounter()._get_xp_list_personal(gid, uid, num)
    if not rows:
        return []
    # A single stable descending sort yields the final ranking; the pairs are
    # rebuilt as plain tuples under local names that do not rebind ``num``.
    return sorted(
        ((keyword, count) for keyword, count in rows),
        key=lambda pair: pair[1],
        reverse=True,
    )
def get_xp_list_kwd_group(gid, num=10):
    """Return the keyword rows of a group, or an empty list when none exist.

    The rows come straight from ``XpCounter._get_xp_list_kwd_group``; only
    its empty result is replaced by ``[]``.
    """
    keyword_rows = XpCounter()._get_xp_list_kwd_group(gid, num)
    if len(keyword_rows) == 0:
        return []
    return keyword_rows
def get_xp_list_kwd_personal(gid, uid, num=10):
    """Return the keyword rows of one user, or an empty list when none exist.

    The rows come straight from ``XpCounter._get_xp_list_kwd_personal``; only
    its empty result is replaced by ``[]``.
    """
    keyword_rows = XpCounter()._get_xp_list_kwd_personal(gid, uid, num)
    if len(keyword_rows) == 0:
        return []
    return keyword_rows
def add_xp_num(gid, uid, keyword):
    """Record one more use of ``keyword`` by user ``uid`` in group ``gid``."""
    XpCounter()._add_xp_num(gid, uid, keyword)
########################## The setu_pic section starts below ###################################
# Path of the SQLite database file that PicCounter opens; "~" is expanded to
# the current user's home directory.
PIC_DB_PATH = os.path.expanduser('~/.hoshino/ai_setu_pic.db')
class PicCounter:
    """SQLite-backed store of picture records and their thumb counts.

    Rows live in the ``PIC_DATA`` table with an auto-incremented ``ID`` and a
    unique index on ``PIC_HASH``.  Every operation opens its own connection
    through ``_connect`` and closes it before returning.  Database errors
    propagate to the caller unchanged.
    """

    def __init__(self):
        """Create the database directory, table and unique index if missing."""
        os.makedirs(os.path.dirname(PIC_DB_PATH), exist_ok=True)
        self._create_table()
        self._create_index()

    def _connect(self):
        """Open a new connection to the database file at ``PIC_DB_PATH``."""
        return sqlite3.connect(PIC_DB_PATH)

    def _execute(self, sql, params=()):
        """Run one write statement in a transaction and close the connection."""
        conn = self._connect()
        try:
            # ``with conn`` commits or rolls back but does not close the
            # connection, so it is closed explicitly afterwards.
            with conn:
                conn.execute(sql, params)
        finally:
            conn.close()

    def _fetch_one(self, sql, params=()):
        """Return the first row of a query (or ``None``), closing the connection."""
        conn = self._connect()
        try:
            return conn.execute(sql, params).fetchone()
        finally:
            conn.close()

    def _fetch_all(self, sql, params=()):
        """Return all rows of a query, closing the connection afterwards."""
        conn = self._connect()
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _create_table(self):
        """Create the ``PIC_DATA`` table when it does not exist yet."""
        self._execute('''
            CREATE TABLE IF NOT EXISTS PIC_DATA(
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            GID INT NOT NULL,
            UID INT NOT NULL,
            PIC_HASH TEXT NOT NULL,
            PIC_DIR TEXT NOT NULL,
            PIC_MSG TEXT NOT NULL,
            THUMB INT NOT NULL
            );''')

    def _create_index(self):
        """Create the unique index on ``PIC_HASH`` when it does not exist yet."""
        self._execute('''
            CREATE UNIQUE INDEX IF NOT EXISTS hash ON
            PIC_DATA (
            PIC_HASH
            );''')

    def _add_pic(self, gid, uid, pic_hash, pic_dir, pic_msg, thumb):
        """Insert a picture row; a conflicting row is ignored by the database."""
        self._execute(
            "INSERT OR IGNORE INTO PIC_DATA "
            "(GID,UID,PIC_HASH,PIC_DIR,PIC_MSG,THUMB) VALUES (?,?,?,?,?,?)",
            (gid, uid, pic_hash, pic_dir, pic_msg, thumb),
        )

    def _del_pic(self, id):
        """Delete the row whose ``ID`` equals ``id`` (no error if absent)."""
        self._execute("DELETE FROM PIC_DATA WHERE ID =?", (id,))

    def _get_pic_exist_hash(self, pic_hash):
        """Return 1 when a row with this ``PIC_HASH`` exists, otherwise 0."""
        row = self._fetch_one(
            "SELECT PIC_HASH FROM PIC_DATA WHERE PIC_HASH=?", (pic_hash,)
        )
        return 0 if row is None else 1

    def _get_pic_exist_id(self, id):
        """Return 1 when a row with this ``ID`` exists, otherwise 0."""
        row = self._fetch_one(
            "SELECT PIC_HASH FROM PIC_DATA WHERE ID=?", (id,)
        )
        return 0 if row is None else 1

    def _get_pic_data_id(self, id):
        """Return ``(PIC_DIR, PIC_MSG)`` for ``id``, or ``{}`` when not found."""
        row = self._fetch_one(
            "SELECT PIC_DIR,PIC_MSG FROM PIC_DATA WHERE ID=?", (id,)
        )
        return row if row else {}

    def _get_pic_id_hash(self, pic_hash):
        """Return the one-column row ``(ID,)`` for a hash, or ``{}`` when not found."""
        row = self._fetch_one(
            "SELECT ID FROM PIC_DATA WHERE PIC_HASH=?", (pic_hash,)
        )
        return row if row else {}

    def _get_pic_thumb(self, id):
        """Return the ``THUMB`` value for ``id``, or 0 when the row is missing."""
        row = self._fetch_one("SELECT THUMB FROM PIC_DATA WHERE ID=?", (id,))
        return 0 if row is None else row[0]

    def _add_pic_thumb(self, id):
        """Increase ``THUMB`` of the row ``id`` by one.

        The increment is a single UPDATE statement, so it is not split into a
        separate read and write.  A missing ``id`` updates nothing.
        """
        self._execute(
            "UPDATE PIC_DATA SET THUMB = THUMB + 1 WHERE ID =?", (id,)
        )

    def _get_pic_list_all(self, num):
        """Return up to ``num`` ``(ID, PIC_DIR, THUMB)`` rows, most thumbs first.

        Returns an empty dict (not a list) when the table has no rows; that
        sentinel is kept because callers receive it unchanged.
        """
        rows = self._fetch_all(
            "SELECT ID,PIC_DIR,THUMB FROM PIC_DATA "
            "ORDER BY THUMB desc LIMIT ?",
            (num,),
        )
        return rows if rows else {}

    def _get_pic_list_group(self, gid, num):
        """Return up to ``num`` ``(ID, PIC_DIR, THUMB)`` rows of one group.

        Rows are ordered by ``THUMB`` descending.  Returns an empty dict when
        nothing matches.
        """
        rows = self._fetch_all(
            "SELECT ID,PIC_DIR,THUMB FROM PIC_DATA WHERE GID=? "
            "ORDER BY THUMB desc LIMIT ?",
            (gid, num),
        )
        return rows if rows else {}

    def _get_pic_list_personal(self, uid, num):
        """Return up to ``num`` ``(ID, PIC_DIR, THUMB)`` rows of one user.

        Rows are ordered by ``THUMB`` descending.  Returns an empty dict when
        nothing matches.
        """
        rows = self._fetch_all(
            "SELECT ID,PIC_DIR,THUMB FROM PIC_DATA WHERE UID=? "
            "ORDER BY THUMB desc LIMIT ?",
            (uid, num),
        )
        return rows if rows else {}
def add_pic(gid, uid, pic_hash, pic_dir, pic_msg):
    """Store a picture record unless its hash is already present.

    A new record starts with a thumb count of 0.  Returns the success
    message when the record was stored and the failure message when a row
    with the same hash already exists.
    """
    store = PicCounter()
    if store._get_pic_exist_hash(pic_hash):
        return "上传图片失败"
    store._add_pic(gid, uid, pic_hash, pic_dir, pic_msg, 0)
    return "上传图片成功"
def add_pic_thumb(id):
    """Add one thumb to picture ``id`` when that picture exists.

    Returns the success message after the thumb was added, or the failure
    message when no row with this id exists.
    """
    store = PicCounter()
    if not store._get_pic_exist_id(id):
        return f"点赞{id}号图片失败"
    store._add_pic_thumb(id)
    return f"点赞{id}号图片成功"
def get_pic_id_hash(pic_hash):
    """Look up a picture by hash.

    Returns the result of ``PicCounter._get_pic_id_hash`` unchanged: the
    fetched row holding the ID, or ``{}`` when the hash is unknown.
    """
    return PicCounter()._get_pic_id_hash(pic_hash)
def get_pic_data_id(id):
    """Look up a picture's directory and message by id.

    Returns the result of ``PicCounter._get_pic_data_id`` unchanged: the
    fetched ``(PIC_DIR, PIC_MSG)`` row, or ``{}`` when the id is unknown.
    """
    return PicCounter()._get_pic_data_id(id)
def get_pic_list_all(num=8):
    """Return up to ``num`` pictures across all groups, most thumbs first.

    The result of ``PicCounter._get_pic_list_all`` is passed through
    unchanged, including its ``{}`` value for an empty table.
    """
    return PicCounter()._get_pic_list_all(num)
def get_pic_list_group(gid, num=8):
    """Return up to ``num`` pictures of group ``gid``, most thumbs first.

    The result of ``PicCounter._get_pic_list_group`` is passed through
    unchanged, including its ``{}`` value when nothing matches.
    """
    return PicCounter()._get_pic_list_group(gid, num)
def get_pic_list_personal(uid, num=8):
    """Return up to ``num`` pictures of user ``uid``, most thumbs first.

    The result of ``PicCounter._get_pic_list_personal`` is passed through
    unchanged, including its ``{}`` value when nothing matches.
    """
    return PicCounter()._get_pic_list_personal(uid, num)
def del_pic(id):
    """Delete picture ``id`` and return a fixed confirmation message.

    The message is returned whether or not a row with this id existed.
    """
    PicCounter()._del_pic(id)
    return "remove pic success"