Skip to content

Robust Listeners

Vitaly Tomilov edited this page Jul 1, 2017 · 33 revisions

Below is a complete demo application that shows how to automatically re-connect a global direct connection with permanent event listeners in it, should the physical connection fail.

The demo uses connection option onLost of Database.connect that was introduced in v6.2.4

The application sets up one global connection with a notification listener in it, and keeps sending a notification into the channel every second, while the connection is available. When the connection breaks, it executes a simple strategy for restoring the connection, trying 10 times, after each 5 seconds, and if fails to restore the connection - it exits the process.

const cn = {
    // Connection Details
};

const pgp = require('pg-promise')({
    // Initialization Options
});

const db = pgp(cn); // Database Object

let connection; // global connection for permanent event listeners

function onNotification(data) {
    console.log('Received Payload:', data.payload);
}

function setListeners(client) {
    client.on('notification', onNotification);
    connection.none('LISTEN $1~', 'my-channel')
        .catch(error => {
            console.log(error); // unlikely to happen
        });
}

function removeListeners(client) {
    if (connection) {
        connection.none('UNLISTEN $1~', 'my-channel')
            .catch(error => {
                console.log(error); // unlikely to happen
            });
    }
    client.removeListener('notification', onNotification);
}

function onConnectionLost(err, e) {
    console.log('Connectivity Problem:', err);
    connection = null; // prevent use of the connection
    removeListeners(e.client);
    reconnect(5000, 10) // retry 10 times, every 5 seconds
        .then(obj => {
            console.log('Successfully Reconnected');
        })
        .catch(() => {
            // failed after 10 attempts
            console.log('Connection Lost Permanently');
            process.exit(); // exiting the process
        });
}

function reconnect(delay, maxAttempts) {
    delay = delay > 0 ? delay : 0;
    maxAttempts = maxAttempts > 0 ? maxAttempts : 1;
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            db.connect({direct: true, onLost: onConnectionLost})
                .then(obj => {
                    connection = obj; // global connection is now available
                    setListeners(obj.client);
                    resolve(obj);
                })
                .catch(error => {
                    console.log('Error Connecting:', error);
                    if (--maxAttempts) {
                        reconnect(delay, maxAttempts)
                            .then(resolve)
                            .catch(reject);
                    } else {
                        reject(error);
                    }
                });
        }, delay);
    });
}

function sendNotifications() {
    // send a notification to our listener every second:
    setInterval(() => {
        if (connection) {
            connection.none('NOTIFY $1~, $2', ['my-channel', 'my payload string'])
                .catch(error => {
                    console.log('Failed to Notify:', error); // unlikely to happen
                })
        }
    }, 1000);
}

reconnect() // = same as reconnect(0, 1)
    .then(obj => {
        console.log('Successful Initial Connection');
        // obj.done(); - releases the connection
        sendNotifications();
    })
    .catch(error => {
        console.log('Failed Initial Connection:', error);
    });

In order to test this code, you will need to artificially disrupt the communications.

Here are some ways of how this can be done while connected to your local database server:

  • To temporarily kill all connections to your test database, execute the following SQL:
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='my-database-name';

You must provide the correct database name in the query, and make sure to execute it from a connection to a different database, or else you will be killing the current connection, which is likely to crash your pgAdmin UI.

  • To permanently disconnect, locate your PostgreSQL Server in the list of services, and stop it.

  • There are many TCP utilities that can help you disrupt communications on TCP level for your database port. PostgreSQL default port number is 5432.

Clone this wiki locally