Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. #148

Closed
nadalizadeh opened this issue Aug 14, 2017 · 5 comments
Closed

Comments

@nadalizadeh
Copy link

Hello,

I'm using aedes for an online game async communications and chat. The server works fine and after 1 month it suddenly stops working and messages do not get dispatched. I have the following error in the stderr:

(node:3096) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 201 drain listeners added. Use emitter.setMaxListeners() to increase limit

In the stdout log I see new clients connect successfully and messages are received from them but they don't dispatch to the channel for other users.

and in addition I have lots of logs like the following:

client error 685790 keep alive timeout Error: keep alive timeout
    at keepaliveTimeout (/home/mqttuser/node-mqtt-server/node_modules/aedes/lib/handlers/connect.js:44:28)
    at Timeout.timerWrapper [as _onTimeout] (/home/mqttuser/node-mqtt-server/node_modules/retimer/retimer.js:21:16)
    at ontimeout (timers.js:386:14)
    at tryOnTimeout (timers.js:250:5)
    at Timer.listOnTimeout (timers.js:214:5)

Here's my server code:

var writeFileAtomic = require('write-file-atomic')
var fs = require('fs')
var thrift = require('thrift')
var ttypes = require('./gen-nodejs/serialization_types')
var aedes = require('aedes')()
var server = require('net').createServer(aedes.handle)
var port = 1883

var tribeChatHistory = {}
const MAX_HISTORY_FOR_EACH_CHANNEL = 130
const HISTORY_SAVE_INTERVAL_SECONDS = 2 * 60
const HISTORY_PERSIST_FILE_NAME = process.env.HOME + '/.mqtt-server/history.json'

var FTransport = thrift.TFramedTransport
var BTransport = thrift.TBufferedTransport
var Protocol = thrift.TCompactProtocol

function saveData () {
  var theData = JSON.stringify(tribeChatHistory)
  writeFileAtomic(HISTORY_PERSIST_FILE_NAME, theData, {}, function (err) {
    if (err) throw err
    // console.log('saved history!')
  })
}

function loadData (callback) {
  console.log('Load started')
  fs.access(HISTORY_PERSIST_FILE_NAME, fs.constants.R_OK, (err) => {
    if (!err) {
      console.log('Loading history data')
      var fileReadResult = fs.readFileSync(HISTORY_PERSIST_FILE_NAME)
      var contents = fileReadResult.toString()
      tribeChatHistory = JSON.parse(contents, (key, value) => {
        return value && value.type === 'Buffer'
          ? Buffer.from(value.data) : value
      })
    } else {
      console.log('Error loading history file')
    }
    setInterval(saveData, HISTORY_SAVE_INTERVAL_SECONDS * 1000)
    callback()
  })
}

function getLastComponent (channelTopic) {
  var matches = channelTopic.match('.*/(.*)')
  if (matches == null)
    return null
  return matches[1]
}

function startServer () {
  server.listen(port, function () {
    console.log('server listening on port', port)
  })
}

aedes.on('clientError', function (client, err) {
  console.log('client error', client.id, err.message, err.stack)
})

aedes.on('publish', function (packet, client) {
  if (client) {
    console.log('message from client', client.id, 'on', packet.topic)
    // console.dir(packet)

    if (packet.topic.startsWith('unity/percity/')) {
      var chanName = getLastComponent(packet.topic)
      if (!(chanName in tribeChatHistory)) {
        tribeChatHistory[chanName] = []
      }

      var theQ = tribeChatHistory[chanName]
      theQ.push(packet.payload)

      while (theQ.length > MAX_HISTORY_FOR_EACH_CHANNEL) {
        theQ.shift()
      }
    }

  }
})

aedes.on('subscribe', function (subscriptions, client) {
  if (client) {
    if (subscriptions == null || subscriptions.length < 1)
      return
    var theTopic = subscriptions[0].topic
    console.log('subscribe request from client', client.id, 'on', theTopic)

    if (theTopic.startsWith('unity/percity/tribe') || theTopic.startsWith('unity/percity/harbor')) {
      var chanName = getLastComponent(theTopic)
      var historyArray = tribeChatHistory[chanName]

      if (historyArray === undefined || historyArray == null) {
        console.log('No history for channel ' + chanName)
        return
      }

      for (var i = 0; i < historyArray.length; i++) {
        var thePacket = {
          cmd: 'publish',
          qos: 0,
          topic: theTopic,
          payload: historyArray[i],
          retain: false
        }
        client.publish(thePacket)
      }
    }
  }
})

function deserializeChatMessage (packet) {
  var buft = new FTransport(packet.payload)
  var myprot = new Protocol(buft)
  var chatMsg = new ttypes.ChatMessageData()
  chatMsg.read(myprot)

  return chatMsg
}

function serializeChatMessage (chatMsg, packet) {
  var myBuf = new Buffer([])
  var buftra = new BTransport(myBuf, function (outBuf) {
    myBuf = Buffer.concat([myBuf, outBuf])
  })
  var myprot = new Protocol(buftra)
  chatMsg.write(myprot)
  myprot.flush()
  buftra.flush()
  packet.payload = myBuf
}

aedes.authorizePublish = function (client, packet, callback) {
  // if (packet.topic === 'aaaa') {
  //   return callback(new Error('wrong topic'))
  // }

  if (packet.topic.startsWith('unity/percity/tribe') || packet.topic.startsWith('unity/percity/tribe')) {
    try {
      var chatMsg = deserializeChatMessage(packet)
      var logMessage = packet.topic + ' -> User: ' + chatMsg.player.id + ' Name: ' + chatMsg.player.name + ' Message: ' + chatMsg.message
      console.log(logMessage)
      chatMsg.timestamp = Math.floor(Date.now() / 1000)
      serializeChatMessage(chatMsg, packet)
    } catch (err) {
      console.log(packet.topic + ' -> Error parsing thrift message', err.message)
    }
  }

  callback(null)
}

aedes.on('client', function (client) {
  console.log('new client', client.id)
})

// -------------------
loadData(function () {
  startServer()
})
@mcollina
Copy link
Collaborator

mcollina commented Aug 14, 2017 via email

@nadalizadeh
Copy link
Author

v7.10.1

@nadalizadeh
Copy link
Author

I just faced this ticket : #51
I've now changed my concurrency to 1000 to see if works or not. Would it resolve my issue?

@mcollina
Copy link
Collaborator

Can you reproduce this reliably? I think we have a bug on our side. That mitigates the issue, but no, it won't solve it. I recommend you to recycle your node processes every once in a while.

@nadalizadeh
Copy link
Author

The fact that it happens on the production servers and under load, makes it hard to reproduce. But with the code I've sent above, I have experienced this behaviour two times (with concurrency = 100) and each of them after around 1 month of uptime.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants