Skip to content

Commit

Permalink
Tighten up CF Socket state handling
Browse files Browse the repository at this point in the history
  • Loading branch information
petebacondarwin committed May 5, 2023
1 parent c24c3de commit ddffc66
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions packages/pg/lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class CFSocket extends EventEmitter {
this.destroyed = false

this._upgrading = false
this._upgraded = false
this._cfSocket = null
this._cfWriter = null
this._cfReader = null
Expand Down Expand Up @@ -66,16 +67,7 @@ class CFSocket extends EventEmitter {
const { connect } = await import('cloudflare:sockets')
this._cfSocket = connect(`${host}:${port}`, options)
this._cfWriter = this._cfSocket.writable.getWriter()

this._cfSocket.closed.then(() => {
if (!this._upgrading) {
log('CF socket closed')
this._cfSocket = null
this.emit('close')
} else {
this._upgrading = false
}
})
this._addClosedHandler()

this._cfReader = this._cfSocket.readable.getReader()
if (this.ssl) {
Expand Down Expand Up @@ -149,14 +141,35 @@ class CFSocket extends EventEmitter {
}

startTls(options) {
if (this._upgraded) {
// Don't try to upgrade again.
this.emit('error', 'Cannot call `startTls()` more than once on a socket')
return
}
this._cfWriter.releaseLock()
this._cfReader.releaseLock()
this._upgrading = true
this._cfSocket = this._cfSocket.startTls(options)
this._cfWriter = this._cfSocket.writable.getWriter()
this._cfReader = this._cfSocket.readable.getReader()
this._addClosedHandler()
this._listen().catch((e) => this.emit('error', e))
}

_addClosedHandler() {
this._cfSocket.closed
.then(() => {
if (!this._upgrading) {
log('CF socket closed')
this._cfSocket = null
this.emit('close')
} else {
this._upgrading = false
this._upgraded = true
}
})
.catch((e) => this.emit('error', e))
}
}

const debug = false
Expand Down

0 comments on commit ddffc66

Please sign in to comment.