Skip to content

Commit

Permalink
cache: Fix connection leak.
Browse files Browse the repository at this point in the history
Make sure that done callback of a watcher isn't called more than
once and call abort() on request object to close the connection
when done.
  • Loading branch information
Jan Kryl committed Jan 26, 2021
1 parent dc1260c commit 8d3e933
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 220 deletions.
27 changes: 11 additions & 16 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ADD, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer';
import { KubernetesObject } from './types';
import { Watch } from './watch';
import { RequestResult, Watch } from './watch';

export interface ObjectCache<T> {
get(name: string, namespace?: string): T | undefined;
Expand All @@ -12,7 +12,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
private stopped: boolean;
private request: RequestResult | undefined;

public constructor(
private readonly path: string,
Expand All @@ -25,19 +25,20 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.resourceVersion = '';
this.stopped = true;
if (autoStart) {
this.start();
this.doneHandler(null);
}
}

public async start(): Promise<void> {
this.stopped = false;
await this.doneHandler();
await this.doneHandler(null);
}

public stop(): void {
this.stopped = true;
if (this.request) {
this.request.abort();
this.request = undefined;
}
}

public on(verb: string, cb: ObjectCallback<T>): void {
Expand Down Expand Up @@ -79,15 +80,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
return this.resourceVersion;
}

private async errorHandler(err: any): Promise<void> {
private async doneHandler(err: any): Promise<any> {
this.stop();
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
}
this.stopped = true;
}

private async doneHandler(): Promise<any> {
if (this.stopped) {
return;
}
// TODO: Don't always list here for efficiency
Expand All @@ -106,12 +102,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
});
this.addOrUpdateItems(list.items);
await this.watch.watch(
this.request = await this.watch.watch(
this.path,
{ resourceVersion: list.metadata!.resourceVersion },
this.watchHandler.bind(this),
this.doneHandler.bind(this),
this.errorHandler.bind(this),
);
}

Expand Down
129 changes: 38 additions & 91 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ import chaiAsPromised = require('chai-as-promised');
import * as mock from 'ts-mockito';

import http = require('http');
import { Duplex } from 'stream';
import { EventEmitter } from 'ws';

import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
import { deleteObject, ListWatch, deleteItems } from './cache';
import { ListCallback, ADD, UPDATE, DELETE, ListPromise } from './informer';
import { ADD, UPDATE, DELETE, ListPromise } from './informer';

use(chaiAsPromised);

import { Watch } from './watch';
import { RequestResult, Watch } from './watch';

// Object replacing real Request object in the test
class FakeRequest extends EventEmitter implements RequestResult {
pipe(stream: Duplex): void {}
abort() {}
}

describe('ListWatchCache', () => {
it('should throw on unknown update', () => {
Expand All @@ -24,7 +32,12 @@ describe('ListWatchCache', () => {
(resolve, reject) => {
resolve({
response: {} as http.IncomingMessage,
body: {} as V1NamespaceList,
body: {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: [],
} as V1NamespaceList,
});
},
);
Expand Down Expand Up @@ -88,15 +101,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -159,7 +166,7 @@ describe('ListWatchCache', () => {
} as V1ObjectMeta,
} as V1Namespace);

await doneHandler();
await doneHandler(null);
expect(cache.list().length, 'all namespace list').to.equal(1);
expect(cache.list('default').length, 'default namespace list').to.equal(1);
expect(cache.list('other'), 'other namespace list').to.be.undefined;
Expand Down Expand Up @@ -198,15 +205,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -285,15 +286,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -362,15 +357,9 @@ describe('ListWatchCache', () => {
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
Expand All @@ -390,18 +379,12 @@ describe('ListWatchCache', () => {

promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
doneHandler();
doneHandler(null);
await promise;
expect(addObjects).to.deep.equal(list);
expect(updateObjects).to.deep.equal(list);
Expand Down Expand Up @@ -447,15 +430,9 @@ describe('ListWatchCache', () => {
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
Expand All @@ -476,19 +453,13 @@ describe('ListWatchCache', () => {

promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
listObj.items = list2;
doneHandler();
doneHandler(null);
await promise;
expect(addObjects).to.deep.equal(list);
expect(updateObjects).to.deep.equal(list2);
Expand Down Expand Up @@ -536,15 +507,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -662,13 +627,7 @@ describe('ListWatchCache', () => {
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -727,13 +686,7 @@ describe('ListWatchCache', () => {
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -820,13 +773,7 @@ describe('ListWatchCache', () => {
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down
Loading

0 comments on commit 8d3e933

Please sign in to comment.