Skip to content

Commit

Permalink
Merge pull request #2037 from brendandburns/v5
Browse files Browse the repository at this point in the history
Add support for the v5 protocol
  • Loading branch information
k8s-ci-robot authored Nov 21, 2024
2 parents ce68be8 + d17707d commit 8db906a
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 7 deletions.
61 changes: 57 additions & 4 deletions src/web-socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import stream = require('stream');
import { V1Status } from './api';
import { KubeConfig } from './config';

const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
const protocols = [
'v5.channel.k8s.io',
'v4.channel.k8s.io',
'v3.channel.k8s.io',
'v2.channel.k8s.io',
'channel.k8s.io',
];

export type TextHandler = (text: string) => boolean;
export type BinaryHandler = (stream: number, buff: Buffer) => boolean;
Expand All @@ -17,12 +23,37 @@ export interface WebSocketInterface {
): Promise<WebSocket.WebSocket>;
}

export interface StreamInterface {
stdin: stream.Readable;
stdout: stream.Writable;
stderr: stream.Writable;
}

export class WebSocketHandler implements WebSocketInterface {
public static readonly StdinStream: number = 0;
public static readonly StdoutStream: number = 1;
public static readonly StderrStream: number = 2;
public static readonly StatusStream: number = 3;
public static readonly ResizeStream: number = 4;
public static readonly CloseStream: number = 255;

public static supportsClose(protocol: string): boolean {
return protocol === 'v5.channel.k8s.io';
}

public static closeStream(streamNum: number, streams: StreamInterface): void {
switch (streamNum) {
case WebSocketHandler.StdinStream:
streams.stdin.pause();
break;
case WebSocketHandler.StdoutStream:
streams.stdout.end();
break;
case WebSocketHandler.StderrStream:
streams.stderr.end();
break;
}
}

public static handleStandardStreams(
streamNum: number,
Expand All @@ -39,6 +70,7 @@ export class WebSocketHandler implements WebSocketInterface {
stderr.write(buff);
} else if (streamNum === WebSocketHandler.StatusStream) {
// stream closing.
// Hacky, change tests to use the stream interface
if (stdout && stdout !== process.stdout) {
stdout.end();
}
Expand Down Expand Up @@ -69,6 +101,12 @@ export class WebSocketHandler implements WebSocketInterface {
});

stdin.on('end', () => {
if (WebSocketHandler.supportsClose(ws.protocol)) {
const buff = Buffer.alloc(2);
buff.writeUint8(this.CloseStream, 0);
buff.writeUint8(this.StdinStream, 1);
ws.send(buff);
}
ws.close();
});
// Keep the stream open
Expand Down Expand Up @@ -141,7 +179,16 @@ export class WebSocketHandler implements WebSocketInterface {
// factory is really just for test injection
public constructor(
readonly config: KubeConfig,
readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket,
readonly socketFactory?: (
uri: string,
protocols: string[],
opts: WebSocket.ClientOptions,
) => WebSocket.WebSocket,
readonly streams: StreamInterface = {
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
},
) {}

/**
Expand Down Expand Up @@ -173,7 +220,7 @@ export class WebSocketHandler implements WebSocketInterface {

return await new Promise<WebSocket.WebSocket>((resolve, reject) => {
const client = this.socketFactory
? this.socketFactory(uri, opts)
? this.socketFactory(uri, protocols, opts)
: new WebSocket(uri, protocols, opts);
let resolved = false;

Expand All @@ -191,11 +238,17 @@ export class WebSocketHandler implements WebSocketInterface {
client.onmessage = ({ data }: { data: WebSocket.Data }) => {
// TODO: support ArrayBuffer and Buffer[] data types?
if (typeof data === 'string') {
if (data.charCodeAt(0) === WebSocketHandler.CloseStream) {
WebSocketHandler.closeStream(data.charCodeAt(1), this.streams);
}
if (textHandler && !textHandler(data)) {
client.close();
}
} else if (data instanceof Buffer) {
const streamNum = data.readInt8(0);
const streamNum = data.readUint8(0);
if (streamNum === WebSocketHandler.CloseStream) {
WebSocketHandler.closeStream(data.readInt8(1), this.streams);
}
if (binaryHandler && !binaryHandler(streamNum, data.subarray(1))) {
client.close();
}
Expand Down
108 changes: 105 additions & 3 deletions src/web-socket-handler_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { promisify } from 'util';
import { expect } from 'chai';
import WebSocket = require('isomorphic-ws');
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
import stream from 'node:stream';

import { V1Status } from './api';
import { KubeConfig } from './config';
Expand Down Expand Up @@ -119,7 +120,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -170,7 +171,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -239,7 +240,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -303,6 +304,107 @@ describe('WebSocket', () => {
});
});

describe('V5 protocol support', () => {
it('should handle close', async () => {
const kc = new KubeConfig();
const host = 'foo.company.com';
const server = `https://${host}`;
kc.clusters = [
{
name: 'cluster',
server,
} as Cluster,
] as Cluster[];
kc.contexts = [
{
cluster: 'cluster',
user: 'user',
} as Context,
] as Context[];
kc.users = [
{
name: 'user',
} as User,
];

const mockWs = {
protocol: 'v5.channel.k8s.io',
} as WebSocket.WebSocket;
let uriOut = '';
let endCalled = false;
const handler = new WebSocketHandler(
kc,
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
{
stdin: process.stdin,
stderr: process.stderr,
stdout: {
end: () => {
endCalled = true;
},
} as stream.Writable,
},
);
const path = '/some/path';

const promise = handler.connect(path, null, null);
await setImmediatePromise();

expect(uriOut).to.equal(`wss://${host}${path}`);

const event = {
target: mockWs,
type: 'open',
};
mockWs.onopen!(event);
const errEvt = {
error: {},
message: 'some message',
type: 'some type',
target: mockWs,
};
const closeBuff = Buffer.alloc(2);
closeBuff.writeUint8(255, 0);
closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1);

mockWs.onmessage!({
data: closeBuff,
type: 'type',
target: mockWs,
});
await promise;
expect(endCalled).to.be.true;
});
it('should handle closing stdin < v4 protocol', () => {
const ws = {
// send is not defined, so this will throw if we try to send the close message.
close: () => {},
} as WebSocket;
const stdinStream = new ReadableStreamBuffer();
WebSocketHandler.handleStandardInput(ws, stdinStream);
stdinStream.emit('end');
});
it('should handle closing stdin v5 protocol', () => {
let sent: Buffer | null = null;
const ws = {
protocol: 'v5.channel.k8s.io',
send: (data) => {
sent = data;
},
close: () => {},
} as WebSocket;
const stdinStream = new ReadableStreamBuffer();
WebSocketHandler.handleStandardInput(ws, stdinStream);
stdinStream.emit('end');
expect(sent).to.not.be.null;
expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal
expect(sent!.readUint8(1)).to.equal(0); // Stdin stream is #0
});
});

describe('Restartable Handle Standard Input', () => {
it('should throw on negative retry', () => {
const p = new Promise<WebSocket.WebSocket>(() => {});
Expand Down

0 comments on commit 8db906a

Please sign in to comment.