-
Notifications
You must be signed in to change notification settings - Fork 10
/
seq_string_transform.js
96 lines (87 loc) · 2.17 KB
/
seq_string_transform.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
'use strict';
let stream = require('stream');
let seq = require('seq-logging');
let LEVEL_NAMES = {
Verbose: 10,
Debug: 20,
Information: 30,
Warning: 40,
Error: 50,
Fatal: 60
};
/**
* Forwards ndjson objects as js-objects.
*
* Depending on config, other lines will be forwarded or dropped
*/
class SeqStringTransform extends stream.Transform {
constructor(config) {
super({
readableObjectMode: true,
writableObjectMode: false
});
this._logOtherAs = false;
let { logOtherAs } = config || {};
if (logOtherAs) {
if (LEVEL_NAMES[logOtherAs]) {
this._logOtherAs = LEVEL_NAMES[logOtherAs];
} else {
this.destroy(new Error(`${logOtherAs} is not a valid option for "logOtherAs"`));
}
}
this._bufferTime = false;
this._buffer = [];
}
_transform(chunk, encoding, callback) {
if (chunk) {
try {
const message = JSON.parse(chunk);
// Forward the buffered messages
if (this._logOtherAs) {
this.flushBuffer();
}
// Just forward the message
this.push(message);
} catch (err) {
if (this._logOtherAs) {
this.handleUnstructuredMessage(chunk);
}
}
}
callback(null);
}
handleUnstructuredMessage(message) {
this._bufferTime = this._bufferTime ? this._bufferTime : new Date();
this._buffer.push(message);
// Flush the message buffer after 1 sec of inacticity
if (!this._flushTimer) {
this._flushTimer = setTimeout(() => {
this.flushBuffer();
}, 1000);
}
}
flushBuffer() {
if (this._buffer.length) {
// No need to flush again
if (this._flushTimer) {
clearTimeout(this._flushTimer);
}
if (!this.destroyed) {
this.push({
time: this._bufferTime,
level: this._logOtherAs,
msg: this._buffer.join('\n')
});
}
this._bufferTime = false;
this._buffer = [];
}
}
// Force the underlying logger to flush at the time of the call
// and wait for pending writes to complete
_flush(callback) {
this.flushBuffer();
callback();
}
}
module.exports = SeqStringTransform;