
Robust Listeners

Vitaly Tomilov edited this page Apr 5, 2021 · 33 revisions

Below is a complete demo application that shows how to automatically restore a global direct connection, together with its permanent event listeners, when the physical connection fails.

The demo relies on the onLost option of the Database.connect method.

The application sets up one global connection with a notification listener on it, and sends a notification into the channel every second while the connection is available. When the connection breaks, it follows a simple recovery strategy: it makes up to 10 reconnection attempts, waiting 5 seconds before each one, and exits the process if all of them fail.

const cn = {
    // Connection Details
};

const pgp = require('pg-promise')({
    // Initialization Options
});

const db = pgp(cn); // Database Object

let connection; // global connection for permanent event listeners

function onNotification(data) {
    console.log('Received Payload:', data.payload);
}

function setListeners(client) {
    client.on('notification', onNotification);
    return connection.none('LISTEN $1~', 'my-channel')
        .catch(error => {
            console.log(error); // unlikely to ever happen
        });
}

function removeListeners(client) {
    client.removeListener('notification', onNotification);
}

function onConnectionLost(err, e) {
    console.log('Connectivity Problem:', err);
    connection = null; // prevent use of the broken connection
    removeListeners(e.client);
    reconnect(5000, 10) // retry 10 times, with 5-second intervals
        .then(() => {
            console.log('Successfully Reconnected');
        })
        .catch(() => {
            // failed after 10 attempts
            console.log('Connection Lost Permanently');
            process.exit(); // exiting the process
        });
}

function reconnect(delay, maxAttempts) {
    delay = delay > 0 ? parseInt(delay, 10) : 0;
    maxAttempts = maxAttempts > 0 ? parseInt(maxAttempts, 10) : 1;
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            db.connect({direct: true, onLost: onConnectionLost})
                .then(obj => {
                    connection = obj; // global connection is now available
                    resolve(obj);
                    return setListeners(obj.client);
                })
                .catch(error => {
                    console.log('Error Connecting:', error);
                    if (--maxAttempts) {
                        reconnect(delay, maxAttempts)
                            .then(resolve)
                            .catch(reject);
                    } else {
                        reject(error);
                    }
                });
        }, delay);
    });
}

function sendNotifications() {
    // send a notification to our listener every second:
    setInterval(() => {
        if (connection) {
            connection.none('NOTIFY $1~, $2', ['my-channel', 'my payload string'])
                .catch(error => {
                    console.log('Failed to Notify:', error); // unlikely to ever happen
                });
        }
    }, 1000);
}

reconnect() // = same as reconnect(0, 1)
    .then(obj => {
        console.log('Successful Initial Connection');
        // obj.done(); - releases the connection
        sendNotifications();
    })
    .catch(error => {
        console.log('Failed Initial Connection:', error);
    });
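
The retry strategy inside reconnect can also be read in isolation. The following is a minimal sketch under stated assumptions, not the demo's own code and not part of pg-promise: retry and sleep are hypothetical names, and operation stands for any function that returns a promise. Like reconnect, it waits for the delay before every attempt, including the first one.

```javascript
// Hypothetical helper (not part of pg-promise): resolves after `ms` milliseconds.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical helper: calls `operation` up to `maxAttempts` times,
// waiting `delay` milliseconds before each attempt. Resolves with the
// first successful result, or rejects with the last error.
async function retry(operation, delay = 0, maxAttempts = 1) {
    const wait = delay > 0 ? Math.floor(delay) : 0;
    const attempts = maxAttempts > 0 ? Math.floor(maxAttempts) : 1;
    let lastError;
    for (let attempt = 1; attempt <= attempts; attempt++) {
        await sleep(wait);
        try {
            return await operation(attempt);
        } catch (error) {
            lastError = error; // remember the failure and try again
        }
    }
    throw lastError;
}
```

With the demo's values, the connection attempt itself would correspond to retry(() => db.connect({direct: true, onLost: onConnectionLost}), 5000, 10). The demo does more than that after each success: it stores the global connection and sets up the listeners.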

In order to test this code properly, you will need to manually disrupt the communications.

Here are some ways to do this while connected to your local database server:

  • To temporarily kill all connections to your test database, execute the following SQL:
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='my-database-name';

Provide the correct database name in the query, and execute it from a connection to a different database. Otherwise, the query also kills the connection it is running on, which is likely to crash your pgAdmin UI.

  • To permanently disconnect, locate your PostgreSQL Server in the list of services, and stop it.

  • There are many TCP utilities that can help you disrupt communications at the TCP level for your database port. The default PostgreSQL port is 5432.
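
If no such utility is at hand, a small TCP proxy can play the same role. The sketch below uses only the Node.js net module. createDroppableProxy and its methods are hypothetical names, not part of pg-promise or PostgreSQL. It assumes the demo's connection details point at the proxy port rather than at the database port directly.

```javascript
const net = require('net');

// Hypothetical test helper: forwards every incoming TCP connection to
// targetHost:targetPort, and can cut all open connections on demand.
function createDroppableProxy(targetPort, targetHost = '127.0.0.1') {
    const sockets = new Set(); // every open socket, on both sides of the proxy

    const server = net.createServer(clientSocket => {
        const upstream = net.connect(targetPort, targetHost);
        for (const s of [clientSocket, upstream]) {
            sockets.add(s);
            s.on('error', () => {}); // resets are expected here, so ignore them
            s.on('close', () => sockets.delete(s));
        }
        // relay traffic in both directions
        clientSocket.pipe(upstream);
        upstream.pipe(clientSocket);
        // when one side goes away, drop the other side too
        clientSocket.on('close', () => upstream.destroy());
        upstream.on('close', () => clientSocket.destroy());
    });

    return {
        server,
        // start accepting connections on the given local port
        listen: (port, host = '127.0.0.1') =>
            new Promise(resolve => server.listen(port, host, resolve)),
        // simulate a connectivity failure: cut every open connection
        dropAll: () => {
            for (const s of sockets) {
                s.destroy();
            }
        },
        // stop accepting new connections
        close: () => new Promise(resolve => server.close(resolve))
    };
}
```

For example, a proxy created with createDroppableProxy(5432) and started with listen(5433) would let the demo connect through port 5433. Calling dropAll() then breaks the connection while the server stays up, so the following reconnection attempt can succeed.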
