Skip to content

Commit f6c8058

Browse files
committed
[pinpoint-apm#285] Client Side gRPC Stream deadline retry refactoring
* The stream deadline configuration is loaded from pinpoint-config-default.json * Any value (null, undefined) should not terminate the agent because it needs to load JSON data and perform arithmetic
1 parent 8702a66 commit f6c8058

5 files changed

+19
-40
lines changed

lib/client/grpc-readable-stream-builder.js

+4
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ class GrpcReadableStream {
7474
return
7575
}
7676

77+
if (this.ended) {
78+
return
79+
}
80+
7781
try {
7882
const options = this.deadlineOptionsBuilder?.build() ?? {}
7983
const writableStream = this.makeWritableStream(options, (error, response) => {

test/client/active-request-grpc-readable-stream.test.js

+5-15
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,7 @@ const GrpcReadableStream = require('../../lib/client/grpc-readable-stream')
1818
const semver = require('semver')
1919

2020
test('If you run the ActiveRequest function, and then Node agent send the fifth piece of data, and the Pinpoint server fails, the ActiveRequest gRPC Stream closes and the for statement stops', (t) => {
21-
if (semver.satisfies(process.versions.node, '<19.0')) {
22-
t.plan(56)
23-
} else {
24-
t.plan(55)
25-
}
21+
t.plan(56)
2622
const server = new grpc.Server()
2723
let requestId = 1
2824
let activeRequestCount = 0
@@ -180,8 +176,7 @@ test('If you run the ActiveRequest function, and then Node agent send the fifth
180176
dataSender.close()
181177
handleCommandCall.end()
182178
shimmer.unwrap(GrpcReadableStream.prototype, 'pipeWritableStream')
183-
server2.tryShutdown(() => {
184-
})
179+
server2.forceShutdown()
185180
})
186181
}
187182
})
@@ -235,11 +230,7 @@ test('If you run the ActiveRequest function, and deadline occurs, the ActiveRequ
235230
const commandStream1st = this
236231
t.equal(commandStream1st.name, '', '1st created stream is commandStream creating writable stream')
237232
commandStream1st.writableStream.on('end', () => {
238-
if (semver.satisfies(process.versions.node, '<19.0')) {
239-
t.true(firedError, 'The first commandStream is ended by Functional Test is Ended')
240-
} else {
241-
t.false(firedError, 'The first commandStream is ended by deadline')
242-
}
233+
t.true(firedError, 'The first commandStream is ended by Functional Test is Ended')
243234
})
244235
let firedError
245236
commandStream1st.writableStream.on('error', (error) => {
@@ -296,8 +287,7 @@ test('If you run the ActiveRequest function, and deadline occurs, the ActiveRequ
296287
t.teardown(() => {
297288
dataSender.close()
298289
handleCommandCall.end()
299-
server.tryShutdown(() => {
300-
shimmer.unwrap(GrpcReadableStream.prototype, 'pipeWritableStream')
301-
})
290+
shimmer.unwrap(GrpcReadableStream.prototype, 'pipeWritableStream')
291+
server.forceShutdown()
302292
})
303293
})

test/client/grpc-data-sender.test.js

+8-22
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const test = require('tape')
88
const AsyncId = require('../../lib/context/async-id')
99
const grpc = require('@grpc/grpc-js')
1010
const services = require('../../lib/data/v1/Service_grpc_pb')
11-
const { beforeSpecificOne, afterOne, getCallRequests, getMetadata, DataSourceCallCountable } = require('./grpc-fixture')
11+
const { beforeSpecificOne, afterOne, getCallRequests, getMetadata, DataSourceCallCountable, SpanOnlyFunctionalTestableDataSource } = require('./grpc-fixture')
1212
const cmdMessage = require('../../lib/data/v1/Cmd_pb')
1313
const CommandType = require('../../lib/client/command/command-type')
1414
const { Empty } = require('google-protobuf/google/protobuf/empty_pb')
@@ -56,19 +56,6 @@ function sendSpan(call) {
5656
callMetadata.push(call.metadata)
5757
}
5858

59-
class DataSource extends DataSourceCallCountable {
60-
constructor(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config) {
61-
super(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config)
62-
}
63-
64-
initializeClients() { }
65-
initializeMetadataClients() { }
66-
initializeStatStream() { }
67-
initializePingStream() { }
68-
initializeAgentInfoScheduler() { }
69-
initializeProfilerClients() { }
70-
}
71-
7259
test('Should send span', function (t) {
7360
agent.bindHttp()
7461
sendSpanMethodOnDataCallback = null
@@ -79,7 +66,7 @@ test('Should send span', function (t) {
7966
})
8067
let dataSender
8168
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
82-
const grpcDataSender = beforeSpecificOne(port, DataSource)
69+
const grpcDataSender = beforeSpecificOne(port, SpanOnlyFunctionalTestableDataSource)
8370
const traceRoot = new RemoteTraceRootBuilder(agent.agentInfo, '5').build()
8471
dataSender = dataSenderMock(agent.config, grpcDataSender)
8572
const spanBuilder = new SpanBuilder(traceRoot)
@@ -175,7 +162,7 @@ test('sendSpanChunk redis.SET.end', function (t) {
175162

176163
let dataSender
177164
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
178-
const grpcDataSender = beforeSpecificOne(port, DataSource)
165+
const grpcDataSender = beforeSpecificOne(port, SpanOnlyFunctionalTestableDataSource)
179166
const traceRoot = new RemoteTraceRootBuilder(agent.agentInfo, '5').build()
180167
const asyncId = AsyncId.make()
181168
dataSender = dataSenderMock(agent.config, grpcDataSender)
@@ -253,7 +240,7 @@ test('sendSpanChunk redis.GET.end', (t) => {
253240

254241
let dataSender
255242
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
256-
const grpcDataSender = beforeSpecificOne(port, DataSource)
243+
const grpcDataSender = beforeSpecificOne(port, SpanOnlyFunctionalTestableDataSource)
257244
const traceRoot = new RemoteTraceRootBuilder(agent.agentInfo, '5').build()
258245
const asyncId = AsyncId.make()
259246
dataSender = dataSenderMock(agent.config, grpcDataSender)
@@ -324,7 +311,7 @@ test('sendSpan', (t) => {
324311

325312
let dataSender
326313
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
327-
const grpcDataSender = beforeSpecificOne(port, DataSource)
314+
const grpcDataSender = beforeSpecificOne(port, SpanOnlyFunctionalTestableDataSource)
328315
const traceRoot = new RemoteTraceRootBuilder(agent.agentInfo, '5').build()
329316
dataSender = dataSenderMock(agent.config, grpcDataSender)
330317
const spanBuilder = new SpanBuilder(traceRoot)
@@ -455,10 +442,9 @@ test('sendStat', (t) => {
455442
const actualHistogram = data.getAgentstat().getActivetrace().getHistogram().getActivetracecountList()
456443
t.deepEqual(originalHistogram, actualHistogram, 'active trace histogram')
457444
agent.dataSender.close()
458-
collectorServer.tryShutdown(() => {
459-
server.close(() => {
460-
t.end()
461-
})
445+
collectorServer.forceShutdown()
446+
server.close(() => {
447+
t.end()
462448
})
463449
}
464450
})

test/client/grpc-fixture.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ function beforeSpecificOne(port, one, serviceConfig) {
2929
actualConfig.collectorStatPort = port
3030
actualConfig.collectorSpanPort = port
3131
actualConfig.enabledDataSending = true
32-
return new one(
32+
const dataSource = new one(
3333
actualConfig.collectorIp,
3434
actualConfig.collectorTcpPort,
3535
actualConfig.collectorStatPort,
3636
actualConfig.collectorSpanPort,
3737
agentInfo(),
3838
actualConfig
3939
)
40+
return dataSource
4041
}
4142

4243
function agentInfo() {

test/client/grpc-readable-stream.test.js

-2
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,6 @@ test('When gRPC server shutdown and then node agent grpcStream on error fired an
341341
if (loadCount === 2) {
342342
dataSender.close()
343343
server.forceShutdown()
344-
server.tryShutdown(() => {
345-
})
346344
t.end()
347345
}
348346
}

0 commit comments

Comments
 (0)