Skip to content

Commit cdeec86

Browse files
committed
[pinpoint-apm#285] Client Side gRPC Stream deadline retry refactoring
* The stream deadline configuration is loaded from pinpoint-config-default.json * Any value (null, undefined) should not terminate the agent because it needs to load JSON data and perform arithmetic * readable Stream highWaterMark = 100
1 parent 612f19d commit cdeec86

18 files changed

+531
-88
lines changed

lib/client/deadline-options-builder.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66

77
'use strict'
88

9-
// Java agent DEFAULT_CLIENT_REQUEST_TIMEOUT = 6000
10-
const defaultDeadlineSeconds = 6
119
class DeadlineOptionsBuilder {
12-
constructor(deadlineSeconds = defaultDeadlineSeconds) {
10+
constructor(deadlineSeconds) {
1311
this.deadlineSeconds = deadlineSeconds
1412
}
1513

lib/client/grpc-data-sender.js

+30-27
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ const cmdMessages = require('../data/v1/Cmd_pb')
2323
const wrappers = require('google-protobuf/google/protobuf/wrappers_pb')
2424
const activeRequestRepository = require('../metric/active-request-repository')
2525
const { setInterval } = require('node:timers/promises')
26-
const DeadlineOptionsBuilder = require('./deadline-options-builder')
2726
const { logError } = require('./grpc-errors')
27+
const StreamDeadlineOptionsBuilder = require('./stream-deadline-options-builder')
28+
const GrpcReadableStreamBuilder = require('./grpc-readable-stream-builder')
29+
const UnaryDeadlineOptionsBuilder = require('./unary-deadline-options-builder')
2830

2931
// AgentInfoSender.java
3032
// refresh daily
@@ -39,23 +41,22 @@ class GrpcDataSender {
3941
this.collectorStatPort = collectorStatPort
4042
this.collectorSpanPort = collectorSpanPort
4143

44+
this.unaryDeadlineOptionsBuilder = new UnaryDeadlineOptionsBuilder()
4245
this.initializeClients()
4346
this.initializeMetadataClients()
44-
this.initializeSpanStream(collectorIp, collectorSpanPort, config)
45-
this.initializeStatStream(collectorIp, collectorStatPort, config)
4647
this.initializePingStream()
4748
this.initializeAgentInfoScheduler()
4849
this.initializeProfilerClients(collectorIp, collectorTcpPort, config)
4950

50-
this.commandEchoDeadlineOptionsBuilder = new DeadlineOptionsBuilder()
51-
this.agentInfoOptionsBuilder = new DeadlineOptionsBuilder()
52-
this.metadataOptionsBuilder = new DeadlineOptionsBuilder()
51+
this.clientSideStreamDeadlineOptionsBuilder = new StreamDeadlineOptionsBuilder(config)
52+
this.initializeSpanStream(collectorIp, collectorSpanPort, config)
53+
this.initializeStatStream(collectorIp, collectorStatPort, config)
5354
}
5455

5556
close() {
5657
this.closeScheduler()
5758
if (this.spanStream) {
58-
this.spanStream.grpcStream.end()
59+
this.spanStream.end()
5960
}
6061
if (this.statStream) {
6162
this.statStream.grpcStream.end()
@@ -118,10 +119,12 @@ class GrpcDataSender {
118119
initializeSpanStream(collectorIp, collectorSpanPort, config) {
119120
this.spanClient = new services.SpanClient(collectorIp + ":" + collectorSpanPort, grpc.credentials.createInsecure(), { interceptors: [makeAgentInformationMetadataInterceptor(this.agentInfo)] })
120121

121-
this.spanStream = new GrpcClientSideStream('spanStream', this.spanClient, this.spanClient.sendSpan)
122-
if (config && config.streamDeadlineMinutesClientSide) {
123-
this.spanStream.setDeadlineMinutes(config.streamDeadlineMinutesClientSide)
124-
}
122+
// this.spanStream = new GrpcClientSideStream('spanStream', this.spanClient, this.spanClient.sendSpan)
123+
// if (config && config.streamDeadlineMinutesClientSide) {
124+
// this.spanStream.setDeadlineMinutes(config.streamDeadlineMinutesClientSide)
125+
// }
126+
this.spanStreamBuilder = new GrpcReadableStreamBuilder(this.spanClient, 'sendSpan')
127+
this.spanStreamBuilder.setDeadlineOptionsBuilder(this.clientSideStreamDeadlineOptionsBuilder)
125128
}
126129

127130
initializeStatStream(collectorIp, collectorStatPort, config) {
@@ -158,7 +161,7 @@ class GrpcDataSender {
158161

159162
sendAgentInfo(agentInfo, callback) {
160163
const pAgentInfo = dataConvertor.convertAgentInfo(agentInfo)
161-
let options = this.agentInfoOptionsBuilder.build()
164+
let options = this.unaryDeadlineOptionsBuilder.build()
162165
this.agentClient.requestAgentInfo(pAgentInfo, options, (err, response) => {
163166
logError('sendAgentInfo err: ', err)
164167
if (typeof callback === 'function') {
@@ -169,7 +172,7 @@ class GrpcDataSender {
169172
this.closeScheduler()
170173
if (this.agentInfoDailyScheduler) {
171174
this.removeJobForAgentInfo = this.agentInfoDailyScheduler.addJob(() => {
172-
options = this.agentInfoOptionsBuilder.build()
175+
options = this.unaryDeadlineOptionsBuilder.build()
173176
this.agentClient.requestAgentInfo(pAgentInfo, options, (err, response) => {
174177
logError('sendAgentInfo err: ', err)
175178
if (typeof callback === 'function') {
@@ -183,7 +186,7 @@ class GrpcDataSender {
183186

184187
sendApiMetaInfo(apiMetaInfo, callback) {
185188
const pApiMetaData = dataConvertor.convertApiMetaInfo(apiMetaInfo)
186-
const options = this.metadataOptionsBuilder.build()
189+
const options = this.unaryDeadlineOptionsBuilder.build()
187190
this.metadataClient.requestApiMetaData(pApiMetaData, options, (err, response) => {
188191
logError(err)
189192
if (callback) {
@@ -194,7 +197,7 @@ class GrpcDataSender {
194197

195198
sendStringMetaInfo(stringMetaInfo, callback) {
196199
const pStringMetaData = dataConvertor.convertStringMetaInfo(stringMetaInfo)
197-
const options = this.metadataOptionsBuilder.build()
200+
const options = this.unaryDeadlineOptionsBuilder.build()
198201
this.metadataClient.requestStringMetaData(pStringMetaData, options, (err, response) => {
199202
logError(err)
200203
if (callback) {
@@ -205,7 +208,7 @@ class GrpcDataSender {
205208

206209
sendSqlMetaInfo(sqlMetaData, callback) {
207210
const pSqlMetaData = sqlMetaData.valueOfProtocolBuffer()
208-
const options = this.metadataOptionsBuilder.build()
211+
const options = this.unaryDeadlineOptionsBuilder.build()
209212
this.metadataClient.requestSqlMetaData(pSqlMetaData, options, (err, response) => {
210213
logError(err)
211214
if (callback) {
@@ -216,7 +219,7 @@ class GrpcDataSender {
216219

217220
sendSqlUidMetaData(sqlMetaData, callback) {
218221
const pSqlMetaData = sqlMetaData.valueOfProtocolBuffer()
219-
const options = this.metadataOptionsBuilder.build()
222+
const options = this.unaryDeadlineOptionsBuilder.build()
220223
this.metadataClient.requestSqlUidMetaData(pSqlMetaData, options, (err, response) => {
221224
logError(err)
222225
if (callback) {
@@ -228,24 +231,24 @@ class GrpcDataSender {
228231
sendSpan(span) {
229232
try {
230233
const pSpan = span.toProtocolBuffer()
231-
if (log.isDebug()) {
232-
log.debug(`sendSpan pSpan: ${pSpan}`)
234+
235+
if (!this.spanStream) {
236+
this.spanStream = this.spanStreamBuilder.build()
233237
}
234-
this.spanStream.write(pSpan)
238+
this.spanStream.push(pSpan)
235239
} catch (e) {
236-
if (e && e.stack) {
237-
log.error(`sendSpan(span) Error: ${e.stack}`)
238-
}
240+
logError('sendSpan(span) Error: ', e)
239241
}
240242
}
241243

242244
sendSpanChunk(spanChunk) {
243245
try {
244246
const pSpanChunk = spanChunk.toProtocolBuffer()
245-
if (log.isDebug()) {
246-
log.debug(`sendSpanChunk spanChunk: `, spanChunk)
247+
248+
if (!this.spanStream) {
249+
this.spanStream = this.spanStreamBuilder.build()
247250
}
248-
this.spanStream.write(pSpanChunk)
251+
this.spanStream.push(pSpanChunk)
249252
} catch (e) {
250253
if (e && e.stack) {
251254
log.error(`sendSpanChunk(spanChunk) Error: ${e.stack}`)
@@ -349,7 +352,7 @@ class GrpcDataSender {
349352
}
350353

351354
sendCommandEcho(commandEchoResponse, callback) {
352-
let options = this.commandEchoDeadlineOptionsBuilder.build()
355+
let options = this.unaryDeadlineOptionsBuilder.build()
353356
this.profilerClient.commandEcho(commandEchoResponse, options, (err, response) => {
354357
if (err) {
355358
log.error(err)

lib/client/grpc-errors.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ function logError(message, error) {
1515
}
1616

1717
if (isCancelledError(error)) {
18-
log.error('Pinpoint Collector has been shut down')
18+
log.error(`Pinpoint Collector has been shut down : ${message}`)
1919
} else if (isUnavailableError(error)) {
20-
log.error('Pinpoint Collector is gRPC connection unavailable')
20+
log.error(`Pinpoint Collector is gRPC connection unavailable : ${message}`)
2121
} else if (isDeadlineExceededError(error)) {
22-
log.error('gRPC Stream deadline exceeded')
22+
log.error(`gRPC Stream deadline exceeded : ${message}`)
2323
} else {
2424
log.error(message, error)
2525
}
+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/**
2+
* Pinpoint Node.js Agent
3+
* Copyright 2021-present NAVER Corp.
4+
* Apache License v2.0
5+
*/
6+
7+
'use strict'
8+
9+
const { Transform } = require('node:stream')
10+
const { logError } = require('./grpc-errors')
11+
const GrpcWritableStreamBuilder = require('./grpc-writable-stream-builder')
12+
13+
// Java Agent queue size private static final int DEFAULT_AGENT_SENDER_EXECUTOR_QUEUE_SIZE = 1000;
14+
class GrpcReadableStream {
15+
constructor(makeWritableStream) {
16+
this.writableStreamBuilder = new GrpcWritableStreamBuilder(makeWritableStream)
17+
this.makeWritableStream = makeWritableStream
18+
this.maxAttempts = 3
19+
this.initialBackoff = 1000
20+
this.readableStream = this.makeReadableStream()
21+
this.pipeWritableStream()
22+
}
23+
24+
makeReadableStream() {
25+
const readableStream = new Transform(Object.assign({
26+
readableObjectMode: true,
27+
highWaterMark: 100,
28+
transform(chunk, encoding, callback) {
29+
callback(null, chunk)
30+
},
31+
// https://nodejs.org/api/stream.html#readablepushchunk-encoding
32+
read: () => {
33+
this.readStart()
34+
}
35+
}, this.options ?? {}))
36+
37+
readableStream.on('error', () => {
38+
// https://nodejs.org/api/stream.html#readablepipedestination-options
39+
// `One important caveat is that if the Readable stream emits an error during processing,
40+
// the Writable destination is not closed automatically.
41+
// If an error occurs, it will be necessary to manually close each stream
42+
// in order to prevent memory leaks.`
43+
// for readable steam error memory leak prevention
44+
if (this.writableStream && typeof this.writableStream.end === 'function') {
45+
this.writableStream.end()
46+
}
47+
})
48+
49+
return readableStream
50+
}
51+
52+
readStart() {
53+
this.readable = true
54+
}
55+
56+
push(data) {
57+
if (this.readable === false) {
58+
return
59+
}
60+
61+
if (!this.readableStream.push(data)) {
62+
this.readStop()
63+
}
64+
}
65+
66+
readStop() {
67+
this.readable = false
68+
}
69+
70+
pipeWritableStream(retryCount = 0) {
71+
if (this.ended) {
72+
return
73+
}
74+
75+
this.writableStreamBuilder.setDeadlineOptionsBuilder(this.deadlineOptionsBuilder)
76+
.setCallCallback((error, response) => {
77+
if (typeof this.callback === 'function') {
78+
this.callback(error, response)
79+
}
80+
})
81+
.build((writableStream) => {
82+
this.readableStream.pipe(writableStream)
83+
this.writableStream = writableStream
84+
})
85+
}
86+
87+
lazyPipeWritableStream(callCount = 1) {
88+
if (this.ended) {
89+
return
90+
}
91+
92+
if (callCount > this.maxAttempts) {
93+
return
94+
}
95+
96+
setTimeout(() => {
97+
this.pipeWritableStream(callCount)
98+
}, this.initialBackoff * callCount)
99+
}
100+
101+
end() {
102+
this.ended = true
103+
// writableStream.end() is called by function onend() { dest.end() } in Readable.prototype.pipe()
104+
this.readableStream.end()
105+
}
106+
}
107+
108+
class GrpcReadableStreamBuilder {
109+
constructor(serviceClient, method) {
110+
this.serviceClient = serviceClient
111+
this.method = method
112+
}
113+
114+
setMaxAttempts(maxAttempts) {
115+
this.maxAttempts = maxAttempts
116+
return this
117+
}
118+
119+
setInitialBackoff(initialBackoff) {
120+
this.initialBackoff = initialBackoff
121+
return
122+
}
123+
124+
setDeadlineOptionsBuilder(deadlineOptionsBuilder) {
125+
this.deadlineOptionsBuilder = deadlineOptionsBuilder
126+
return this
127+
}
128+
129+
setCallback(callback) {
130+
this.callback = callback
131+
return this
132+
}
133+
134+
setErrorHandler(errorHandler) {
135+
this.errorHandler = errorHandler
136+
return this
137+
}
138+
139+
setReadableStreamOptions(options) {
140+
this.options = options
141+
return this
142+
}
143+
144+
build() {
145+
const serviceClientCall = this.serviceClient[this.method].bind(this.serviceClient)
146+
const stream = new GrpcReadableStream(serviceClientCall)
147+
148+
if (this.maxAttempts) {
149+
stream.maxAttempts = this.maxAttempts
150+
}
151+
152+
if (this.initialBackoff) {
153+
stream.initialBackoff = this.initialBackoff
154+
}
155+
156+
stream.deadlineOptionsBuilder = this.deadlineOptionsBuilder
157+
stream.errorHandler = this.errorHandler
158+
159+
if (this.callback) {
160+
stream.callback = this.callback
161+
}
162+
163+
stream.options = this.options
164+
return stream
165+
}
166+
}
167+
168+
module.exports = GrpcReadableStreamBuilder

0 commit comments

Comments
 (0)