From d7f6bb5c6604f6534c69f0cd1cd658066c4f9553 Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Fri, 3 Sep 2021 19:17:09 +0200 Subject: [PATCH 1/7] feat: export pino v7 transport --- lib/TcpConnection.js | 32 ++++++--- lib/UdpConnection.js | 8 ++- lib/pino-transport.js | 36 ++++++++++ package.json | 3 +- psock.js | 154 +++++++++++++++++++++--------------------- test/transport.js | 58 ++++++++++++++++ 6 files changed, 203 insertions(+), 88 deletions(-) create mode 100644 lib/pino-transport.js create mode 100644 test/transport.js diff --git a/lib/TcpConnection.js b/lib/TcpConnection.js index 4bf3326..0439117 100644 --- a/lib/TcpConnection.js +++ b/lib/TcpConnection.js @@ -31,10 +31,8 @@ module.exports = function factory (userOptions) { userOptions ) - // We use this passthrough to buffer incoming messages. - const inputStream = new stream.PassThrough() - process.stdin.pipe(inputStream) - inputStream.pause() + const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream') + ? options.sourceStream : buildSourceStream() let socket = null let connected = false @@ -60,8 +58,11 @@ module.exports = function factory (userOptions) { connecting = false connected = true if (cb) cb(null, connected) - inputStream.pipe(outputStream, { end: false }) - inputStream.resume() + + if (sourceStream) { + sourceStream.pipe(outputStream, { end: false }) + sourceStream.resume() + } } ) addListeners() @@ -70,8 +71,11 @@ module.exports = function factory (userOptions) { function disconnect () { connected = false connecting = false - inputStream.pause() - inputStream.unpipe(outputStream) + + if (sourceStream) { + sourceStream.pause() + sourceStream.unpipe(outputStream) + } } function reconnect () { @@ -123,6 +127,16 @@ module.exports = function factory (userOptions) { socket.removeAllListeners('error') } - connect() + connect(() => { + // TODO we must propagate the events from the socket to the outputStream + outputStream.emit('open') + }) return outputStream } + +function buildSourceStream () { + // We use this passthrough to buffer incoming messages. + const inputStream = new stream.PassThrough() + process.stdin.pipe(inputStream) + inputStream.pause() +} diff --git a/lib/UdpConnection.js b/lib/UdpConnection.js index fd9bee7..5fcddfa 100644 --- a/lib/UdpConnection.js +++ b/lib/UdpConnection.js @@ -21,6 +21,12 @@ module.exports = function factory (userOptions) { } }) - process.stdin.pipe(writableStream, { end: false }) + const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream') + ? options.sourceStream : process.stdin + + if (sourceStream) { + sourceStream.pipe(writableStream, { end: false }) + } + return writableStream } diff --git a/lib/pino-transport.js b/lib/pino-transport.js new file mode 100644 index 0000000..eeff991 --- /dev/null +++ b/lib/pino-transport.js @@ -0,0 +1,36 @@ +'use strict' + +const tcpConnectionFactory = require('./TcpConnection') +const udpConnectionFactory = require('./UdpConnection') +const { once } = require('events') + +const defaultOptions = { + unixsocket: '', + address: '127.0.0.1', + mode: 'udp', + port: '514', + echo: true, + reconnect: false, + reconnectTries: Infinity, + settings: null +} + +async function socketTransport (opts) { + const options = Object.assign({ + sourceStream: false + }, defaultOptions, opts) + + let connection + if (options.mode === 'tcp') { + connection = tcpConnectionFactory(options) + } else { + connection = udpConnectionFactory(options) + } + + await once(connection, 'open') + + return connection +} + +module.exports = socketTransport +module.exports.defaultOptions = defaultOptions diff --git a/package.json b/package.json index 1ab64f6..1b0b028 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "main": "psock.js", "scripts": { "test": "standard && mocha --ui qunit -R dot test", + "test:unit": "mocha --ui qunit test/transport.js", "cov": "nyc mocha --ui qunit -R dot test" }, "bin": { @@ -26,7 +27,7 @@ "chai": "^4.2.0", "mocha": "^9.0.0", "nyc": "^15.1.0", - "pino": "^6.3.2", + "pino": "^7.0.0-rc.3", "pre-commit": "^1.1.3", "standard": "^14.3.4" }, diff --git a/psock.js b/psock.js index 8f375ea..dfc8b3d 100644 --- a/psock.js +++ b/psock.js @@ -3,99 +3,99 @@ const path = require('path') const tcpConnectionFactory = require(path.join(__dirname, 'lib', 'TcpConnection')) const udpConnectionFactory = require(path.join(__dirname, 'lib', 'UdpConnection')) +const transport = require('./lib/pino-transport') const split2 = require('split2') const pump = require('pump') const through2 = require('through2') const nopt = require('nopt') const fs = require('fs') -let options = { - unixsocket: '', - address: '127.0.0.1', - mode: 'udp', - port: '514', - echo: true, - reconnect: false, - reconnectTries: Infinity, - settings: null -} -const longOpts = { - unixsocket: String, - address: String, - mode: ['tcp', 'udp'], - port: Number, - reconnect: Boolean, - reconnectTries: Number, - echo: Boolean, - help: Boolean, - version: Boolean, - settings: String -} -const shortOpts = { - u: '--unixsocket', - a: '--address', - m: '--mode', - p: '--port', - r: '--reconnect', - t: '--reconnectTries', - e: '--echo', - ne: '--no-echo', - h: '--help', - v: '--version', - s: '--settings' +// TODO quick and dirty +if (require.main !== module) { + module.exports = transport +} else { + cli() } -const argv = nopt(longOpts, shortOpts, process.argv) -options = Object.assign(options, argv) -if (options.help) { - console.log(fs.readFileSync(path.join(__dirname, 'help.txt'), 'utf8')) - process.exit(0) -} +function cli () { + const longOpts = { + unixsocket: String, + address: String, + mode: ['tcp', 'udp'], + port: Number, + reconnect: Boolean, + reconnectTries: Number, + echo: Boolean, + help: Boolean, + version: Boolean, + settings: String + } + const shortOpts = { + u: '--unixsocket', + a: '--address', + m: '--mode', + p: '--port', + r: '--reconnect', + t: '--reconnectTries', + e: '--echo', + ne: '--no-echo', + h: '--help', + v: '--version', + s: '--settings' + } + const argv = nopt(longOpts, shortOpts, process.argv) + let options = Object.assign({}, transport.defaultOptions, argv) -if (options.version) { - console.log('pino-socket', require('./package.json').version) - process.exit(0) -} + if (options.help) { + console.log(fs.readFileSync(path.join(__dirname, 'help.txt'), 'utf8')) + process.exit(0) + } -if (options.settings) { - try { - const loadedSettings = require(path.resolve(options.settings)) - const settings = Object.assign(loadedSettings, argv) - options = Object.assign(options, settings) - } catch (e) { - console.error('`settings` parameter specified but could not load file: %s', e.message) - process.exit(1) + if (options.version) { + console.log('pino-socket', require('./package.json').version) + process.exit(0) } -} -let connection -if (options.mode === 'tcp') { - connection = tcpConnectionFactory(options) -} else { - connection = udpConnectionFactory(options) -} + if (options.settings) { + try { + const loadedSettings = require(path.resolve(options.settings)) + const settings = Object.assign(loadedSettings, argv) + options = Object.assign(options, settings) + } catch (e) { + console.error('`settings` parameter specified but could not load file: %s', e.message) + process.exit(1) + } + } -function shutdown () { - try { - connection.close() - } catch (e) { + let connection + if (options.mode === 'tcp') { + connection = tcpConnectionFactory(options) + } else { + connection = udpConnectionFactory(options) + } + + function shutdown () { + try { + connection.close() + } catch (e) { // I assume that due to the closing of the pipe, the dgram/tcp socket has // some issues shutting down gracefully. Don't really care, though. So // this try/catch is here to suppress the resulting error. - process.exit() + process.exit() + } } -} -process.on('SIGINT', function sigint () { - shutdown() -}) -process.on('SIGTERM', function sigterm () { - shutdown() -}) + process.on('SIGINT', function sigint () { + shutdown() + }) + process.on('SIGTERM', function sigterm () { + shutdown() + }) -const myTransport = through2.obj(function transport (chunk, enc, cb) { - setImmediate(() => console.log(chunk)) - cb() -}) + const myTransport = through2.obj(function transport (chunk, enc, cb) { + setImmediate(() => console.log(chunk)) + cb() + }) -process.stdin.on('close', () => { shutdown() }) -if (options.echo) pump(process.stdin, split2(), myTransport) + process.stdin.on('close', () => { shutdown() }) + if (options.echo) pump(process.stdin, split2(), myTransport) +} diff --git a/test/transport.js b/test/transport.js new file mode 100644 index 0000000..53b5bef --- /dev/null +++ b/test/transport.js @@ -0,0 +1,58 @@ +'use strict' +/* eslint-env node, mocha */ + +const pino = require('pino') +const net = require('net') +const { expect } = require('chai') + +function createTcpListener (msgHandler) { + return new Promise((resolve, reject) => { + const socket = net.createServer((connection) => { + connection.on('data', (data) => { + msgHandler(data.toString()) + }) + }) + socket.listen(0, '127.0.0.1', (err) => { + if (err) { + return reject(err) + } + return resolve(socket) + }) + }) +} + +test('tcp send', function tcp (done) { + this.timeout(50000) + + let socket + let transport + + createTcpListener((msg) => { + expect(msg).to.contain('"msg":"hello TCP world"') + expect(msg.substr(-1)).to.equal('\n') + done() + + transport.end() + socket.close() + socket.unref() + }) + .then((serverSocket) => { + socket = serverSocket + const address = socket.address().address + const port = socket.address().port + + transport = pino.transport({ + target: '../psock.js', + level: 'info', + options: { + mode: 'tcp', + address, + port + } + }) + const log = pino(transport) + + log.info('hello TCP world') + }) + .catch(done) +}) From 05e3507a3b750c5213029603e24c4b6b74c577db Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Wed, 8 Sep 2021 09:41:09 +0200 Subject: [PATCH 2/7] fix test --- lib/TcpConnection.js | 7 +++---- psock.js | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/TcpConnection.js b/lib/TcpConnection.js index 0439117..ac5f272 100644 --- a/lib/TcpConnection.js +++ b/lib/TcpConnection.js @@ -30,10 +30,8 @@ module.exports = function factory (userOptions) { }, userOptions ) - const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream') - ? options.sourceStream : buildSourceStream() - + ? options.sourceStream : buildCliSourceStream() let socket = null let connected = false let connecting = false @@ -134,9 +132,10 @@ module.exports = function factory (userOptions) { return outputStream } -function buildSourceStream () { +function buildCliSourceStream () { // We use this passthrough to buffer incoming messages. const inputStream = new stream.PassThrough() process.stdin.pipe(inputStream) inputStream.pause() + return inputStream } diff --git a/psock.js b/psock.js index dfc8b3d..dee87cd 100644 --- a/psock.js +++ b/psock.js @@ -10,10 +10,11 @@ const through2 = require('through2') const nopt = require('nopt') const fs = require('fs') -// TODO quick and dirty if (require.main !== module) { + // used as module module.exports = transport } else { + // usaed as cli cli() } From 1304361a4e19406eba17de2fd48584e670b37ad4 Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Wed, 8 Sep 2021 10:59:51 +0200 Subject: [PATCH 3/7] add docs --- Readme.md | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/Readme.md b/Readme.md index bc1ef9b..1c6126e 100644 --- a/Readme.md +++ b/Readme.md @@ -18,8 +18,30 @@ $ npm install --production -g pino-socket [pino]: https://www.npmjs.com/package/pino -## Usage +## Usage as Pino Transport + +You can use this module as a [pino transport](https://getpino.io/#/docs/transports?id=v7-transports) like so: + +```js +const pino = require('pino') +const transport = pino.transport({ + target: 'pino-socket', + options: { + address: '10.10.10.5', + port: 5000. + mode: 'tcp' + } +}) +pino(transport) +``` + +All the options are described further down. +Note that the `echo` option is disabled within this usage. + +## Usage as Pino Legacy Transport +Pino supports a [legacy transport interface](https://getpino.io/#/docs/transports?id=legacy-transports) +that is still supported by this module. Given an application `foo` that logs via [pino][pino], and a system that collects logs on port UDP `5000` on IP `10.10.10.5`, you would use `pino-socket` like so: From bdd2f6ffa587077ef9ed59e914e06378c6d31dbc Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Wed, 8 Sep 2021 11:42:30 +0200 Subject: [PATCH 4/7] fix feedback --- psock.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/psock.js b/psock.js index dee87cd..e91b9d6 100644 --- a/psock.js +++ b/psock.js @@ -10,10 +10,9 @@ const through2 = require('through2') const nopt = require('nopt') const fs = require('fs') -if (require.main !== module) { - // used as module - module.exports = transport -} else { +module.exports = transport + +if (require.main === module) { // usaed as cli cli() } From 917defc37ed3794a5bc655dea40ec9016abb9c01 Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Wed, 8 Sep 2021 13:31:51 +0200 Subject: [PATCH 5/7] Update Readme.md Co-authored-by: James Sumners --- Readme.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Readme.md b/Readme.md index 1c6126e..55282c2 100644 --- a/Readme.md +++ b/Readme.md @@ -35,7 +35,7 @@ const transport = pino.transport({ pino(transport) ``` -All the options are described further down. +All options are described further below. Note that the `echo` option is disabled within this usage. ## Usage as Pino Legacy Transport From 24127b10191c45059921a2461640c19f32a27195 Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Wed, 8 Sep 2021 13:35:59 +0200 Subject: [PATCH 6/7] typo --- psock.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psock.js b/psock.js index e91b9d6..15d1b53 100644 --- a/psock.js +++ b/psock.js @@ -13,7 +13,7 @@ const fs = require('fs') module.exports = transport if (require.main === module) { - // usaed as cli + // used as cli cli() } From 7bf2fc941a4600358835675eff1b7c0980944fdc Mon Sep 17 00:00:00 2001 From: Manuel Spigolon Date: Wed, 8 Sep 2021 14:48:53 +0200 Subject: [PATCH 7/7] fix nodejs 12 configuration --- lib/TcpConnection.js | 1 + test/transport.js | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/TcpConnection.js b/lib/TcpConnection.js index ac5f272..dc32ed4 100644 --- a/lib/TcpConnection.js +++ b/lib/TcpConnection.js @@ -39,6 +39,7 @@ module.exports = function factory (userOptions) { // This stream is the one returned to psock.js. const outputStream = stream.Writable({ + autoDestroy: true, close () { socket.end() }, write (data, encoding, callback) { socket.write(data) diff --git a/test/transport.js b/test/transport.js index 53b5bef..db46ea8 100644 --- a/test/transport.js +++ b/test/transport.js @@ -32,7 +32,6 @@ test('tcp send', function tcp (done) { expect(msg.substr(-1)).to.equal('\n') done() - transport.end() socket.close() socket.unref() })