Skip to content

Commit

Permalink
Tighten up CF Socket state handling
Browse files Browse the repository at this point in the history
  • Loading branch information
petebacondarwin committed May 5, 2023
1 parent c24c3de commit 9a50aa1
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions packages/pg/lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,7 @@ class CFSocket extends EventEmitter {
const { connect } = await import('cloudflare:sockets')
this._cfSocket = connect(`${host}:${port}`, options)
this._cfWriter = this._cfSocket.writable.getWriter()

this._cfSocket.closed.then(() => {
if (!this._upgrading) {
log('CF socket closed')
this._cfSocket = null
this.emit('close')
} else {
this._upgrading = false
}
})
this._addClosedHandler()

this._cfReader = this._cfSocket.readable.getReader()
if (this.ssl) {
Expand Down Expand Up @@ -149,14 +140,35 @@ class CFSocket extends EventEmitter {
}

startTls(options) {
if (this._upgraded) {
// Don't try to upgrade again.
this.emit('error', 'Cannot call `startTls()` more than once on a socket')
return
}
this._cfWriter.releaseLock()
this._cfReader.releaseLock()
this._upgrading = true
this._cfSocket = this._cfSocket.startTls(options)
this._cfWriter = this._cfSocket.writable.getWriter()
this._cfReader = this._cfSocket.readable.getReader()
this._addClosedHandler()
this._listen().catch((e) => this.emit('error', e))
}

_addClosedHandler() {
this._cfSocket.closed
.then(() => {
if (!this._upgrading) {
log('CF socket closed')
this._cfSocket = null
this.emit('close')
} else {
this._upgrading = false
this._upgraded = true
}
})
.catch((e) => this.emit('error', e))
}
}

const debug = false
Expand Down

0 comments on commit 9a50aa1

Please sign in to comment.