Merge branch 'channel_version_s3_upload' into cluster_subscription_gql
dalehille committed Jun 3, 2020
2 parents 9266b50 + 992ab51 commit 4a3ee04
Showing 6 changed files with 212 additions and 60 deletions.
124 changes: 73 additions & 51 deletions app/apollo/resolvers/channel.js
@@ -17,7 +17,11 @@
const _ = require('lodash');
const { v4: UUID } = require('uuid');
const crypto = require('crypto');
const conf = require('../../conf.js').conf;
const S3ClientClass = require('../../s3/s3Client');
const { UserInputError, ValidationError } = require('apollo-server');
const { WritableStreamBuffer } = require('stream-buffers');
const stream = require('stream');

const { ACTIONS, TYPES } = require('../models/const');
const { whoIs, validAuth, NotFoundError} = require ('./common');
@@ -74,17 +78,29 @@ const channelResolvers = {
throw NotFoundError(`versionObj "${version_uuid}" is not found for ${channel.name}:${channel.uuid}`);
}

- if (versionObj.location === 'mongo') {
- const deployableVersionObj = await models.DeployableVersion.findOne({org_id, channel_id: channel_uuid, uuid: version_uuid });
- if (!deployableVersionObj) {
- throw new NotFoundError(`DeployableVersion is not found for ${channel.name}:${channel.uuid}/${versionObj.name}:${versionObj.uuid}.`);
- }
+ const deployableVersionObj = await models.DeployableVersion.findOne({org_id, channel_id: channel_uuid, uuid: version_uuid });
+ if (!deployableVersionObj) {
+ throw `DeployableVersion is not found for ${channel.name}:${channel.uuid}/${versionObj.name}:${versionObj.uuid}.`;
+ }

+ if (versionObj.location === 'mongo') {
deployableVersionObj.content = await decryptOrgData(orgKey, deployableVersionObj.content);
- return deployableVersionObj;
- } else {
- //TODO: implement for S3
- throw 'fix me, not implement for S3 yet';
- }
}
+ else if(versionObj.location === 's3'){
+ const url = deployableVersionObj.content;
+ const urlObj = new URL(url);
+ const fullPath = urlObj.pathname;
+ var parts = _.filter(_.split(fullPath, '/'));
+ var bucketName = parts.shift();
+ var path = `${parts.join('/')}`;

+ const s3Client = new S3ClientClass(conf);
+ deployableVersionObj.content = await s3Client.getAndDecryptFile(bucketName, path, orgKey, deployableVersionObj.iv);
+ }
+ else {
+ throw `versionObj.location="${versionObj.location}" not implemented yet`;
+ }
+ return deployableVersionObj;
}catch(err){
logger.error(err, `${queryName} encountered an error when serving ${req_id}.`);
throw err;
@@ -141,10 +157,11 @@ const channelResolvers = {
throw err;
}
},
- addChannelVersion: async(parent, { org_id, channel_uuid, name, type, content, description }, context)=>{
+ addChannelVersion: async(parent, { org_id, channel_uuid, name, type, content, file, description }, context)=>{
const { models, me, req_id, logger } = context;

const queryName = 'addChannelVersion';
- logger.debug({req_id, user: whoIs(me), org_id, channel_uuid, name, type, description }, `${queryName} enter`);
+ logger.debug({req_id, user: whoIs(me), org_id, channel_uuid, name, type, description, file }, `${queryName} enter`);
await validAuth(me, org_id, ACTIONS.MANAGE, TYPES.CHANNEL, queryName, context);

// slightly modified code from /app/routes/v1/channelsStream.js. changed to use mongoose and graphql
@@ -160,6 +177,9 @@
if(!channel_uuid){
throw 'channel_uuid not specified';
}
if(!file && !content){
throw 'Please specify either file or content';
}

const channel = await models.Channel.findOne({ uuid: channel_uuid, org_id });
if(!channel){
@@ -175,49 +195,51 @@
throw new ValidationError(`The version name ${name} already exists`);
}

+ let fileStream = null;
+ if(file){
+ fileStream = (await file).createReadStream();
+ }
+ else{
+ fileStream = stream.Readable.from([ content ]);
+ }

const iv = crypto.randomBytes(16);
const ivText = iv.toString('base64');

- const location = 'mongo';

- // todo: enable s3
- // let location, data;
- //
- // if (conf.s3.endpoint) {
- // try {
- // const resourceName = channel.name + '-' + version.name;
- // const bucket = `${conf.s3.bucketPrefix}-${orgId.toLowerCase()}`;
- // const s3Client = new S3ClientClass(conf);
- // try {
- // const exists = await s3Client.bucketExists(bucket);
- // if (!exists) {
- // logger.warn('bucket does not exist', { bucket });
- // await s3Client.createBucket(bucket);
- // }
- // } catch (error) {
- // logger.error('could not create bucket', { bucket: bucket });
- // throw error;
- // }
- // const s3 = new AWS.S3(conf.s3);
- // const key = Buffer.concat([Buffer.from(req.orgKey)], 32);
- // const encrypt = crypto.createCipheriv(algorithm, key, iv);
- // const pipe = req.pipe(encrypt);
- // const params = {Bucket: bucket, Key: resourceName, Body: pipe};
- // const upload = s3.upload( params );
- // await upload.promise();
- //
- // data = `https://${conf.s3.endpoint}/${bucket}/${resourceName}`;
- // location = 's3';
- // } catch (error) {
- // logger.error( 'S3 upload error', error );
- // throw error;
- // }
- // } else {
- // data = await encryptResource(req);
- // location = 'mongo';
- // }

- const data = await encryptOrgData(orgKey, content);
+ let location = 'mongo';
+ let data = null;

+ if(conf.s3.endpoint){
+ const resourceName = `${channel.name}-${name}`;
+ const bucketName = `${conf.s3.bucketPrefix}-${org_id.toLowerCase()}`;

+ const s3Client = new S3ClientClass(conf);

+ await s3Client.ensureBucketExists(bucketName);

+ //data is now the s3 hostpath to the resource
+ const result = await s3Client.encryptAndUploadFile(bucketName, resourceName, fileStream, orgKey, iv);
+ data = result.url;

+ location = 's3';
+ }
+ else{
+ var buf = new WritableStreamBuffer();
+ await new Promise((resolve, reject)=>{
+ return stream.pipeline(
+ fileStream,
+ buf,
+ (err)=>{
+ if(err){
+ reject(err);
+ }
+ resolve(err);
+ }
+ );
+ });
+ const content = buf.getContents().toString('utf8');
+ data = await encryptOrgData(orgKey, content);
+ }

const deployableVersionObj = {
_id: UUID(),
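For reference, a minimal sketch of the bucket/key extraction that the new S3 read branch above performs on the stored content URL; the endpoint, bucket, and object names below are placeholders, not values from this commit:

// Illustration only: mirrors the URL-parsing logic in the S3 branch of the content resolver.
const _ = require('lodash');

const url = 'https://s3.example.internal/razee-myorg/mychannel-v0.0.1'; // placeholder URL
const fullPath = new URL(url).pathname;            // '/razee-myorg/mychannel-v0.0.1'
const parts = _.filter(_.split(fullPath, '/'));    // _.filter drops the leading empty string
const bucketName = parts.shift();                  // 'razee-myorg'
const path = parts.join('/');                      // 'mychannel-v0.0.1'
console.log({ bucketName, path });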
3 changes: 2 additions & 1 deletion app/apollo/schema/channel.js
@@ -90,8 +90,9 @@ const channelSchema = gql`
"""
Adds a yaml version to this channel
Requires either content:String or file:Upload
"""
- addChannelVersion(org_id: String!, channel_uuid: String!, name: String!, type: String!, content: String!, description: String): AddChannelVersionReply!
+ addChannelVersion(org_id: String!, channel_uuid: String!, name: String!, type: String!, content: String, file: Upload, description: String): AddChannelVersionReply!
"""
Removes a channel
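A hedged sketch of how a client might call the updated mutation with inline content follows; the endpoint URL, auth header, and the AddChannelVersionReply fields selected are assumptions for illustration, not taken from this commit. The file: Upload variant would instead be sent as a GraphQL multipart request (for example via apollo-upload-client).

// Sketch only: assumes a local GraphQL endpoint and bearer-token auth.
const fetch = require('node-fetch'); // node-fetch v2 (CommonJS)

const mutation = `
  mutation($org_id: String!, $channel_uuid: String!, $name: String!, $type: String!, $content: String) {
    addChannelVersion(org_id: $org_id, channel_uuid: $channel_uuid, name: $name, type: $type, content: $content) {
      version_uuid   # assumed reply field
      success        # assumed reply field
    }
  }
`;

async function addVersion() {
  const res = await fetch('http://localhost:3333/graphql', { // assumed endpoint
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: 'Bearer <token>', // assumed auth scheme
    },
    body: JSON.stringify({
      query: mutation,
      variables: {
        org_id: '<org-uuid>',
        channel_uuid: '<channel-uuid>',
        name: 'v0.0.1',
        type: 'application/yaml',
        content: 'apiVersion: v1\nkind: ConfigMap\nmetadata:\n  name: example\n',
      },
    }),
  });
  console.log(await res.json());
}

addVersion().catch(console.error);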
5 changes: 4 additions & 1 deletion app/conf.js
@@ -25,7 +25,10 @@ const conf = {
accessKeyId: process.env.S3_ACCESS_KEY_ID,
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
locationConstraint: process.env.S3_LOCATION_CONSTRAINT || 'us-standard',
- bucketPrefix: process.env.S3_BUCKET_PREFIX || 'razee'
+ bucketPrefix: process.env.S3_BUCKET_PREFIX || 'razee',
+ s3ForcePathStyle: true,
+ signatureVersion: 'v4',
+ sslEnabled: !process.env.S3_DISABLE_SSL, //for local minio support
}
};

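The three added flags exist mainly for running against a local MinIO (per the inline comment); here is a hedged sketch of what they translate to on an aws-sdk v2 S3 client, with a MinIO-style endpoint and placeholder credentials that are not part of this commit:

// Illustration only: shows the effect of the new conf.s3 options on an AWS SDK v2 client.
const AWS = require('aws-sdk');

const s3 = new AWS.S3({
  endpoint: 'http://localhost:9000',  // assumed local MinIO endpoint (plain HTTP, i.e. S3_DISABLE_SSL set)
  accessKeyId: 'minioadmin',          // placeholder credentials
  secretAccessKey: 'minioadmin',
  s3ForcePathStyle: true,             // address buckets as /<bucket>/<key> instead of <bucket>.<host>
  signatureVersion: 'v4',             // SigV4 request signing, which MinIO expects
  sslEnabled: false,                  // what !process.env.S3_DISABLE_SSL evaluates to when the variable is set
});

s3.listBuckets().promise()
  .then(out => console.log(out.Buckets))
  .catch(console.error);

Path-style addressing matters here because a local MinIO endpoint has no per-bucket DNS names.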
114 changes: 113 additions & 1 deletion app/s3/s3Client.js
@@ -15,6 +15,11 @@
*/
const clone = require('clone');
const AWS = require('aws-sdk');
const crypto = require('crypto');
const _ = require('lodash');
const stream = require('stream');

const encryptionAlgorithm = 'aes-256-cbc';

module.exports = class S3Client {
constructor(options) {
@@ -104,7 +109,8 @@ module.exports = class S3Client {
const nop = {
error: () => {},
info: () => {},
- debug: () => {}
+ debug: () => {},
+ warn: () => {},
};
const result = this._log || nop;
return result;
@@ -113,4 +119,110 @@
set log(logger) {
this._log = logger;
}

async ensureBucketExists(bucketName){
try {
const exists = await this.bucketExists(bucketName);
if(!exists){
this.log.warn('bucket does not exist', { bucketName });
await this.createBucket(bucketName);
}
}catch(err){
this.log.error('could not create bucket', { bucketName });
throw err;
}
}

async encryptAndUploadFile(bucketName, path, fileStream, encryptionKey, iv=null){
try {
const exists = await this.bucketExists(bucketName);
if(!exists){
this.log.warn('bucket does not exist', { bucketName });
await this.createBucket(bucketName);
}
}catch(err){
this.log.error('could not create bucket', { bucketName });
throw err;
}

const key = Buffer.concat([Buffer.from(encryptionKey)], 32);

if(!iv){
iv = crypto.randomBytes(16);
}
const ivText = iv.toString('base64');

const cipher = crypto.createCipheriv(encryptionAlgorithm, key, iv);

const awsStream = this._aws.upload({
Bucket: bucketName,
Key: path,
Body: fileStream.pipe(cipher),
});
await awsStream.promise();

const url = `${this._conf.endpoint.match(/^http/i) ? '' : 'https://'}${this._conf.endpoint}/${bucketName}/${path}`;
return {
url, ivText,
};
}

async getAndDecryptFile(bucketName, path, key, iv) {
return new Promise((resolve, reject) => {
try {
const { WritableStreamBuffer } = require('stream-buffers');

if (_.isString(iv)) {
iv = Buffer.from(iv, 'base64');
}
key = Buffer.concat([Buffer.from(key)], 32);

const awsStream = this.getObject(bucketName, path).createReadStream();
const decipher = crypto.createDecipheriv(encryptionAlgorithm, key, iv);

var buf = new WritableStreamBuffer();
stream.pipeline(
awsStream,
decipher,
buf,
(err) => {
if(err){
reject(err);
return;
}
try {
resolve(buf.getContents().toString('utf8'));
}
catch(err){
reject(err);
}
}
);
}
catch(err){
reject(err);
}
});
}
};

// ;((async()=>{
// var s3Client = new module.exports(require('../conf.js').conf);
// var bucketName = 'razee--k4tty77xnpmgjppfw';
// var path = 'blah';
// var content = 'this is teh content';
// var encryptionKey = 'orgApiKey-21fd8bfa-cc1d-43dd-988f-ddec98d72db7';
// var ivText = 'oRAApY8YmWQx5a98rUVkhg==';
// var iv = Buffer.from(ivText, 'base64');
//
// console.log(11111, bucketName, path, content, encryptionKey, ivText, iv);
//
// var out = await s3Client.encryptAndUploadFile(bucketName, path, content, encryptionKey, iv);
//
// console.log('uploaded', out);
//
// var out = await s3Client.getAndDecryptFile(bucketName, path, encryptionKey, iv);
//
// console.log('downloaded', out);
// })());
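The commented-out block above is scratch code; a self-contained, hedged round-trip through the two new methods looks roughly like this (the bucket name, object key, and org key are placeholders, and the require paths assume the app/s3 and app layout shown in this commit):

// Sketch only: encrypt-and-upload, then download-and-decrypt the same object.
const stream = require('stream');
const S3ClientClass = require('./s3Client');   // assumed path, relative to app/s3
const conf = require('../conf.js').conf;

(async () => {
  const s3Client = new S3ClientClass(conf);

  const bucketName = `${conf.s3.bucketPrefix}-example-org`;        // placeholder bucket
  const path = 'example-channel-v0.0.1';                           // placeholder object key
  const orgKey = 'orgApiKey-00000000-0000-0000-0000-000000000000'; // placeholder key material

  await s3Client.ensureBucketExists(bucketName);

  // encryptAndUploadFile expects a readable stream, so wrap the string content
  const fileStream = stream.Readable.from(['kind: ConfigMap\n']);
  const { url, ivText } = await s3Client.encryptAndUploadFile(bucketName, path, fileStream, orgKey, null);

  // getAndDecryptFile accepts the iv either as a Buffer or as the base64 ivText string
  const plaintext = await s3Client.getAndDecryptFile(bucketName, path, orgKey, ivText);
  console.log(url, plaintext); // -> the stored URL and 'kind: ConfigMap\n'
})().catch(console.error);

Both methods normalize the key with Buffer.concat([Buffer.from(key)], 32), which zero-pads or truncates the org key to the 32 bytes that aes-256-cbc requires.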

25 changes: 19 additions & 6 deletions package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions package.json
@@ -72,6 +72,7 @@
"prom-client": "^12.0.0",
"request": "^2.88.2",
"request-promise-native": "^1.0.8",
"stream-buffers": "^3.0.2",
"subscriptions-transport-ws": "^0.9.16",
"swagger-ui-express": "^4.1.4",
"uuid": "^7.0.3",
