-
Notifications
You must be signed in to change notification settings - Fork 4
/
sakura-mongo-db-connection.ts
171 lines (136 loc) · 5.18 KB
/
sakura-mongo-db-connection.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import { Db, MongoClient, MongoClientOptions } from 'mongodb';
const debug = {
normal: require('debug')('sapi:SakuraMongoDbConnection'),
verbose: require('debug')('sapi:SakuraMongoDbConnection:verbose')
};
/**
* SakuraMongoDbConnection is responsible for managing connections to a MongoDB database or cluster.
*/
export class SakuraMongoDbConnection {
private connections = new Map<string, { uri: string, options?: MongoClientOptions }>();
private clients = new Map<string, MongoClient>();
private dbs = new Map<string, Db>();
/**
* Adds the parameters for a connection but doesn't actually connect to the DB. This is used to queue up
* connection configurations that are later used for opening connections to MongoDB with
* [[SakuraMongoDbConnection.connectAll]].
*/
addConnection(dbName: string, uri: string, options?: MongoClientOptions): void {
debug.normal(`.addConnection dbName: '${dbName}', uri: '${uri}', options: %O`, options);
if (typeof dbName !== 'string') {
throw new Error(`dbName must be type string but instead was ${typeof dbName}`);
}
this.connections.set(dbName, {uri, options});
debug.verbose(`.addConnection connections: '%O'`, this.connections);
}
/**
* Connects to MongoDB with the supplied parameters and returns a Promise containing the newly connected Db
* created by `MongoClient.connect`.
*/
async connect(dbName: string, uri: string, options?: MongoClientOptions): Promise<Db> {
debug.normal(`.connect dbName: '${dbName}', uri: '${uri}', options:`, options);
options = options || {};
options.useNewUrlParser = true;
let db: Db;
db = this.getDb(dbName) || null;
if (db) {
debug.normal(`.connect dbName: '${dbName}' already connected`);
return db;
}
// Because the connection is asynchronous, it's possible for multiple calls to connect
// to happen before the first call resolves and sets the entry in the dbs Map. Thus,
// a place holder is inserted to prevent other calls to connect from trying to connect
// with this dbName.
//
// See "parallel, possible race condition" unit test.
this.dbs.set(dbName, {} as any);
this.connections.set(dbName, {uri, options});
try {
const client = await MongoClient.connect(uri, options);
this.clients.set(dbName, client);
db = client.db();
debug.normal(`.connect dbName: '${dbName}' connected`);
this.dbs.set(dbName, db); // replace placeholder with the db
debug.verbose('.connect dbs: %O', this.dbs);
return db;
} catch (err) {
debug.normal(`.connect dbName: '${dbName}' error:`, err);
this.dbs.delete(dbName); // remove placeholder
return Promise.reject(err);
}
}
/**
* Iterates through the connection parameters provided via [[SakuraMongoDbConnection.addConnection]] and connects to
* MongoDb. Returns a Promise containing an array of the connected MongoDB `Db` objects.
*/
async connectAll(): Promise<Db[]> {
debug.normal('.connectAll start');
const wait = [];
for (const connection of this.connections) {
wait.push(this.connect(connection[0], connection[1].uri, connection[1].options));
}
try {
const results = Promise.all(wait);
debug.normal('.connectAll done');
return results;
} catch (err) {
debug.normal(`.connectAll error:`, err);
return Promise.reject(err);
}
}
/**
* Closes a specific db Connection and removes it from [[SakuraMongoDbConnection]]'s internal Maps.
*/
async close(dbName: string, forceClose?: boolean): Promise<void> {
const db = this.dbs.get(dbName);
const client = this.clients.get(dbName);
debug.normal(`.close dbName:'${dbName}', forceClose: ${forceClose}, connection found: ${!!db}`);
if (db && client) {
this.connections.delete(dbName);
this.clients.delete(dbName);
this.dbs.delete(dbName);
return client.close(forceClose);
}
return;
}
/**
* Closes all connections tracked by this instance of [[SakuraMongoDbConnection]].
*/
async closeAll(): Promise<null> {
debug.normal('.closeAll called');
const wait = [];
for (const db of this.dbs) {
wait.push(this.close(db[0]));
}
try {
await Promise.all(wait);
debug.normal('.closeAll done');
} catch (err) {
debug.normal('.closeAll error:', err);
return Promise.reject(err);
}
}
/**
* Gets an MongoDB `Db` object from the private [[SakuraMongoDbConnection]] map tracking connections.
*/
getDb(dbName: string): Db {
const result = this.dbs.get(dbName);
debug.normal(`.getDb dbName: '${dbName}', found: ${!!result}`);
debug.verbose(`.getDb dbs: %O`, this.dbs);
return result;
}
/**
* Gets a connection parameter from the map tracking connections.
*/
getConnection(dbName: string): { uri: string, options?: MongoClientOptions } {
const result = this.connections.get(dbName);
debug.normal(`.getConnection dbName:'${dbName}', found: ${!!result}`);
return result;
}
/**
* Returns a reference to the map of connections currently had.
*/
getConnections(): Map<string, { uri: string, options?: MongoClientOptions }> {
return this.connections;
}
}