Skip to content

Robust Listeners

Vitaly Tomilov edited this page Jul 1, 2017 · 33 revisions

Below is a complete demo application that shows how to automatically re-connect a global direct connection with permanent event listeners in it, should the physical connection fail.

The demo uses connection option onLost of Database.connect that was introduced in v6.2.4

const cn = {
    // connection details
};

const pgp = require('pg-promise')({
    // Initialization Options
});

const db = pgp(cn); // database object

let connection; // global connection for event listeners

function onNotification(data) {
    console.log('Received Payload:', data.payload);
}

function setListeners(client) {
    client.on('notification', onNotification);
    connection.none('LISTEN $1~', 'my-channel')
        .catch(error => {
            console.log(error);
        });
}

function removeListeners(client) {
    client.removeListener('notification', onNotification);
}

function onConnectionLost(err, e) {
    console.log('Connectivity Problem:', err);
    connection = null;
    removeListeners(e.client);
    reconnect(5000, 10) // retry 10 times, every 5 seconds
        .then(obj => {
            console.log('Successfully Reconnected');
        })
        .catch(() => {
            console.log('Connection Lost Permanently');
            process.exit(); // exiting the process
        });
}

function reconnect(delay, maxAttempts) {
    delay = delay > 0 ? delay : 0;
    maxAttempts = maxAttempts > 0 ? maxAttempts : 1;
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            db.connect({direct: true, onLost: onConnectionLost})
                .then(obj => {
                    connection = obj;
                    setListeners(obj.client);
                    resolve(obj);
                })
                .catch(error => {
                    console.log('Error Connecting:', error);
                    if (--maxAttempts) {
                        reconnect(delay, maxAttempts)
                            .then(resolve)
                            .catch(reject);
                    } else {
                        reject(error);
                    }
                });
        }, delay);
    });
}

reconnect()
    .then(obj => {
        console.log('Successful Initial Connection');
        // obj.done(); - releases the connection
    })
    .catch(error => {
        console.log('Failed Initial Connection:', error);
    });

setInterval(() => {
    if (connection) {
        connection.none('NOTIFY $1~, $2', ['my-channel', 'my payload string'])
            .catch(error => {
                console.log('Failed to Notify:', error);
            })
    }
}, 1000);
Clone this wiki locally