Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pino v7 transport #36

Merged
merged 7 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,30 @@ $ npm install --production -g pino-socket

[pino]: https://www.npmjs.com/package/pino

## Usage
## Usage as Pino Transport

You can use this module as a [pino transport](https://getpino.io/#/docs/transports?id=v7-transports) like so:

```js
const pino = require('pino')
const transport = pino.transport({
target: 'pino-socket',
options: {
address: '10.10.10.5',
port: 5000.
mode: 'tcp'
}
})
pino(transport)
```

All options are described further below.
Note that the `echo` option is disabled within this usage.

## Usage as Pino Legacy Transport

Pino supports a [legacy transport interface](https://getpino.io/#/docs/transports?id=legacy-transports)
that is still supported by this module.
Given an application `foo` that logs via [pino][pino], and a system that
collects logs on port UDP `5000` on IP `10.10.10.5`, you would use `pino-socket`
like so:
Expand Down
36 changes: 25 additions & 11 deletions lib/TcpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,16 @@ module.exports = function factory (userOptions) {
},
userOptions
)

// We use this passthrough to buffer incoming messages.
const inputStream = new stream.PassThrough()
process.stdin.pipe(inputStream)
inputStream.pause()

const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream')
? options.sourceStream : buildCliSourceStream()
let socket = null
let connected = false
let connecting = false
let socketError = null

// This stream is the one returned to psock.js.
const outputStream = stream.Writable({
autoDestroy: true,
close () { socket.end() },
write (data, encoding, callback) {
socket.write(data)
Expand All @@ -60,8 +57,11 @@ module.exports = function factory (userOptions) {
connecting = false
connected = true
if (cb) cb(null, connected)
inputStream.pipe(outputStream, { end: false })
inputStream.resume()

if (sourceStream) {
sourceStream.pipe(outputStream, { end: false })
sourceStream.resume()
}
}
)
addListeners()
Expand All @@ -70,8 +70,11 @@ module.exports = function factory (userOptions) {
function disconnect () {
connected = false
connecting = false
inputStream.pause()
inputStream.unpipe(outputStream)

if (sourceStream) {
sourceStream.pause()
sourceStream.unpipe(outputStream)
}
}

function reconnect () {
Expand Down Expand Up @@ -123,6 +126,17 @@ module.exports = function factory (userOptions) {
socket.removeAllListeners('error')
}

connect()
connect(() => {
// TODO we must propagate the events from the socket to the outputStream
outputStream.emit('open')
})
return outputStream
}

function buildCliSourceStream () {
// We use this passthrough to buffer incoming messages.
const inputStream = new stream.PassThrough()
process.stdin.pipe(inputStream)
inputStream.pause()
return inputStream
}
8 changes: 7 additions & 1 deletion lib/UdpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ module.exports = function factory (userOptions) {
}
})

process.stdin.pipe(writableStream, { end: false })
const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream')
? options.sourceStream : process.stdin

if (sourceStream) {
sourceStream.pipe(writableStream, { end: false })
}

return writableStream
}
36 changes: 36 additions & 0 deletions lib/pino-transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict'

const tcpConnectionFactory = require('./TcpConnection')
const udpConnectionFactory = require('./UdpConnection')
const { once } = require('events')

const defaultOptions = {
unixsocket: '',
address: '127.0.0.1',
mode: 'udp',
port: '514',
echo: true,
reconnect: false,
reconnectTries: Infinity,
settings: null
}

async function socketTransport (opts) {
const options = Object.assign({
sourceStream: false
}, defaultOptions, opts)

let connection
if (options.mode === 'tcp') {
connection = tcpConnectionFactory(options)
} else {
connection = udpConnectionFactory(options)
}

await once(connection, 'open')

return connection
}

module.exports = socketTransport
module.exports.defaultOptions = defaultOptions
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"main": "psock.js",
"scripts": {
"test": "standard && mocha --ui qunit -R dot test",
"test:unit": "mocha --ui qunit test/transport.js",
"cov": "nyc mocha --ui qunit -R dot test"
},
"bin": {
Expand All @@ -26,7 +27,7 @@
"chai": "^4.2.0",
"mocha": "^9.0.0",
"nyc": "^15.1.0",
"pino": "^6.3.2",
"pino": "^7.0.0-rc.3",
"pre-commit": "^1.1.3",
"standard": "^14.3.4"
},
Expand Down
154 changes: 77 additions & 77 deletions psock.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,99 +3,99 @@
const path = require('path')
const tcpConnectionFactory = require(path.join(__dirname, 'lib', 'TcpConnection'))
const udpConnectionFactory = require(path.join(__dirname, 'lib', 'UdpConnection'))
const transport = require('./lib/pino-transport')
const split2 = require('split2')
const pump = require('pump')
const through2 = require('through2')
const nopt = require('nopt')
const fs = require('fs')

let options = {
unixsocket: '',
address: '127.0.0.1',
mode: 'udp',
port: '514',
echo: true,
reconnect: false,
reconnectTries: Infinity,
settings: null
}
const longOpts = {
unixsocket: String,
address: String,
mode: ['tcp', 'udp'],
port: Number,
reconnect: Boolean,
reconnectTries: Number,
echo: Boolean,
help: Boolean,
version: Boolean,
settings: String
}
const shortOpts = {
u: '--unixsocket',
a: '--address',
m: '--mode',
p: '--port',
r: '--reconnect',
t: '--reconnectTries',
e: '--echo',
ne: '--no-echo',
h: '--help',
v: '--version',
s: '--settings'
}
const argv = nopt(longOpts, shortOpts, process.argv)
options = Object.assign(options, argv)
module.exports = transport

if (options.help) {
console.log(fs.readFileSync(path.join(__dirname, 'help.txt'), 'utf8'))
process.exit(0)
if (require.main === module) {
// used as cli
cli()
}

if (options.version) {
console.log('pino-socket', require('./package.json').version)
process.exit(0)
}
function cli () {
const longOpts = {
unixsocket: String,
address: String,
mode: ['tcp', 'udp'],
port: Number,
reconnect: Boolean,
reconnectTries: Number,
echo: Boolean,
help: Boolean,
version: Boolean,
settings: String
}
const shortOpts = {
u: '--unixsocket',
a: '--address',
m: '--mode',
p: '--port',
r: '--reconnect',
t: '--reconnectTries',
e: '--echo',
ne: '--no-echo',
h: '--help',
v: '--version',
s: '--settings'
}
const argv = nopt(longOpts, shortOpts, process.argv)
let options = Object.assign({}, transport.defaultOptions, argv)

if (options.settings) {
try {
const loadedSettings = require(path.resolve(options.settings))
const settings = Object.assign(loadedSettings, argv)
options = Object.assign(options, settings)
} catch (e) {
console.error('`settings` parameter specified but could not load file: %s', e.message)
process.exit(1)
if (options.help) {
console.log(fs.readFileSync(path.join(__dirname, 'help.txt'), 'utf8'))
process.exit(0)
}
}

let connection
if (options.mode === 'tcp') {
connection = tcpConnectionFactory(options)
} else {
connection = udpConnectionFactory(options)
}
if (options.version) {
console.log('pino-socket', require('./package.json').version)
process.exit(0)
}

if (options.settings) {
try {
const loadedSettings = require(path.resolve(options.settings))
const settings = Object.assign(loadedSettings, argv)
options = Object.assign(options, settings)
} catch (e) {
console.error('`settings` parameter specified but could not load file: %s', e.message)
process.exit(1)
}
}

function shutdown () {
try {
connection.close()
} catch (e) {
let connection
if (options.mode === 'tcp') {
connection = tcpConnectionFactory(options)
} else {
connection = udpConnectionFactory(options)
}

function shutdown () {
try {
connection.close()
} catch (e) {
// I assume that due to the closing of the pipe, the dgram/tcp socket has
// some issues shutting down gracefully. Don't really care, though. So
// this try/catch is here to suppress the resulting error.
process.exit()
process.exit()
}
}
}
process.on('SIGINT', function sigint () {
shutdown()
})
process.on('SIGTERM', function sigterm () {
shutdown()
})
process.on('SIGINT', function sigint () {
shutdown()
})
process.on('SIGTERM', function sigterm () {
shutdown()
})

const myTransport = through2.obj(function transport (chunk, enc, cb) {
setImmediate(() => console.log(chunk))
cb()
})
const myTransport = through2.obj(function transport (chunk, enc, cb) {
setImmediate(() => console.log(chunk))
cb()
})

process.stdin.on('close', () => { shutdown() })
if (options.echo) pump(process.stdin, split2(), myTransport)
process.stdin.on('close', () => { shutdown() })
if (options.echo) pump(process.stdin, split2(), myTransport)
}
Loading