Skip to content

Commit

Permalink
kfp UI node server support preview and handles gzip, tarball, and raw…
Browse files Browse the repository at this point in the history
… artifacts in a consistent manner.
  • Loading branch information
eterna2 committed Mar 23, 2020
1 parent 4286953 commit c58a032
Show file tree
Hide file tree
Showing 9 changed files with 956 additions and 542 deletions.
80 changes: 70 additions & 10 deletions frontend/server/app.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC
// Copyright 2019-2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -176,10 +176,13 @@ describe('UIServer apis', () => {
it('responds with a minio artifact if source=minio', done => {
const artifactContent = 'hello world';
const mockedMinioClient: jest.Mock = MinioClient as any;
const mockedGetTarObjectAsString: jest.Mock = minioHelper.getTarObjectAsString as any;
mockedGetTarObjectAsString.mockImplementationOnce(opt =>
const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any;
const objStream = new PassThrough();
objStream.end(artifactContent);

mockedGetObjectStream.mockImplementationOnce(opt =>
opt.bucket === 'ml-pipeline' && opt.key === 'hello/world.txt'
? Promise.resolve(artifactContent)
? Promise.resolve(objStream)
: Promise.reject('Unable to retrieve minio artifact.'),
);
const configs = loadConfigs(argv, {
Expand Down Expand Up @@ -239,7 +242,7 @@ describe('UIServer apis', () => {
});
});

it('responds with a s3 artifact if source=s3', done => {
it('responds with partial s3 artifact if peek=5 flag is set', done => {
const artifactContent = 'hello world';
const mockedMinioClient: jest.Mock = minioHelper.createMinioClient as any;
const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any;
Expand All @@ -260,8 +263,8 @@ describe('UIServer apis', () => {

const request = requests(app.start());
request
.get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt')
.expect(200, artifactContent, err => {
.get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), err => {
expect(mockedMinioClient).toBeCalledWith({
accessKey: 'aws123',
endPoint: 's3.amazonaws.com',
Expand All @@ -275,7 +278,10 @@ describe('UIServer apis', () => {
const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'http://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) })
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
Expand All @@ -294,12 +300,42 @@ describe('UIServer apis', () => {
});
});

it('responds with partial http artifact if peek=5 flag is set', done => {
const artifactContent = 'hello world';
const mockedFetch: jest.Mock = fetch as any;
mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'http://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
HTTP_BASE_URL: 'foo.bar/',
});
app = new UIServer(configs);

const request = requests(app.start());
request
.get('/artifacts/get?source=http&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), err => {
expect(mockedFetch).toBeCalledWith('http://foo.bar/ml-pipeline/hello/world.txt', {
headers: {},
});
done(err);
});
});

it('responds with a https artifact if source=https', done => {
const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'https://foo.bar/ml-pipeline/hello/world.txt' &&
opts.headers.Authorization === 'someToken'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) })
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
Expand All @@ -326,7 +362,10 @@ describe('UIServer apis', () => {
const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, _opts: any) =>
url === 'https://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) })
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
Expand Down Expand Up @@ -369,6 +408,26 @@ describe('UIServer apis', () => {
.get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt')
.expect(200, artifactContent + '\n', done);
});

it('responds with a partial gcs artifact if peek=5 is set', done => {
const artifactContent = 'hello world';
const mockedGcsStorage: jest.Mock = GCSStorage as any;
const stream = new PassThrough();
stream.end(artifactContent);
mockedGcsStorage.mockImplementationOnce(() => ({
bucket: () => ({
getFiles: () =>
Promise.resolve([[{ name: 'hello/world.txt', createReadStream: () => stream }]]),
}),
}));
const configs = loadConfigs(argv, {});
app = new UIServer(configs);

const request = requests(app.start());
request
.get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), done);
});
});

describe('/system', () => {
Expand Down Expand Up @@ -411,6 +470,7 @@ describe('UIServer apis', () => {
.expect(500, 'GKE metadata endpoints are disabled.', done);
});
});

describe('/project-id', () => {
beforeEach(() => {
mockedK8sHelper.isInCluster = true;
Expand Down
104 changes: 62 additions & 42 deletions frontend/server/handlers/artifacts.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC
// Copyright 2019-2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,14 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Handler, Request, Response } from 'express';
import fetch from 'node-fetch';
import { AWSConfigs, HttpConfigs, MinioConfigs } from '../configs';
import { Client as MinioClient } from 'minio';
import { PreviewStream } from '../utils';
import { createMinioClient, getObjectStream } from '../minio-helper';
import { Handler, Request, Response } from 'express';
import { Storage } from '@google-cloud/storage';

import { getTarObjectAsString, getObjectStream, createMinioClient } from '../minio-helper';
import { HttpConfigs, AWSConfigs, MinioConfigs } from '../configs';

/**
* ArtifactsQueryStrings describes the expected query strings key value pairs
* in the artifact request object.
Expand All @@ -30,6 +30,8 @@ interface ArtifactsQueryStrings {
bucket: string;
/** artifact key/path that is uri encoded. */
key: string;
/** return only the first x characters or bytes. */
peek?: number;
}

/**
Expand All @@ -44,7 +46,9 @@ export function getArtifactsHandler(artifactsConfigs: {
}): Handler {
const { aws, http, minio } = artifactsConfigs;
return async (req, res) => {
const { source, bucket, key: encodedKey } = req.query as Partial<ArtifactsQueryStrings>;
const { source, bucket, key: encodedKey, peek = 0 } = req.query as Partial<
ArtifactsQueryStrings
>;
if (!source) {
res.status(500).send('Storage source is missing from artifact request');
return;
Expand All @@ -61,31 +65,38 @@ export function getArtifactsHandler(artifactsConfigs: {
console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`);
switch (source) {
case 'gcs':
getGCSArtifactHandler({ bucket, key })(req, res);
getGCSArtifactHandler({ bucket, key }, peek)(req, res);
break;

case 'minio':
getMinioArtifactHandler({
bucket,
client: new MinioClient(minio),
key,
})(req, res);
getMinioArtifactHandler(
{
bucket,
client: new MinioClient(minio),
key,
},
peek,
)(req, res);
break;

case 's3':
getS3ArtifactHandler({
bucket,
client: await createMinioClient(aws),
key,
})(req, res);
getMinioArtifactHandler(
{
bucket,
client: await createMinioClient(aws),
key,
},
peek,
)(req, res);
break;

case 'http':
case 'https':
getHttpArtifactsHandler(getHttpUrl(source, http.baseUrl || '', bucket, key), http.auth)(
req,
res,
);
getHttpArtifactsHandler(
getHttpUrl(source, http.baseUrl || '', bucket, key),
http.auth,
peek,
)(req, res);
break;

default:
Expand Down Expand Up @@ -114,6 +125,7 @@ function getHttpArtifactsHandler(
key: string;
defaultValue: string;
} = { key: '', defaultValue: '' },
peek: number = 0,
) {
return async (req: Request, res: Response) => {
const headers = {};
Expand All @@ -125,32 +137,30 @@ function getHttpArtifactsHandler(
req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue;
}
const response = await fetch(url, { headers });
const content = await response.buffer();
res.send(content);
response.body
.on('error', err => res.status(500).send(`Unable to retrieve artifact at ${url}: ${err}`))
.pipe(new PreviewStream({ peek }))
.pipe(res);
};
}

function getS3ArtifactHandler(options: { bucket: string; key: string; client: MinioClient }) {
function getMinioArtifactHandler(
options: { bucket: string; key: string; client: MinioClient },
peek: number = 0,
) {
return async (_: Request, res: Response) => {
try {
const stream = await getObjectStream(options);
stream.on('end', () => res.end());
stream.on('error', err =>
res
.status(500)
.send(`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`),
);
stream.pipe(res);
} catch (err) {
res.send(`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`);
}
};
}

function getMinioArtifactHandler(options: { bucket: string; key: string; client: MinioClient }) {
return async (_: Request, res: Response) => {
try {
res.send(await getTarObjectAsString(options));
stream
.on('error', err =>
res
.status(500)
.send(
`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`,
),
)
.pipe(new PreviewStream({ peek }))
.pipe(res);
} catch (err) {
res
.status(500)
Expand All @@ -159,7 +169,7 @@ function getMinioArtifactHandler(options: { bucket: string; key: string; client:
};
}

function getGCSArtifactHandler(options: { key: string; bucket: string }) {
function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0) {
const { key, bucket } = options;
return async (_: Request, res: Response) => {
try {
Expand Down Expand Up @@ -197,6 +207,16 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }) {
matchingFiles.map(file => file.name).join(','),
);
let contents = '';
if (peek) {
// TODO: support peek for concatenated matching files
matchingFiles[0]
.createReadStream()
.pipe(new PreviewStream({ peek }))
.pipe(res);
return;
}

// if not peeking, iterate and append all the files
matchingFiles.forEach((f, i) => {
const buffer: Buffer[] = [];
f.createReadStream()
Expand Down
Loading

0 comments on commit c58a032

Please sign in to comment.