Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Frontend] Node server artifact handler supports preview and handles both archived and unarchived (gzip/tarball) artifacts #2172 #2992

Merged
merged 1 commit into from
Mar 24, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 70 additions & 10 deletions frontend/server/app.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC
// Copyright 2019-2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -186,10 +186,13 @@ describe('UIServer apis', () => {
it('responds with a minio artifact if source=minio', done => {
const artifactContent = 'hello world';
const mockedMinioClient: jest.Mock = MinioClient as any;
const mockedGetTarObjectAsString: jest.Mock = minioHelper.getTarObjectAsString as any;
mockedGetTarObjectAsString.mockImplementationOnce(opt =>
const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any;
const objStream = new PassThrough();
objStream.end(artifactContent);

mockedGetObjectStream.mockImplementationOnce(opt =>
opt.bucket === 'ml-pipeline' && opt.key === 'hello/world.txt'
? Promise.resolve(artifactContent)
? Promise.resolve(objStream)
: Promise.reject('Unable to retrieve minio artifact.'),
);
const configs = loadConfigs(argv, {
@@ -249,7 +252,7 @@ describe('UIServer apis', () => {
});
});

it('responds with a s3 artifact if source=s3', done => {
it('responds with partial s3 artifact if peek=5 flag is set', done => {
const artifactContent = 'hello world';
const mockedMinioClient: jest.Mock = minioHelper.createMinioClient as any;
const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any;
@@ -270,8 +273,8 @@ describe('UIServer apis', () => {

const request = requests(app.start());
request
.get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt')
.expect(200, artifactContent, err => {
.get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), err => {
expect(mockedMinioClient).toBeCalledWith({
accessKey: 'aws123',
endPoint: 's3.amazonaws.com',
@@ -285,7 +288,10 @@ describe('UIServer apis', () => {
const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'http://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) })
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
@@ -304,12 +310,42 @@ describe('UIServer apis', () => {
});
});

// Checks that an http artifact request carrying peek=5 is answered with only
// the first five characters of the content served by the mocked fetch.
it('responds with partial http artifact if peek=5 flag is set', done => {
  const artifactContent = 'hello world';
  const expectedUrl = 'http://foo.bar/ml-pipeline/hello/world.txt';
  const mockedFetch: jest.Mock = fetch as any;

  // Only the expected artifact url resolves; any other url is rejected.
  mockedFetch.mockImplementationOnce((url: string, _opts: any) => {
    if (url !== expectedUrl) {
      return Promise.reject('Unable to retrieve http artifact.');
    }
    return Promise.resolve({
      buffer: () => Promise.resolve(artifactContent),
      body: new PassThrough().end(artifactContent),
    });
  });

  app = new UIServer(loadConfigs(argv, { HTTP_BASE_URL: 'foo.bar/' }));

  requests(app.start())
    .get('/artifacts/get?source=http&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
    .expect(200, artifactContent.slice(0, 5), err => {
      // No auth is configured here, so fetch is expected to receive empty headers.
      expect(mockedFetch).toBeCalledWith(expectedUrl, { headers: {} });
      done(err);
    });
});

it('responds with a https artifact if source=https', done => {
const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'https://foo.bar/ml-pipeline/hello/world.txt' &&
opts.headers.Authorization === 'someToken'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) })
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
@@ -336,7 +372,10 @@ describe('UIServer apis', () => {
const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, _opts: any) =>
url === 'https://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) })
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
@@ -379,6 +418,26 @@ describe('UIServer apis', () => {
.get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt')
.expect(200, artifactContent + '\n', done);
});

// Checks that a gcs artifact request carrying peek=5 is answered with only
// the first five characters of the single matching file's content.
it('responds with a partial gcs artifact if peek=5 is set', done => {
  const artifactContent = 'hello world';

  // Stream that the mocked GCS file hands out as its read stream.
  const contentStream = new PassThrough();
  contentStream.end(artifactContent);
  const matchedFile = { name: 'hello/world.txt', createReadStream: () => contentStream };

  const mockedGcsStorage: jest.Mock = GCSStorage as any;
  mockedGcsStorage.mockImplementationOnce(() => ({
    bucket: () => ({
      getFiles: () => Promise.resolve([[matchedFile]]),
    }),
  }));

  app = new UIServer(loadConfigs(argv, {}));

  requests(app.start())
    .get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
    .expect(200, artifactContent.slice(0, 5), done);
});
});

describe('/system', () => {
@@ -418,6 +477,7 @@ describe('UIServer apis', () => {
.expect(500, 'GKE metadata endpoints are disabled.', done);
});
});

describe('/project-id', () => {
it('responds with project id data from gke metadata', done => {
mockedFetch.mockImplementationOnce((url: string, _opts: any) =>
104 changes: 62 additions & 42 deletions frontend/server/handlers/artifacts.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC
// Copyright 2019-2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,14 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Handler, Request, Response } from 'express';
import fetch from 'node-fetch';
import { AWSConfigs, HttpConfigs, MinioConfigs } from '../configs';
import { Client as MinioClient } from 'minio';
import { PreviewStream } from '../utils';
import { createMinioClient, getObjectStream } from '../minio-helper';
import { Handler, Request, Response } from 'express';
import { Storage } from '@google-cloud/storage';

import { getTarObjectAsString, getObjectStream, createMinioClient } from '../minio-helper';
import { HttpConfigs, AWSConfigs, MinioConfigs } from '../configs';

/**
* ArtifactsQueryStrings describes the expected query strings key value pairs
* in the artifact request object.
@@ -30,6 +30,8 @@ interface ArtifactsQueryStrings {
bucket: string;
/** artifact key/path that is uri encoded. */
key: string;
/** return only the first x characters or bytes. */
peek?: number;
}

/**
@@ -44,7 +46,9 @@ export function getArtifactsHandler(artifactsConfigs: {
}): Handler {
const { aws, http, minio } = artifactsConfigs;
return async (req, res) => {
const { source, bucket, key: encodedKey } = req.query as Partial<ArtifactsQueryStrings>;
const { source, bucket, key: encodedKey, peek = 0 } = req.query as Partial<
ArtifactsQueryStrings
>;
if (!source) {
res.status(500).send('Storage source is missing from artifact request');
return;
@@ -61,31 +65,38 @@ export function getArtifactsHandler(artifactsConfigs: {
console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`);
switch (source) {
case 'gcs':
getGCSArtifactHandler({ bucket, key })(req, res);
getGCSArtifactHandler({ bucket, key }, peek)(req, res);
break;

case 'minio':
getMinioArtifactHandler({
bucket,
client: new MinioClient(minio),
key,
})(req, res);
getMinioArtifactHandler(
{
bucket,
client: new MinioClient(minio),
key,
},
peek,
)(req, res);
break;

case 's3':
getS3ArtifactHandler({
bucket,
client: await createMinioClient(aws),
key,
})(req, res);
getMinioArtifactHandler(
{
bucket,
client: await createMinioClient(aws),
key,
},
peek,
)(req, res);
break;

case 'http':
case 'https':
getHttpArtifactsHandler(getHttpUrl(source, http.baseUrl || '', bucket, key), http.auth)(
req,
res,
);
getHttpArtifactsHandler(
getHttpUrl(source, http.baseUrl || '', bucket, key),
http.auth,
peek,
)(req, res);
break;

default:
@@ -114,6 +125,7 @@ function getHttpArtifactsHandler(
key: string;
defaultValue: string;
} = { key: '', defaultValue: '' },
peek: number = 0,
) {
return async (req: Request, res: Response) => {
const headers = {};
@@ -125,32 +137,30 @@ function getHttpArtifactsHandler(
req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue;
}
const response = await fetch(url, { headers });
const content = await response.buffer();
res.send(content);
response.body
.on('error', err => res.status(500).send(`Unable to retrieve artifact at ${url}: ${err}`))
.pipe(new PreviewStream({ peek }))
.pipe(res);
};
}

function getS3ArtifactHandler(options: { bucket: string; key: string; client: MinioClient }) {
function getMinioArtifactHandler(
options: { bucket: string; key: string; client: MinioClient },
peek: number = 0,
) {
return async (_: Request, res: Response) => {
try {
const stream = await getObjectStream(options);
stream.on('end', () => res.end());
stream.on('error', err =>
res
.status(500)
.send(`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`),
);
stream.pipe(res);
} catch (err) {
res.send(`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`);
}
};
}

function getMinioArtifactHandler(options: { bucket: string; key: string; client: MinioClient }) {
return async (_: Request, res: Response) => {
try {
res.send(await getTarObjectAsString(options));
stream
.on('error', err =>
res
.status(500)
.send(
`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`,
),
)
.pipe(new PreviewStream({ peek }))
.pipe(res);
} catch (err) {
res
.status(500)
@@ -159,7 +169,7 @@ function getMinioArtifactHandler(options: { bucket: string; key: string; client:
};
}

function getGCSArtifactHandler(options: { key: string; bucket: string }) {
function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0) {
const { key, bucket } = options;
return async (_: Request, res: Response) => {
try {
@@ -197,6 +207,16 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }) {
matchingFiles.map(file => file.name).join(','),
);
let contents = '';
// TODO: support peek for concatenated matching files
if (peek) {
matchingFiles[0]
.createReadStream()
.pipe(new PreviewStream({ peek }))
.pipe(res);
return;
}

// if not peeking, iterate and append all the files
matchingFiles.forEach((f, i) => {
const buffer: Buffer[] = [];
f.createReadStream()
Loading