diff --git a/package.json b/package.json index 1e821aa..5348130 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ "mime-types": "^2.1.35", "qs": "^6.12.1", "type-fest": "^4.20.1", - "undici": "^7.0.0", + "undici": "^7.1.1", "ylru": "^2.0.0" }, "devDependencies": { diff --git a/src/HttpClient.ts b/src/HttpClient.ts index a80861d..e112323 100644 --- a/src/HttpClient.ts +++ b/src/HttpClient.ts @@ -681,8 +681,8 @@ export class HttpClient extends EventEmitter { res, }; - debug('Request#%d got response, status: %s, headers: %j, timing: %j', - requestId, res.status, res.headers, res.timing); + debug('Request#%d got response, status: %s, headers: %j, timing: %j, socket: %j', + requestId, res.status, res.headers, res.timing, res.socket); if (args.retry > 0 && requestContext.retries < args.retry) { const isRetry = args.isRetry ?? defaultIsRetry; diff --git a/src/utils.ts b/src/utils.ts index 3181232..91da8c9 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -173,13 +173,13 @@ export function updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, er socketInfo.remotePort = socket.remotePort; socketInfo.remoteFamily = socket.remoteFamily; } + if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) { + socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses; + } socketInfo.bytesRead = socket.bytesRead; socketInfo.bytesWritten = socket.bytesWritten; if (socket[symbols.kSocketConnectErrorTime]) { socketInfo.connectErrorTime = socket[symbols.kSocketConnectErrorTime]; - if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) { - socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses; - } socketInfo.connectProtocol = socket[symbols.kSocketConnectProtocol]; socketInfo.connectHost = socket[symbols.kSocketConnectHost]; socketInfo.connectPort = socket[symbols.kSocketConnectPort]; diff --git a/test/diagnostics_channel.test.ts b/test/diagnostics_channel.test.ts index d04ec46..fa00569 100644 --- a/test/diagnostics_channel.test.ts +++ b/test/diagnostics_channel.test.ts @@ -1,8 +1,11 @@ import { strict as assert } from 'node:assert'; import diagnosticsChannel from 'node:diagnostics_channel'; import { setTimeout as sleep } from 'node:timers/promises'; +import { createSecureServer } from 'node:http2'; +import { once } from 'node:events'; import { describe, it, beforeEach, afterEach } from 'vitest'; -import urllib from '../src/index.js'; +import selfsigned from 'selfsigned'; +import urllib, { HttpClient } from '../src/index.js'; import type { RequestDiagnosticsMessage, ResponseDiagnosticsMessage, @@ -138,6 +141,144 @@ describe('diagnostics_channel.test.ts', () => { diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage); }); + it('should support trace socket info with H2 by undici:client:sendHeaders and undici:request:trailers', async () => { + const pem = selfsigned.generate(); + const server = createSecureServer({ + key: pem.private, + cert: pem.cert, + }); + server.on('stream', (stream, headers) => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200, + }); + if (headers[':method'] !== 'HEAD') { + stream.end('hello h2!'); + } + }); + + server.listen(0); + await once(server, 'listening'); + + const kRequests = Symbol('requests'); + let lastRequestOpaque: any; + let kHandler: any; + function onMessage(message: any, name: string | symbol) { + if (name === 'undici:client:connected') { + // console.log('%s %j', name, message.connectParams); + message.socket[kRequests] = 0; + return; + } + const { request, socket } = message; + if (!kHandler) { + const symbols = Object.getOwnPropertySymbols(request); + for (const symbol of symbols) { + if (symbol.description === 'handler') { + kHandler = symbol; + break; + } + } + } + const handler = request[kHandler]; + let opaque = handler.opaque || handler.opts?.opaque; + assert(opaque); + opaque = opaque[symbols.kRequestOriginalOpaque]; + if (opaque && name === 'undici:client:sendHeaders' && socket) { + socket[kRequests]++; + opaque.tracer.socket = { + localAddress: socket.localAddress, + localPort: socket.localPort, + remoteAddress: socket.remoteAddress, + remotePort: socket.remotePort, + remoteFamily: socket.remoteFamily, + timeout: socket.timeout, + bytesWritten: socket.bytesWritten, + bytesRead: socket.bytesRead, + requests: socket[kRequests], + }; + } + // console.log('%s emit, %s %s, opaque: %j', name, request.method, request.origin, opaque); + lastRequestOpaque = opaque; + // console.log(request); + } + diagnosticsChannel.subscribe('undici:client:connected', onMessage); + diagnosticsChannel.subscribe('undici:client:sendHeaders', onMessage); + diagnosticsChannel.subscribe('undici:request:trailers', onMessage); + + const httpClient = new HttpClient({ + allowH2: true, + connect: { + rejectUnauthorized: false, + }, + }); + + let traceId = `mock-traceid-${Date.now()}`; + _url = `https://localhost:${server.address().port}`; + let response = await httpClient.request(`${_url}?head=true`, { + method: 'HEAD', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert(response.url.startsWith(_url)); + assert(!response.redirected); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 1); + + // HEAD, GET 请求都走同一个 http2 session socket + await sleep(1); + traceId = `mock-traceid-${Date.now()}`; + response = await httpClient.request(_url, { + method: 'GET', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 2); + + await sleep(1); + traceId = `mock-traceid-${Date.now()}`; + response = await httpClient.request(_url, { + method: 'GET', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 3); + + // socket 复用 1000 次 + let count = 1000; + while (count-- > 0) { + await sleep(1); + traceId = `mock-traceid-${Date.now()}`; + response = await httpClient.request(`${_url}?count=${count}`, { + method: 'GET', + opaque: { + tracer: { traceId }, + }, + }); + assert.equal(response.status, 200); + assert.equal(lastRequestOpaque.tracer.traceId, traceId); + assert(lastRequestOpaque.tracer.socket); + assert.equal(lastRequestOpaque.tracer.socket.requests, 3 + 1000 - count); + } + assert.equal(lastRequestOpaque.tracer.socket.requests, 1003); + + diagnosticsChannel.unsubscribe('undici:client:connected', onMessage); + diagnosticsChannel.unsubscribe('undici:client:sendHeaders', onMessage); + diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage); + server.close(); + }); + it('should support trace request by urllib:request and urllib:response', async () => { let lastRequestOpaque: any; let socket: any;