diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index fd73cbe6cb2dea..170f338fb594d8 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -2601,59 +2601,45 @@ class QuicStream extends Duplex { this._readableState.readingMore = true; this.on('pause', streamOnPause); - // See src/node_quic_stream.h for an explanation - // of the initial states for unidirectional streams. - if (this.unidirectional) { - if (session instanceof QuicServerSession) { - if (this.serverInitiated) { - // Close the readable side - this.push(null); - this.read(); - } else { - // Close the writable side - this.end(); - } - } else if (this.serverInitiated) { - // Close the writable side - this.end(); - } else { - this.push(null); - this.read(); - } - } - // The QuicStream writes are corked until kSetHandle // is set, ensuring that writes are buffered in JavaScript // until we have somewhere to send them. this.cork(); } - // Set handle is called once the QuicSession has been able - // to complete creation of the internal QuicStream handle. - // This will happen only after the QuicSession's own - // internal handle has been created. The QuicStream object - // is still minimally usable before this but any data - // written will be buffered until kSetHandle is called. - [kSetHandle](handle) { - this[kHandle] = handle; - if (handle !== undefined) { - handle.onread = onStreamRead; - handle[owner_symbol] = this; - this[async_id_symbol] = handle.getAsyncId(); - this.#id = handle.id(); - this.#dataRateHistogram = new Histogram(handle.rate); - this.#dataSizeHistogram = new Histogram(handle.size); - this.#dataAckHistogram = new Histogram(handle.ack); - this.uncork(); - this.emit('ready'); - } else { - if (this.#dataRateHistogram) - this.#dataRateHistogram[kDestroyHistogram](); - if (this.#dataSizeHistogram) - this.#dataSizeHistogram[kDestroyHistogram](); - if (this.#dataAckHistogram) - this.#dataAckHistogram[kDestroyHistogram](); - } + _construct(cb) { + // Set handle is called once the QuicSession has been able + // to complete creation of the internal QuicStream handle. + // This will happen only after the QuicSession's own + // internal handle has been created. The QuicStream object + // is still minimally usable before this but any data + // written will be buffered until kSetHandle is called. + this[kSetHandle] = (handle) => { + this[kHandle] = handle; + if (handle !== undefined) { + handle.onread = onStreamRead; + handle[owner_symbol] = this; + this[async_id_symbol] = handle.getAsyncId(); + this.#id = handle.id(); + this.#dataRateHistogram = new Histogram(handle.rate); + this.#dataSizeHistogram = new Histogram(handle.size); + this.#dataAckHistogram = new Histogram(handle.ack); + this.uncork(); + + this.emit('ready'); + // TODO (fix): What happens on failure? We still need to + // invoke the callback somehow? Change signature of kSetHandle + // to (err, handle)? + cb(); + } else { + if (this.#dataRateHistogram) + this.#dataRateHistogram[kDestroyHistogram](); + if (this.#dataSizeHistogram) + this.#dataSizeHistogram[kDestroyHistogram](); + if (this.#dataAckHistogram) + this.#dataAckHistogram[kDestroyHistogram](); + } + }; } [kStreamReset](code) { @@ -2700,6 +2686,7 @@ class QuicStream extends Duplex { if (this.destroyed || this.#closed) return; + // TODO: This will deadlock on failure? if (this.pending) return this.once('ready', () => this[kClose](family, code)); @@ -2785,19 +2772,6 @@ class QuicStream extends Duplex { } #writeGeneric = function(writev, data, encoding, cb) { - if (this.destroyed) - return; // TODO(addaleax): Can this happen? - - // The stream should be corked while still pending - // but just in case uncork - // was called early, defer the actual write until the - // ready event is emitted. - if (this.pending) { - return this.once('ready', () => { - this.#writeGeneric(writev, data, encoding, cb); - }); - } - this[kUpdateTimer](); const req = (writev) ? writevGeneric(this, data, cb) : @@ -2821,13 +2795,6 @@ class QuicStream extends Duplex { // coming so that a fin stream packet can be // sent. _final(cb) { - // The QuicStream should be corked while pending - // so this shouldn't be called, but just in case - // the stream was prematurely uncorked, defer the - // operation until the ready event is emitted. - if (this.pending) - return this.once('ready', () => this._final(cb)); - const handle = this[kHandle]; if (handle === undefined) { cb(); @@ -2840,16 +2807,10 @@ class QuicStream extends Duplex { const err = handle.shutdown(req); if (err === 1) return cb(); + // TODO (fix): else if (err !== 0)? } _read(nread) { - if (this.pending) - return this.once('ready', () => this._read(nread)); - - if (this.destroyed) { // TODO(addaleax): Can this happen? - this.push(null); - return; - } if (!this.#didRead) { this._readableState.readingMore = false; this.#didRead = true; @@ -2895,6 +2856,7 @@ class QuicStream extends Duplex { throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd); if (this.pending) { + // TODO: This will deadlock on failure? return this.once('ready', () => { this.sendFD(fd, { offset, length }, ownsFd); });