Subscriber Memory Leak #390
I can confirm this issue on Ubuntu as well. A simple subscriber grows unbounded in memory. This occurs with the v5 compat layer as well as with v6 beta 6.
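For reference, a minimal subscriber/publisher pair along the lines described above might look like the following. This is only a sketch assuming the zeromq.js v6 API; the endpoint, topic, and send rate are arbitrary choices, not taken from the reports in this thread:

import * as zmq from "zeromq"

// Publisher: emits a small message on a fixed topic every 10 ms.
async function producer() {
  const pub = new zmq.Publisher()
  await pub.bind("tcp://127.0.0.1:3000")
  setInterval(() => {
    pub.send(["kitty", `meow ${Date.now()}`]).catch(console.error)
  }, 10)
}

// Subscriber: receives every message and discards it immediately,
// so its memory usage would be expected to stay flat.
async function subscriber() {
  const sub = new zmq.Subscriber()
  sub.connect("tcp://127.0.0.1:3000")
  sub.subscribe("kitty")
  for await (const [topic, msg] of sub) {
    void topic
    void msg
  }
}

producer().catch(console.error)
subscriber().catch(console.error)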
I have reverted to my previous configuration (Node.js v8.0 / zeromq.js v4.6.0) and have no memory problems with those versions.
Thanks for your report. Could you share the code you used to test it (subscriber + producer)?
I can't reproduce this with the following code on my PR (#444). Unless you share the code in which this happens, I don't think it is possible to fix the issue.

Thread-worker subscriber example:

import {Worker} from "worker_threads"
import * as zmq from "./lib/index.js"

export class ThreadedWorker {
  static async spawn(threads: number) {
    const workers = Array.from({length: threads}).map(() => {
      return new Promise((resolve, reject) => {
        const src = `
          const zmq = require("./lib/index.js")
          ${ThreadedWorker.toString()}
          new ThreadedWorker().run()
        `
        new Worker(src, {eval: true}).on("exit", code => {
          if (code === 0) {
            resolve(undefined)
          } else {
            reject(new Error(`Worker stopped with exit code ${code}`))
          }
        })
      })
    })

    await Promise.all(workers)
    console.log("all workers stopped")
  }

  /* Queue only 1 incoming message. */
  input = new zmq.Pull({receiveHighWaterMark: 1})
  output = new zmq.Push()
  signal = new zmq.Subscriber()

  shift = 13
  maxDelay = 2000 /* Average of 1s. */

  constructor() {
    this.input.connect("inproc://input")
    this.output.connect("inproc://output")
    this.signal.connect("inproc://signal")
    this.signal.subscribe()

    const listen = async () => {
      for await (const [sig] of this.signal) {
        if (sig.toString() === "stop") this.stop()
      }
    }

    listen()
  }

  async stop() {
    this.input.close()
    this.output.close()
    this.signal.close()
  }

  /* Loop over input and produce output. */
  async run() {
    for await (const [pos, req] of this.input) {
      if (req.length !== 1) {
        console.log(`skipping invalid '${req}'`)
        continue
      }

      console.log(`received work '${req}' at ${pos}`)
      const res = await this.work(req.toString())
      await this.output.send([pos, res])
      console.log(`finished work '${req}' -> '${res}' at ${pos}`)
    }
  }

  /* Do the actual Caesar shift. */
  async work(req: string): Promise<string> {
    // await new Promise((resolve) => setTimeout(resolve, Math.random() * this.maxDelay))
    let char = req.charCodeAt(0)
    // Busy loop to simulate CPU-bound work.
    for (let i = 0; i < 200000001; i++) {
      if (char >= 65 && char <= 90) {
        char = ((char - 65 + this.shift) % 26) + 65
      } else if (char >= 97 && char <= 122) {
        char = ((char - 97 + this.shift) % 26) + 97
      }
    }
    return String.fromCharCode(char)
  }
}

async function sleep() {
  await new Promise(resolve => {
    setTimeout(() => {
      resolve(undefined)
    }, 10000)
  })
}

async function main() {
  sleep() // fire-and-forget 10 s timer (not awaited)
  console.log("start")
  ThreadedWorker.spawn(2) // spawn worker threads in the background (not awaited)
  const x = new ThreadedWorker()
  await x.run()
  sleep()
  x.stop()
}

main().catch(e => {
  throw e
})
I found an access violation error in Socket with inproc that might be related to this, but it might not be, because it only happens with inproc.
I'm running the zeromq.js library with Node v12 (I also tried Node v10) on a Raspberry Pi 4B running Raspbian Buster.
System:
Raspberry Pi 4B (2 GB RAM)
The subscriber code is a simple subscriber (like the one provided in the examples).
I started monitoring the subscriber process with the Linux top utility. Initially the process was using around 30 MB of memory; after several hours of running (around 9-10 hours) it was using 370 MB.
Watching the top interface, I can see the memory increase steadily: every few minutes it grows by a few MB (around 5-6 MB).
Let me know if I can provide any other information to help inspect this problem.
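To help narrow this down, one option is to log memory usage from inside the subscriber process as well, so the growth in rss (which includes native allocations) can be compared against heapUsed (the JS heap only). A rough sketch, again assuming the zeromq.js v6 API and an arbitrary endpoint:

import * as zmq from "zeromq"

async function run() {
  const sub = new zmq.Subscriber()
  sub.connect("tcp://127.0.0.1:3000")
  sub.subscribe()

  // Print a memory snapshot once a minute. If heapUsed stays flat while
  // rss keeps climbing, the leak is likely in native/external memory
  // rather than in JavaScript objects.
  setInterval(() => {
    const {rss, heapUsed, external} = process.memoryUsage()
    console.log(
      `rss=${(rss / 1e6).toFixed(1)}MB ` +
        `heapUsed=${(heapUsed / 1e6).toFixed(1)}MB ` +
        `external=${(external / 1e6).toFixed(1)}MB`,
    )
  }, 60000)

  for await (const [msg] of sub) {
    void msg // drop each message immediately
  }
}

run().catch(console.error)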