diff --git a/packages/pg/lib/stream.js b/packages/pg/lib/stream.js index c626eeb6c..e3444b259 100644 --- a/packages/pg/lib/stream.js +++ b/packages/pg/lib/stream.js @@ -39,6 +39,7 @@ class CFSocket extends EventEmitter { this.destroyed = false this._upgrading = false + this._upgraded = false this._cfSocket = null this._cfWriter = null this._cfReader = null @@ -66,16 +67,7 @@ class CFSocket extends EventEmitter { const { connect } = await import('cloudflare:sockets') this._cfSocket = connect(`${host}:${port}`, options) this._cfWriter = this._cfSocket.writable.getWriter() - - this._cfSocket.closed.then(() => { - if (!this._upgrading) { - log('CF socket closed') - this._cfSocket = null - this.emit('close') - } else { - this._upgrading = false - } - }) + this._addClosedHandler() this._cfReader = this._cfSocket.readable.getReader() if (this.ssl) { @@ -149,14 +141,35 @@ class CFSocket extends EventEmitter { } startTls(options) { + if (this._upgraded) { + // Don't try to upgrade again. + this.emit('error', 'Cannot call `startTls()` more than once on a socket') + return + } this._cfWriter.releaseLock() this._cfReader.releaseLock() this._upgrading = true this._cfSocket = this._cfSocket.startTls(options) this._cfWriter = this._cfSocket.writable.getWriter() this._cfReader = this._cfSocket.readable.getReader() + this._addClosedHandler() this._listen().catch((e) => this.emit('error', e)) } + + _addClosedHandler() { + this._cfSocket.closed + .then(() => { + if (!this._upgrading) { + log('CF socket closed') + this._cfSocket = null + this.emit('close') + } else { + this._upgrading = false + this._upgraded = true + } + }) + .catch((e) => this.emit('error', e)) + } } const debug = false