-
Notifications
You must be signed in to change notification settings - Fork 0
/
database.py
65 lines (49 loc) · 1.64 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import pymongo
import settings
import os
class DB():
def __init__(self):
self.client = pymongo.MongoClient(os.getenv('DB_SERVER'), int(os.getenv('DB_PORT'))) # Connect to client
self.db = self.client[os.getenv('DB_NAME')]
def close(self):
self.client.close()
def get_db_name(self):
return self.db.name
def insert_one(self, coll, document):
coll = self.db[coll]
coll.insert_one(document)
def insert_many(self, coll, documents):
coll = self.db[coll]
coll.insert_many(documents)
def find_one(self, coll):
coll = self.db[coll]
return coll.find_one()
def find(self, coll, filter=None):
coll = self.db[coll]
return coll.find(filter)
def find_one_and_delete(self, coll, filter):
coll = self.db[coll]
return coll.find_one_and_replace(filter)
def find_one_and_replace(self, coll, filter, replacement):
coll = self.db[coll]
return coll.find_one_and_delete(filter, replacement)
def find_one_and_update(self, coll, filter, update):
coll = self.db[coll]
return coll.find_one_and_update(filter, update)
def update_one(self, coll, filter, update):
coll = self.db[coll]
coll.update_one(filter, update)
def update_many(self, coll, filter, update):
coll = self.db[coll]
coll.update_many(filter, update)
def delete_one(self, coll, filter):
coll = self.db[coll]
coll.delete_one(filter)
def delete_many(self, coll, filter):
coll = self.db[coll]
coll.delete_many(filter)
def count_documents(self, coll, filter):
coll = self.db[coll]
return coll.count_documents(filter) # 'filter' can be an empty document to count all documents
def get_timestamp(self, document):
return str(document['_id'].generation_time)