Skip to content

Commit

Permalink
fix(send): Allow _serializeBuffer() to be async
Browse files Browse the repository at this point in the history
* lib/logger.js: use Promise.resolve().then() to call _serializeBuffer,
so it can optionally be async in subclasses.

* test/logger-log.js:
* test/logger-errors.js: Add tests for async _serializeBuffer, and
error handling thereof.
  • Loading branch information
emelski committed May 19, 2022
1 parent b020748 commit f00421a
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 154 deletions.
321 changes: 167 additions & 154 deletions lib/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -730,171 +730,184 @@ class Logger extends EventEmitter {
? buffer[buffer.length - 1].line
: null

const data = this._serializeBuffer(buffer)

this._getSendPayload(data, (error, payload) => {
if (error) {
const err = new Error('Error gzipping data')
err.meta = {
message: 'Will attempt to send data uncompressed'
, error
}
// We will still send, but without compression
/* eslint no-unused-vars:0 */
const {'Content-Encoding': _, ...headers} = config.headers
config.headers = headers
config.data = data
this.emit('error', err)
} else {
config.data = payload
}

axios.request(config)
.then((response) => {
// We have a 200-level success code.
let totalLinesSent = buffer.length

this[kIsLoggingBackedOff] = false
this[kAttempts] = 0
this[kIsSending] = false
this[kTotalLinesReady] -= buffer.length

if (response.status === PARTIAL_SUCCESS_CODE) {
const statusCodes = response.data.status || []
let goodLines = 0
for (let i = 0; i < statusCodes.length; i++) {
if (statusCodes[i] === SUCCESS_CODE) {
// This will effectively elide the failed lines from
// buffer, so if verboseEvents is enabled the 'send'
// event will include only the sent lines.

buffer[goodLines++] = buffer[i]
continue
}
totalLinesSent--
const err = new Error(LINE_INGEST_ERROR)
err.meta = {
statusCode: statusCodes[i]
, line: buffer[i].line
, ...(this[kVerboseEvents] && {buffer: [buffer[i]]})
}
this.emit('error', err)
Promise.resolve(this._serializeBuffer(buffer))
.then((data) => {
this._getSendPayload(data, (error, payload) => {
if (error) {
const err = new Error('Error gzipping data')
err.meta = {
message: 'Will attempt to send data uncompressed'
, error
}

// Truncate the buffer length to the count of good lines, dumping
// the (now) duplicates left at the end by the shifting we did
// above.

buffer.length = goodLines
}

// Remove the buffer from readyToSend.

this[kReadyToSend].shift()

if (!this[kVerboseEvents]) {
// Assist GC by killing the buffer, unless the user has
// requested that the buffer be included in the send event.

buffer.length = 0
}

this.emit('send', {
httpStatus: response.status
, firstLine
, lastLine
, totalLinesSent
, totalLinesReady: this[kTotalLinesReady]
, bufferCount: this[kReadyToSend].length
, ...(this[kVerboseEvents] && {buffer})
})

if (this[kReadyToSend].length) {
// Continue to send any backed up payloads that have accumulated
this.send()
return
// We will still send, but without compression
/* eslint no-unused-vars:0 */
const {'Content-Encoding': _, ...headers} = config.headers
config.headers = headers
config.data = data
this.emit('error', err)
} else {
config.data = payload
}

this.emit('cleared', {
message: ALL_CLEAR_SENT
})
})
.catch((error) => {
// Allow the microtask queue to unwind in case it's a Promise rejection
process.nextTick(() => {
const code = error.response
? error.response.status
: error.code // timeouts will populate this

++this[kAttempts]
const retrying = this._shouldRetry(code)
const {Authorization: _, ...headers} = config.headers
const errorMeta = {
actual: error.message
, code
, firstLine
, lastLine
, retrying
, attempts: this[kAttempts]
, headers
, url: config.url
, ...(this[kVerboseEvents] && {buffer})
}
axios.request(config)
.then((response) => {
// We have a 200-level success code.
let totalLinesSent = buffer.length

this[kIsLoggingBackedOff] = false
this[kAttempts] = 0
this[kIsSending] = false
this[kTotalLinesReady] -= buffer.length

if (response.status === PARTIAL_SUCCESS_CODE) {
const statusCodes = response.data.status || []
let goodLines = 0
for (let i = 0; i < statusCodes.length; i++) {
if (statusCodes[i] === SUCCESS_CODE) {
// This will effectively elide the failed lines from
// buffer, so if verboseEvents is enabled the 'send'
// event will include only the sent lines.

buffer[goodLines++] = buffer[i]
continue
}
totalLinesSent--
const err = new Error(LINE_INGEST_ERROR)
err.meta = {
statusCode: statusCodes[i]
, line: buffer[i].line
, ...(this[kVerboseEvents] && {buffer: [buffer[i]]})
}
this.emit('error', err)
}

// Truncate the buffer length to the count of good lines, dumping
// the (now) duplicates left at the end by the shifting we did
// above.

buffer.length = goodLines
}

if (retrying) {
this[kIsLoggingBackedOff] = true
this[kBackoffMs] = backoffWithJitter(
this.baseBackoffMs
, this.maxBackoffMs
, this[kBackoffMs]
)
setTimeout(() => {
this.send(false)
}, this[kBackoffMs])

if (this[kIgnoreRetryableErrors]) return

const err = new Error(
'Temporary connection-based error. It will be retried. '
+ 'See meta data for details.'
)
err.meta = errorMeta
this.emit('error', err)

return
}
// Remove the buffer from readyToSend.

const err = new Error(
'A connection-based error occurred that will not be retried. '
+ 'See meta data for details.'
)
err.meta = errorMeta
this.emit('error', err)

// User-level errors will be discarded since they will never succeed
this[kIsSending] = false
this[kTotalLinesReady] -= buffer.length
this[kAttempts] = 0
this[kReadyToSend].shift()
this[kReadyToSend].shift()

if (!this[kVerboseEvents]) {
// Assist GC by killing the buffer, unless the user has
// requested that the buffer be included in the error event.
if (!this[kVerboseEvents]) {
// Assist GC by killing the buffer, unless the user has
// requested that the buffer be included in the send event.

buffer.length = 0
}
buffer.length = 0
}

if (this[kReadyToSend].length) {
this.send()
return
}
this.emit('send', {
httpStatus: response.status
, firstLine
, lastLine
, totalLinesSent
, totalLinesReady: this[kTotalLinesReady]
, bufferCount: this[kReadyToSend].length
, ...(this[kVerboseEvents] && {buffer})
})

if (this[kReadyToSend].length) {
// Continue to send any backed up payloads that have accumulated
this.send()
return
}

this.emit('cleared', {
message: ALL_CLEAR_SENT
this.emit('cleared', {
message: ALL_CLEAR_SENT
})
})
.catch((error) => {
// Allow the microtask queue to unwind in case it's a Promise rejection
process.nextTick(() => {
const code = error.response
? error.response.status
: error.code // timeouts will populate this

++this[kAttempts]
const retrying = this._shouldRetry(code)
const {Authorization: _, ...headers} = config.headers
const errorMeta = {
actual: error.message
, code
, firstLine
, lastLine
, retrying
, attempts: this[kAttempts]
, headers
, url: config.url
, ...(this[kVerboseEvents] && {buffer})
}

if (retrying) {
this[kIsLoggingBackedOff] = true
this[kBackoffMs] = backoffWithJitter(
this.baseBackoffMs
, this.maxBackoffMs
, this[kBackoffMs]
)
setTimeout(() => {
this.send(false)
}, this[kBackoffMs])

if (this[kIgnoreRetryableErrors]) return

const err = new Error(
'Temporary connection-based error. It will be retried. '
+ 'See meta data for details.'
)
err.meta = errorMeta
this.emit('error', err)

return
}

const err = new Error(
'A connection-based error occurred that will not be retried. '
+ 'See meta data for details.'
)
err.meta = errorMeta
this.emit('error', err)

// User-level errors will be discarded since they will never succeed
this[kIsSending] = false
this[kTotalLinesReady] -= buffer.length
this[kAttempts] = 0
this[kReadyToSend].shift()

if (!this[kVerboseEvents]) {
// Assist GC by killing the buffer, unless the user has
// requested that the buffer be included in the error event.

buffer.length = 0
}

if (this[kReadyToSend].length) {
this.send()
return
}

this.emit('cleared', {
message: ALL_CLEAR_SENT
})
})
})
})
})
})
})
.catch((error) => {
const err = new Error('Error serializing buffer')
const errorMeta = {
actual: error.message
, firstLine
, lastLine
, url: config.url
, ...(this[kVerboseEvents] && {buffer})
}
err.meta = errorMeta
this.emit('error', err)
})
}
}

Expand Down
Loading

0 comments on commit f00421a

Please sign in to comment.