Skip to content

Commit

Permalink
feat: ensure support for streams over H2
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 committed May 10, 2023
1 parent 51f63ef commit 7661b05
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 12 deletions.
31 changes: 28 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
const assert = require('assert')
const net = require('net')
const http2 = require('http2')
const { pipeline } = require('stream')
const util = require('./core/util')
const timers = require('./timers')
const Request = require('./core/request')
Expand Down Expand Up @@ -1656,6 +1657,8 @@ function writeH2 (client, session, request) {
headers['content-length'] = '0'
} else {
assert(contentLength == null, 'no body must not have content length')
// Indicate no more data will be written
stream.end()
}

request.onRequestSent()
Expand All @@ -1664,8 +1667,7 @@ function writeH2 (client, session, request) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

headers['content-length'] = `${contentLength}`
console.log('stream:', stream.write)
stream.write(body)
stream.end(body)
request.onBodySent(body)
request.onRequestSent()
console.log('ended?')
Expand All @@ -1677,7 +1679,30 @@ function writeH2 (client, session, request) {
writeBlob({ body, client, request, socket: client[kSocket], contentLength, header, expectsPayload })
}
} else if (util.isStream(body)) {
writeStream({ client, request, socket: client[kSocket], contentLength, expectsPayload })
// For HTTP/2, is enough to pipe the stream
// TODO: adapt the HTTP2Stream to API compatible with current undici
// expectations from a stream
// writeStream({ body, client, request, socket: client[kSocket], contentLength, expectsPayload })
const pipe = pipeline(
body,
stream,
(err) => {
if (err) {
util.destroy(body, err)
}
}
)
pipe.on('data', onPipeData)
pipe.once('end', () => {
pipe.removeListener('data', onPipeData)
util.destroy(pipe)
request.onRequestSent()
})
body.once('end', () => util.destroy(body))

function onPipeData (chunk) {
request.onBodySent(chunk)
}
} else if (util.isIterable(body)) {
writeIterable({ client, request, socket: client[kSocket], contentLength, expectsPayload })
} else {
Expand Down
117 changes: 108 additions & 9 deletions test/http2.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
'use strict'

const { createSecureServer } = require('node:http2')
const { createReadStream, readFileSync } = require('node:fs')
const { once } = require('node:events')

const { test, plan } = require('tap')
const pem = require('https-pem')

const { Client } = require('..')

plan(2)
plan(3)

test('Should support H2 connection', async t => {
const body = []
Expand Down Expand Up @@ -59,8 +60,11 @@ test('Should support H2 connection', async t => {

test('Should handle h2 request with body (string or buffer)', async t => {
const server = createSecureServer(pem)
const responseBody = []
const requestBody = []
const responseBody1 = []
const responseBody2 = []
const requestBodyString = []
const requestBodyBuffer = []
let reqCounter = 0

// server.on('checkContinue', (request, response) => {
// t.equal(request.headers['x-my-header'], 'foo')
Expand All @@ -72,18 +76,23 @@ test('Should handle h2 request with body (string or buffer)', async t => {
// })

server.on('stream', async (stream, headers) => {
stream.on('data', chunk => requestBody.push(chunk))
reqCounter++
if (reqCounter === 1) {
stream.on('data', chunk => requestBodyString.push(chunk))
} else {
stream.on('data', chunk => requestBodyBuffer.push(chunk))
}

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
'x-custom-h2': headers['x-my-header'],
':status': 200
})

stream.end('hello h2!')
})

t.plan(5)
t.plan(10)

server.listen(0)
await once(server, 'listening')
Expand All @@ -97,7 +106,7 @@ test('Should handle h2 request with body (string or buffer)', async t => {
t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

const response = await client.request({
const response1 = await client.request({
path: '/',
method: 'POST',
headers: {
Expand All @@ -107,14 +116,104 @@ test('Should handle h2 request with body (string or buffer)', async t => {
// expectContinue: true
})

response1.body.on('data', chunk => {
responseBody1.push(chunk)
})

await once(response1.body, 'end')

const response2 = await client.request({
path: '/',
method: 'POST',
headers: {
'x-my-header': 'foo'
},
body: Buffer.from('hello from client!', 'utf-8')
// expectContinue: true
})

response2.body.on('data', chunk => {
responseBody2.push(chunk)
})

await once(response2.body, 'end')

t.equal(response1.statusCode, 200)
t.equal(response1.headers['content-type'], 'text/plain; charset=utf-8')
t.equal(response1.headers['x-custom-h2'], 'foo')
t.equal(Buffer.concat(responseBody1).toString('utf-8'), 'hello h2!')
t.equal(
Buffer.concat(requestBodyString).toString('utf-8'),
'hello from client!'
)

t.equal(response2.statusCode, 200)
t.equal(response2.headers['content-type'], 'text/plain; charset=utf-8')
t.equal(response2.headers['x-custom-h2'], 'foo')
t.equal(Buffer.concat(responseBody2).toString('utf-8'), 'hello h2!')
t.equal(
Buffer.concat(requestBodyBuffer).toString('utf-8'),
'hello from client!'
)
})

test('Should handle h2 request with body (stream)', async t => {
const server = createSecureServer(pem)
const expectedBody = readFileSync(__filename, 'utf-8')
const stream = createReadStream(__filename)
const requestChunks = []
const responseBody = []

server.on('stream', async (stream, headers) => {
t.equal(headers[':method'], 'PUT')
t.equal(headers[':path'], '/')
t.equal(headers[':scheme'], 'https')

console.log('request received and valid')

stream.on('data', chunk => requestChunks.push(chunk))

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
':status': 200
})

stream.end('hello h2!')
})

t.plan(8)

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
}
})

t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

const response = await client.request({
path: '/',
method: 'PUT',
headers: {
'x-my-header': 'foo'
},
body: stream
})

response.body.on('data', chunk => {
responseBody.push(chunk)
})

await once(response.body, 'end')

t.equal(response.statusCode, 200)
t.equal(response.headers['content-type'], 'text/plain; charset=utf-8')
t.equal(response.headers['x-custom-h2'], 'hello')
t.equal(response.headers['x-custom-h2'], 'foo')
t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!')
t.equal(Buffer.concat(requestBody).toString('utf-8'), 'hello from client!')
t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody)
})

0 comments on commit 7661b05

Please sign in to comment.