diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1d29a..85edde6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # CHANGELOG +## 0.2.6 +fix: clean up handles in worker_threads environments to prevent aborting + ## 0.2.5 fix: close the socket if connecting failed diff --git a/js/addon.d.ts b/js/addon.d.ts index c33bc8d..9782e5b 100644 --- a/js/addon.d.ts +++ b/js/addon.d.ts @@ -5,6 +5,7 @@ export function socketNewSoReuseportFd(domain: string, port: number, ip: string): number export function socketClose(fd: number): void +export function addHook(cb: (...args: any[]) => any): void export class SeqpacketSocketWrap { constructor(ee: object, fd?: number | undefined | null) init(thisObj: object): void diff --git a/js/dgram.ts b/js/dgram.ts index 1c414e0..0f3aca8 100644 --- a/js/dgram.ts +++ b/js/dgram.ts @@ -2,6 +2,7 @@ import { EventEmitter } from 'events'; import { DgramSocketWrap } from './addon'; +import { addSocket, deleteSocket } from './uv_socket_util' type FnRecv = (err: undefined | Error, buf: Buffer) => void; export type SendCb = (err: undefined | Error) => void; @@ -39,6 +40,8 @@ export class DgramSocket extends EventEmitter { this.wrap.startRecv(); this.on('_data', this.onData); this.on('_error', this.onError); + + addSocket(this) } private onData = (buf: Buffer, filepath: string) => { @@ -134,6 +137,7 @@ export class DgramSocket extends EventEmitter { if (this.closed) { return; } + deleteSocket(this); this.closed = true; this.wrap.close(); } diff --git a/js/seqpacket.ts b/js/seqpacket.ts index d421b5b..ce3134c 100644 --- a/js/seqpacket.ts +++ b/js/seqpacket.ts @@ -2,6 +2,7 @@ import { EventEmitter } from 'events'; import { SeqpacketSocketWrap } from './addon'; +import { addSocket, deleteSocket } from './uv_socket_util' export type NotifyCb = () => void; @@ -154,6 +155,8 @@ export class SeqpacketSocket extends EventEmitter { this.on('_connect', this.onConnect); this.on('_error', this.onError); this.on('_shutdown', this.onShutdown); + + addSocket(this); } private onEnd = () => { @@ -289,6 +292,14 @@ export class SeqpacketSocket extends EventEmitter { } this.destroyed = true; this.wrap.close(); + deleteSocket(this); + } + + /** + * Alias of "destory". + */ + close() { + return this.destroy(); } /** diff --git a/js/uv_socket_util.ts b/js/uv_socket_util.ts new file mode 100644 index 0000000..ae04538 --- /dev/null +++ b/js/uv_socket_util.ts @@ -0,0 +1,35 @@ +import { addHook } from './addon' +import * as workerThreads from 'worker_threads' + +interface ClosableSocket { + close(): void +} + +const socketSet = new Set() + +export function addSocket(socket: ClosableSocket) { + socketSet.add(socket) +} + +export function deleteSocket(socket: ClosableSocket) { + socketSet.delete(socket) +} + +// Cleanup for worker_threads environment. Otherwise Node.js will abort because there are +// open uv sockets while exiting. +function cleanup() { + const sockets = socketSet.values() + + let next = sockets.next(); + while (!next.done) { + const socket = next.value + socket.close() + next = sockets.next(); + } + + socketSet.clear() +} + +if (!workerThreads.isMainThread) { + addHook(cleanup) +} diff --git a/package.json b/package.json index b159ee5..4853731 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-unix-socket", - "version": "0.2.5", + "version": "0.2.6", "main": "js/index.js", "types": "js/index.d.ts", "author": { diff --git a/src/socket.rs b/src/socket.rs index cb351ec..2c0b536 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -4,7 +4,10 @@ use std::str::FromStr; use crate::util::{error, get_err, resolve_libc_err, resolve_uv_err}; use libc::{c_void, sockaddr_storage, sockaddr_un}; -use napi::{Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result, bindgen_prelude::FromNapiValue}; +use napi::{ + bindgen_prelude::FromNapiValue, Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, + Result, +}; use uv_sys::sys; pub(crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> { @@ -275,3 +278,14 @@ fn socket_close(fd: JsNumber) -> Result<()> { close(fd) } + +#[napi] +#[allow(dead_code)] +pub fn add_hook(mut env: Env, cb: JsFunction) -> Result<()> { + env.add_env_cleanup_hook((), move |_| { + let args: Vec = vec![]; + // no chance to handle the error + let _ = cb.call(Option::None, &args); + })?; + Ok(()) +} diff --git a/src/util.rs b/src/util.rs index 678f16c..c4850cc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -2,8 +2,8 @@ use std::ffi::CStr; use std::intrinsics::transmute; use std::mem; -use libc::{sockaddr, sockaddr_un, c_char}; -use napi::{self, Error, JsBuffer, Result, JsFunction, JsObject}; +use libc::{c_char, sockaddr, sockaddr_un}; +use napi::{self, Error, JsBuffer, JsFunction, JsObject, Result}; use nix::errno::Errno; use nix::fcntl::{fcntl, FcntlArg, OFlag}; use uv_sys::sys; @@ -51,7 +51,8 @@ pub(crate) fn uv_err_msg(errno: i32) -> String { let ret = CStr::from_ptr(ret); ret .to_str() - .map_err(|_| error("parsing cstr failed".to_string())).unwrap() + .map_err(|_| error("parsing cstr failed".to_string())) + .unwrap() .to_string() };