### Describe the bug
The `ListWatch` class stops reconnecting when a non-410 error occurs during a watch session, due to an early return in the `doneHandler()` function.
### Client Version
1.1.2

### Server Version
v1.32.2-gke.1182003
### To Reproduce
- Use `ListWatch` to watch a namespace that doesn't have any pods (to get a stream that doesn't really send anything).
- Do not specify a `timeoutSeconds` in the request (as that seems to cause a clean disconnect).
- Observe that the "connect" event is emitted.
- After 5 minutes or upon a connection error, the watch ends with a set of errors:
  - `AbortError: The user aborted a request.`
  - `Error: Premature close` (code `ERR_STREAM_PREMATURE_CLOSE`)
- No further attempts to reconnect are made (no "connect" events are emitted).
Note: I can easily reproduce this issue in our GKE cluster, but I can understand that the timeout here may be a bit tricky to reproduce in other environments.
See my script below for a mock server that simulates the issue on a local machine.
### Expected behavior
I would expect the watch to attempt to reconnect on any connection error, not just 410 errors.
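The reconnect semantics I have in mind can be sketched as a plain retry loop (a hypothetical helper, not client-node's actual implementation; `connectOnce` and `maxAttempts` are made-up names for illustration, and a real version would add backoff between attempts):

```ts
// Hypothetical sketch of "reconnect on any error" semantics.
// connectOnce() represents one watch session: it resolves on a clean
// close and rejects on any error (abort, premature close, 410, ...).
async function watchForever(
  connectOnce: () => Promise<void>,
  onError: (err: unknown) => void,
  maxAttempts = Infinity,
): Promise<void> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await connectOnce() // one watch session
    } catch (err) {
      onError(err) // report the error to listeners...
    }
    // ...but always loop around and reconnect, regardless of error type.
  }
}
```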
### Example Code
I created a mock server to reproduce the issue. The server will:

- respond with an initial list of pods to non-watch requests
- cleanly close the connection for the first watch request
- prematurely close the connection for the second watch request
- cleanly close the connection for the third and subsequent watch requests (although we'll not reach that point in this example)
Observe that:

- When the request is cleanly closed, the watch reconnects and the "connect" event is emitted.
- When the request is closed prematurely, the watch does not reconnect and the "connect" event is not emitted; instead, two errors are reported:
  - `AbortError: The user aborted a request.`
  - `Error: Premature close`
```ts
import http from 'http'
import { CoreV1Api, KubeConfig, ListWatch, Watch } from '@kubernetes/client-node'

const MOCK_PORT = 8333
const MOCK_URL = `http://localhost:${MOCK_PORT}`

let requestCount = 0
const server = http.createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'application/json',
    'Transfer-Encoding': 'chunked',
  })
  res.flushHeaders()
  if (req.url?.includes('watch=true')) {
    if (requestCount++ == 1) {
      console.log('[mock-server] watch: Closing connection prematurely')
      res.destroy()
    } else {
      console.log('[mock-server] watch: Closing connection cleanly.')
      res.end()
    }
  } else {
    console.log('[mock-server] Responding with initial list')
    res.end(
      JSON.stringify({
        kind: 'PodList',
        apiVersion: 'v1',
        metadata: {},
        items: [],
      })
    )
  }
})

async function setupWatcher() {
  const kubeConfig = new KubeConfig()
  kubeConfig.loadFromOptions({
    clusters: [{ name: 'mock-cluster', server: MOCK_URL, skipTLSVerify: true }],
    users: [{ name: 'mock-user' }],
    contexts: [{ name: 'mock-context', user: 'mock-user', cluster: 'mock-cluster' }],
    currentContext: 'mock-context',
  })
  const api = kubeConfig.makeApiClient(CoreV1Api)
  const watch = new Watch(kubeConfig)
  const watcher = new ListWatch(
    `/api/v1/namespaces/default/pods`,
    watch,
    () => api.listNamespacedPod({ namespace: 'default' }),
    false
  )
  watcher.on('connect', () => console.log('[watcher] Connecting...'))
  watcher.on('error', (err) => console.error('[watcher] Error', err))
  watcher.start()
}

server.listen(MOCK_PORT, () => {
  console.log(`[mock-server] Listening on port ${MOCK_PORT}`)
  setupWatcher().catch(console.error)
})
```
Output:

```
[mock-server] Listening on port 8333
[watcher] Connecting...
[mock-server] Responding with initial list
[mock-server] watch: Closing connection cleanly.
[watcher] Connecting...
[mock-server] Responding with initial list
[mock-server] watch: Closing connection prematurely
[watcher] Error AbortError: The user aborted a request.
    at abort (/home/bas/projects/watch-repro/node_modules/node-fetch/lib/index.js:1458:16)
    at AbortSignal.abortAndFinalize (/home/bas/projects/watch-repro/node_modules/node-fetch/lib/index.js:1473:4)
    at [nodejs.internal.kHybridDispatch] (node:internal/event_target:827:20)
    at AbortSignal.dispatchEvent (node:internal/event_target:762:26)
    at runAbort (node:internal/abort_controller:449:10)
    at abortSignal (node:internal/abort_controller:435:3)
    at AbortController.abort (node:internal/abort_controller:468:5)
    at PassThrough.doneCallOnce (file:///home/bas/projects/watch-repro/node_modules/@kubernetes/client-node/dist/watch.js:33:28)
    at PassThrough.emit (node:events:519:35)
    at emitErrorNT (node:internal/streams/destroy:170:8) {
  type: 'aborted'
}
[watcher] Error Error: Premature close
    at IncomingMessage.<anonymous> (/home/bas/projects/watch-repro/node_modules/node-fetch/lib/index.js:1748:18)
    at Object.onceWrapper (node:events:621:28)
    at IncomingMessage.emit (node:events:507:28)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
```
Alternatively, here's the original code I used to reproduce the issue, which uses a real GKE cluster:
```ts
import { CoreV1Api, KubeConfig, ListWatch, Watch } from "@kubernetes/client-node"

async function main() {
  const kubeConfig = new KubeConfig()
  kubeConfig.loadFromDefault()
  const api = kubeConfig.makeApiClient(CoreV1Api)
  const path = `/api/v1/namespaces/default/pods`
  const watch = new Watch(kubeConfig)
  const watcher = new ListWatch(
    path,
    watch,
    () => api.listNamespacedPod({ namespace: 'default' }),
    false,
    undefined
  )
  watcher.on('connect', () => console.log('connect'))
  watcher.on('change', () => console.log('change'))
  watcher.on('error', (err) => console.log('error:', err))
  watcher.start()
  setInterval(() => console.log('tick (30s)'), 30_000)
}

main().catch(console.error)
```
### Environment (please complete the following information)
- OS: Linux
- Node.js version: 23.11.0
- Cloud runtime: GCP
### Additional context
The likely root cause is the `doneHandler()` function in the `ListWatch` class:

> Lines 143 to 147 in 073175f

On line 147, the function currently returns early if the error is not a 410 error, which prevents the watch from reconnecting.
Simply patching out this return statement kind of works, but it seems to cause two subsequent connection attempts (probably one for each error?). Since I’m not familiar enough with the internals of this project, I don't feel confident enough to open a PR at this time.
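For what it's worth, the double reconnect would make sense if both the `AbortError` and the premature-close error each invoke the done handler once. Guarding the handler so it fires at most once per watch session is one way to deduplicate (purely a hypothetical sketch; `makeDoneHandler` is a made-up name, though the `doneCallOnce` frame in the stack trace above suggests `Watch` already guards similarly at its own level):

```ts
// Hypothetical sketch: two errors from the same failed session each call
// the done handler, so a naive "always reconnect" fix reconnects twice.
// A once-per-session guard collapses them into a single reconnect.
function makeDoneHandler(reconnect: () => void): (err: unknown) => void {
  let called = false
  return (_err: unknown) => {
    if (called) return // second error from the same session: ignore
    called = true
    reconnect() // schedule exactly one reconnect per session
  }
}
```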