-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbuni.py
66 lines (57 loc) · 1.71 KB
/
dbuni.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sqlite3
import threading
from queue import Queue
class Database:
    """Fixed-size pool of SQLite connections with simple query helpers.

    ``pool_size`` connections to ``db_path`` are opened eagerly with
    ``check_same_thread=False`` and stored in a ``queue.Queue``.  Each
    helper (``execute``, ``fetchone``, ``fetchall``) borrows one connection,
    runs a single statement, and always hands the connection back.

    Note: when ``db_path`` is ``":memory:"`` every pooled connection has
    its own private database, so a pool larger than 1 will not share data.
    """

    def __init__(self, db_path, pool_size=5):
        """Open ``pool_size`` connections to the database at ``db_path``.

        Raises:
            ValueError: if ``pool_size`` is smaller than 1.  Such a pool
                would hold no connections and the first ``open()`` would
                block forever.
        """
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        self.db_path = db_path
        self.pool_size = pool_size
        self.pool = Queue(maxsize=pool_size)
        # Kept so existing code that reads ``db.lock`` keeps working.  It is
        # intentionally NOT taken around pool operations: Queue does its own
        # locking, and holding another lock across a blocking ``get()``
        # prevents any other thread from returning a connection (deadlock).
        self.lock = threading.Lock()
        self._initialize_pool()

    def _initialize_pool(self):
        """Fill the pool with freshly opened connections."""
        for _ in range(self.pool_size):
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.pool.put(conn)

    def _get_connection(self):
        """Take a connection from the pool, blocking until one is free."""
        return self.pool.get()

    def _return_connection(self, conn):
        """Put ``conn`` back into the pool."""
        self.pool.put(conn)

    def open(self):
        """Borrow a connection; the caller must give it back via ``close``."""
        return self._get_connection()

    def close(self, conn):
        """Return a borrowed connection to the pool (does not close it)."""
        self._return_connection(conn)

    def close_all(self):
        """Close every connection currently idle in the pool.

        Connections that are checked out at the time of the call are not
        affected.  After this call the pool is empty, so the instance should
        not be used for further queries.
        """
        from queue import Empty  # local import: the file only imports Queue

        while True:
            try:
                conn = self.pool.get_nowait()
            except Empty:
                break
            conn.close()

    def _run_query(self, query, params, fetch=None, commit=False):
        """Run one statement on a pooled connection.

        Args:
            query: SQL text.
            params: bind parameters, or a falsy value for none.
            fetch: name of the cursor method used to collect the result
                (``"fetchone"`` / ``"fetchall"``), or ``None`` for no result.
            commit: commit after a successful execution; on failure the
                transaction is rolled back before the error propagates.

        Returns:
            Whatever the requested fetch method returns, else ``None``.
        """
        conn = self.open()
        try:
            # Acquired inside the try so a failure here still returns conn.
            cursor = conn.cursor()
            try:
                cursor.execute(query, params or ())
                result = getattr(cursor, fetch)() if fetch else None
                if commit:
                    conn.commit()
                return result
            except Exception:
                if commit:
                    # Do not hand a connection with a half-finished
                    # transaction to the next borrower.
                    conn.rollback()
                raise
            finally:
                cursor.close()
        finally:
            self.close(conn)

    def execute(self, query, params=None):
        """Execute a statement and commit it.

        If the statement or the commit fails, the transaction is rolled
        back and the original exception is re-raised.
        """
        self._run_query(query, params, commit=True)

    def fetchone(self, query, params=None):
        """Execute a query and return its first row, or ``None`` if empty."""
        return self._run_query(query, params, fetch="fetchone")

    def fetchall(self, query, params=None):
        """Execute a query and return all of its rows as a list."""
        return self._run_query(query, params, fetch="fetchall")