Skip to content

Commit

Permalink
Merge pull request #261 from sematext/sc-9545-fix-stream-err
Browse files Browse the repository at this point in the history
Fix #256 Write after stream end
  • Loading branch information
Adnan Rahić authored Oct 13, 2020
2 parents 14ba81e + ad6dcce commit 99b87f5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 32 deletions.
14 changes: 14 additions & 0 deletions config/examples/output-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,17 @@ output:
maxFiles: 2
interval: 1d
compress: true

# config format example:
# output:
# files:
# module: file-output
# format: template # or ldjson, pretty, yaml
# template: '{fieldA} {fieldB}'
# sourceName: .*
# filename: /tmp/${fieldName}.txt
# size: 10M
# maxFiles: 2
# interval: 1d
# compress: true

60 changes: 28 additions & 32 deletions lib/plugins/output/files.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
'use strict'
var format = require('string-template')
var prettyjson = require('prettyjson')
var safeStringify = require('fast-safe-stringify')
var rfs = require('rotating-file-stream')
const format = require('string-template')
const prettyjson = require('prettyjson')
const safeStringify = require('fast-safe-stringify')
const rfs = require('rotating-file-stream')

function OutputFile (config, eventEmitter) {
/**
config format example:
output:
files:
module: file-output
format: template # or ldjson, pretty, yaml
template: '{fieldA} {fieldB}'
sourceName: .*
filename: /tmp/${fieldName}.txt
size: 10M
maxFiles: 2
interval: 1d
compress: true
*/
this.streams = {}
this.config = config
this.eventEmitter = eventEmitter
Expand All @@ -36,14 +22,22 @@ function OutputFile (config, eventEmitter) {
if (config.format === 'template') {
this.config.formatTemplate = true
}
if (config.compress === false) {
this.config.compress = false
} else {
this.config.compress = true
}
}

OutputFile.prototype.createStream = function (file) {
var stream = rfs(file, {
const stream = rfs(file, {
size: this.config.size || '10M', // rotate every 10 MegaBytes written
interval: this.config.interval || '1d', // rotate daily
compress: this.config.compress || true // compress rotated files
compress: this.config.compress // compress rotated files
})
stream.on('error', console.log)
stream.on('warning', console.log)

return stream
}

Expand All @@ -52,23 +46,25 @@ OutputFile.prototype.eventHandler = function (data, context) {
this.sourceNameFilter.test(context.sourceName) &&
this.typeFilter.test(data._type)
) {
var fileName = format(this.config.fileName, data)
const fileName = format(this.config.fileName, data)
if (!this.streams[fileName]) {
this.streams[fileName] = this.createStream(fileName)
}
var stream = this.streams[fileName]
const stream = this.streams[fileName]

if (this.config.pretty) {
stream.write(JSON.stringify(data, null, '\t'))
} else if (this.config.yaml) {
stream.write(prettyjson.render(data, { noColor: false }) + '\n')
} else if (this.config.formatTemplate) {
stream.write(format(this.config.template, data) + '\n')
} else {
stream.write(safeStringify(data) + '\n')
return stream.write(JSON.stringify(data, null, '\t'))
}
}
if (this.config.suppress) {
return

if (this.config.yaml) {
return stream.write(prettyjson.render(data, { noColor: false }) + '\n')
}

if (this.config.formatTemplate) {
return stream.write(format(this.config.template, data) + '\n')
}

return stream.write(safeStringify(data) + '\n')
}
}

Expand Down

0 comments on commit 99b87f5

Please sign in to comment.