forked from JQ-Networks/UnifiedMessageRelay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
message_persistence.py
105 lines (96 loc) · 3.93 KB
/
message_persistence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import sqlite3
import datetime
import time
from bot_constant import FORWARD_LIST
import logging
# Module-level logger; its name is this module's name placed under the "CTBMain." prefix.
logger = logging.getLogger(f"CTBMain.{__name__}")
class MessageDB:
    """
    SQLite-backed record of forwarded messages, keyed by Telegram message id.

    One table is kept per entry of FORWARD_LIST. The table for forward number
    ``i`` is named ``_i`` and has the columns
    (tg_message_id, qq_message_id, qq_number, timestamp).
    """

    # Column layout shared by every per-forward table.
    _SCHEMA = ("tg_message_id int primary key, "
               "qq_message_id int, qq_number int, timestamp int")

    def __init__(self, db_name: str):
        """
        open the database and make sure every forward has its table
        :param db_name: database path handed to sqlite3.connect
        """
        # check_same_thread=False allows the connection to be used from
        # threads other than the one that created it.
        self.conn = sqlite3.connect(db_name, check_same_thread=False)
        with self.conn:
            for idx, _ in enumerate(FORWARD_LIST):
                # "if not exists" keeps start-up idempotent on an existing file
                self.conn.execute(
                    f'create table if not exists {self._table(idx)} ({self._SCHEMA})')

    @staticmethod
    def _table(forward_index) -> str:
        """
        build the quoted SQL identifier of a forward's table
        :param forward_index: forward index
        :return: table name ``_<forward_index>`` wrapped in double quotes
        """
        name = '_' + str(forward_index)
        # Double quotes are the standard identifier quoting; doubling any
        # embedded quote keeps the value from escaping the identifier.
        return '"' + name.replace('"', '""') + '"'

    def append_message(self, qq_message_id: int,
                       tg_message_id: int,
                       forward_index: int,
                       qq_number: int) -> None:
        """
        store a message record, replacing any record with the same Telegram id
        :param qq_message_id: QQ message id
        :param tg_message_id: Telegram message id
        :param forward_index: forward index
        :param qq_number: If from QQ, then QQ sender's number. If from Telegram, then 0 (used for recall)
        :return:
        """
        # Epoch seconds taken directly; converting local wall-clock time back
        # to an epoch value is ambiguous around DST changes.
        timestamp = int(time.time())
        logger.debug('append tg_msg_id:%s, qq_msg_id:%s, qq_num:%s, time:%s to _%s',
                     tg_message_id, qq_message_id, qq_number, timestamp, forward_index)
        # A single statement instead of select-then-insert/update: there is no
        # window in which a concurrent append can create the row first.
        # "with self.conn" commits on success and rolls back on error.
        with self.conn:
            self.conn.execute(
                f'insert or replace into {self._table(forward_index)} '
                '(tg_message_id, qq_message_id, qq_number, timestamp) '
                'values (?, ?, ?, ?)',
                (tg_message_id, qq_message_id, qq_number, timestamp))

    def retrieve_message(self, tg_message_id: int,
                         forward_index: int):
        """
        get specific record
        :param tg_message_id: Telegram message id
        :param forward_index: forward index
        :return: the row as a tuple
                 (tg_message_id, qq_message_id, qq_number, timestamp),
                 or None when no record matches
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(
                f'select * from {self._table(forward_index)} where tg_message_id = ?',
                (tg_message_id,))
            rows = cursor.fetchall()
        finally:
            # release the cursor even when the query fails
            cursor.close()
        return rows[0] if rows else None

    def delete_message(self, tg_message_id: int,
                       forward_index: int) -> None:
        """
        delete record (no error if the record does not exist)
        :param tg_message_id: Telegram message id
        :param forward_index: forward index
        :return:
        """
        with self.conn:
            self.conn.execute(
                f'delete from {self._table(forward_index)} where tg_message_id=?;',
                (tg_message_id,))

    def purge_message(self, max_age: datetime.timedelta = datetime.timedelta(weeks=2)) -> None:
        """
        delete outdated records from every forward table
        :param max_age: records whose timestamp is older than this are removed
                        (defaults to two weeks)
        :return:
        """
        # The cutoff is the same for every table, so compute it once.
        purge_time = int(time.time() - max_age.total_seconds())
        # One transaction for all tables: either every table is purged or none.
        with self.conn:
            for idx, _ in enumerate(FORWARD_LIST):
                self.conn.execute(
                    f'delete from {self._table(idx)} where timestamp < ?;',
                    (purge_time,))

    def close(self) -> None:
        """
        close the database connection; safe to call more than once
        :return:
        """
        # conn is missing when sqlite3.connect raised inside __init__
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()

    def __del__(self):
        self.close()