Skip to content

Commit

Permalink
chore: use connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jan 11, 2021
1 parent 01ec7bc commit 212d754
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
37 changes: 20 additions & 17 deletions src/datastores/mysql.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const pRetry = require('p-retry')
* @param {string} user
* @param {string} password
* @param {string} database
* @param {number} [connectionLimit = 20]
* @param {boolean} [insecureAuth = true]
* @param {boolean} [multipleStatements = true]
*/
Expand All @@ -35,12 +36,13 @@ class Mysql {
*
* @param {MySqlOptions} options
*/
constructor ({ host, user, password, database, insecureAuth = true, multipleStatements = true }) {
constructor ({ host, user, password, database, connectionLimit = 20, insecureAuth = true, multipleStatements = true }) {
this.options = {
host,
user,
password,
database,
connectionLimit,
insecureAuth,
multipleStatements
}
Expand All @@ -67,12 +69,12 @@ class Mysql {
* Closes Database connection
*/
stop () {
this.conn.end()
this.pool.end()
}

async reset () {
await new Promise((resolve, reject) => {
this.conn.query(`
this.pool.query(`
DROP TABLE IF EXISTS cookie;
DROP TABLE IF EXISTS registration;
`, (err) => {
Expand All @@ -91,7 +93,7 @@ class Mysql {
*/
gc () {
return new Promise((resolve, reject) => {
this.conn.query('DELETE FROM registration WHERE expiration <= NOW()',
this.pool.query('DELETE FROM registration WHERE expiration <= UNIX_TIMESTAMP(NOW())',
(err, res) => {
if (err) {
return reject(err)
Expand Down Expand Up @@ -119,12 +121,12 @@ class Mysql {
this._registeringPeer.set(id, peerOps)

return new Promise((resolve, reject) => {
this.conn.query('INSERT INTO ?? SET ?',
this.pool.query('INSERT INTO ?? SET ?',
['registration', {
namespace,
peer_id: id,
signed_peer_record: Buffer.from(signedPeerRecord),
expiration: new Date(Date.now() + ttl)
expiration: (Date.now() + ttl) / 1000 // Epoch in seconds like MySQL
}], (err) => {
// Remove Operation
peerOps.delete(opId)
Expand Down Expand Up @@ -153,7 +155,7 @@ class Mysql {
async getRegistrations (namespace, { limit = 10, cookie } = {}) {
if (cookie) {
const cookieEntries = await new Promise((resolve, reject) => {
this.conn.query(
this.pool.query(
'SELECT * FROM cookie WHERE id = ? LIMIT 1',
[cookie],
(err, results) => {
Expand All @@ -179,9 +181,9 @@ class Mysql {
}

const results = await new Promise((resolve, reject) => {
this.conn.query(
this.pool.query(
`SELECT id, namespace, peer_id, signed_peer_record, expiration FROM registration r
WHERE namespace = ? AND expiration >= NOW() ${cookieWhereNotExists()}
WHERE namespace = ? AND expiration >= UNIX_TIMESTAMP(NOW()) ${cookieWhereNotExists()}
ORDER BY expiration DESC
LIMIT ?`,
[namespace, cookie || limit, limit],
Expand All @@ -205,14 +207,15 @@ class Mysql {

// Store in cookies if results available
await new Promise((resolve, reject) => {
this.conn.query(
this.pool.query(
`INSERT INTO ?? (id, namespace, reg_id) VALUES ${results.map((entry) =>
`(${this.conn.escape(cookie)}, ${this.conn.escape(entry.namespace)}, ${this.conn.escape(entry.id)})`
`(${this.pool.escape(cookie)}, ${this.pool.escape(entry.namespace)}, ${this.pool.escape(entry.id)})`
)}`, ['cookie']
, (err) => {
if (err) {
return reject(err)
}
// @ts-ignore
resolve()
})
})
Expand All @@ -238,7 +241,7 @@ class Mysql {
const id = peerId.toB58String()

return new Promise((resolve, reject) => {
this.conn.query('SELECT COUNT(1) FROM registration WHERE peer_id = ?',
this.pool.query('SELECT COUNT(1) FROM registration WHERE peer_id = ?',
[id],
(err, res) => {
if (err) {
Expand Down Expand Up @@ -275,7 +278,7 @@ class Mysql {
const id = peerId.toB58String()

return new Promise((resolve, reject) => {
this.conn.query('DELETE FROM registration WHERE peer_id = ? AND namespace = ?', [id, ns],
this.pool.query('DELETE FROM registration WHERE peer_id = ? AND namespace = ?', [id, ns],
(err, res) => {
if (err) {
return reject(err)
Expand All @@ -295,7 +298,7 @@ class Mysql {
const id = peerId.toB58String()

return new Promise((resolve, reject) => {
this.conn.query('DELETE FROM registration WHERE peer_id = ?', [id],
this.pool.query('DELETE FROM registration WHERE peer_id = ?', [id],
(err, res) => {
if (err) {
return reject(err)
Expand All @@ -311,16 +314,16 @@ class Mysql {
* @returns {Promise<void>}
*/
_initDB () {
this.conn = mysql.createConnection(this.options)
this.pool = mysql.createPool(this.options)

return new Promise((resolve, reject) => {
this.conn.query(`
this.pool.query(`
CREATE TABLE IF NOT EXISTS registration (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
namespace varchar(255) NOT NULL,
peer_id varchar(255) NOT NULL,
signed_peer_record blob NOT NULL,
expiration timestamp NOT NULL,
expiration BIGINT NOT NULL,
PRIMARY KEY (id),
INDEX (namespace, expiration, peer_id)
);
Expand Down
5 changes: 1 addition & 4 deletions test/server.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ describe('rendezvous server', () => {
}, {
datastore,
gcInterval: 1000,
gcBootDelay: 1000,
gcBootDelay: 3000,
gcMinInterval: 0,
gcMinRegistrations: 0
})
Expand All @@ -329,9 +329,6 @@ describe('rendezvous server', () => {
// wait for firt record to be removed (2nd gc)
await pWaitFor(() => spy.callCount >= 2)

r = await rServer.getRegistrations(testNamespace)
expect(r.registrations).to.have.lengthOf(1)

// wait for second record to be removed
await pRetry(async () => {
r = await rServer.getRegistrations(testNamespace)
Expand Down

0 comments on commit 212d754

Please sign in to comment.