Skip to content

Commit

Permalink
Merge pull request #6438 from jeniawhite/evgb-NSFS
Browse files Browse the repository at this point in the history
Refactor uploads and reads and add thread logging
  • Loading branch information
jeniawhite authored Apr 7, 2021
2 parents 2c68fb6 + ea146b3 commit 7a31c5d
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 55 deletions.
129 changes: 93 additions & 36 deletions src/native/fs/fs_napi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,40 @@
#include <vector>
#include <math.h>
#include <unistd.h>
#include <map>
#include <thread>

namespace noobaa
{

DBG_INIT(0);

const static std::map<std::string,int> flags_to_case = {
{"r",O_RDONLY},
{"rs",O_RDONLY | O_SYNC},
{"sr",O_RDONLY | O_SYNC},
{"r+",O_RDWR},
{"rs+",O_RDWR | O_SYNC},
{"sr+",O_RDWR | O_SYNC},
{"w",O_TRUNC | O_CREAT | O_WRONLY},
{"wx",O_TRUNC | O_CREAT | O_WRONLY | O_EXCL},
{"xw",O_TRUNC | O_CREAT | O_WRONLY | O_EXCL},
{"w+",O_TRUNC | O_CREAT | O_RDWR},
{"wx+",O_TRUNC | O_CREAT | O_RDWR | O_EXCL},
{"xw+",O_TRUNC | O_CREAT | O_RDWR | O_EXCL},
{"a",O_APPEND | O_CREAT | O_WRONLY},
{"ax",O_APPEND | O_CREAT | O_WRONLY | O_EXCL},
{"xa",O_APPEND | O_CREAT | O_WRONLY | O_EXCL},
{"as",O_APPEND | O_CREAT | O_WRONLY | O_SYNC},
{"sa",O_APPEND | O_CREAT | O_WRONLY | O_SYNC},
{"a+",O_APPEND | O_CREAT | O_RDWR},
{"ax+",O_APPEND | O_CREAT | O_RDWR | O_EXCL},
{"xa+",O_APPEND | O_CREAT | O_RDWR | O_EXCL},
{"as+",O_APPEND | O_CREAT | O_RDWR | O_SYNC},
{"sa+",O_APPEND | O_CREAT | O_RDWR | O_SYNC}
};


struct Entry {
std::string name;
ino_t ino;
Expand Down Expand Up @@ -66,10 +94,17 @@ struct FSWorker : public Napi::AsyncWorker
}
virtual void Work() = 0;
void Execute() {
#if !defined(_POSIX_C_SOURCE) || defined(_DARWIN_C_SOURCE)
pid_t tid = int(pthread_mach_thread_np(pthread_self()));
#else
// pid_t tid = syscall(__NR_gettid);
auto tid = std::this_thread::get_id();
#endif
DBG1("FS::FSWorker::Start Execute: " << _desc <<
" req_uid:" << _req_uid <<
" req_gid:" << _req_gid <<
" backend:" << _backend
" backend:" << _backend <<
" thread_id:" << tid
);
bool change_uid = orig_uid != _req_uid;
bool change_gid = orig_gid != _req_gid;
Expand Down Expand Up @@ -454,7 +489,7 @@ struct FileWrap : public Napi::ObjectWrap<FileWrap>
Napi::Function func = DefineClass(env, "File", {
InstanceMethod("close", &FileWrap::close),
InstanceMethod("read", &FileWrap::read),
// InstanceMethod("write", &FileWrap::write),
InstanceMethod("write", &FileWrap::write),
});
constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
Expand All @@ -471,7 +506,7 @@ struct FileWrap : public Napi::ObjectWrap<FileWrap>
}
Napi::Value close(const Napi::CallbackInfo& info);
Napi::Value read(const Napi::CallbackInfo& info);
// Napi::Value write(const Napi::CallbackInfo& info);
Napi::Value write(const Napi::CallbackInfo& info);
};

Napi::FunctionReference FileWrap::constructor;
Expand All @@ -480,20 +515,31 @@ struct FileOpen : public FSWorker
{
std::string _path;
int _fd;
FileOpen(const Napi::CallbackInfo& info) : FSWorker(info), _fd(0)
int _flags;
mode_t _mode;
FileOpen(const Napi::CallbackInfo& info)
: FSWorker(info)
, _fd(0)
, _flags(0)
, _mode(0666)
{
_path = info[1].As<Napi::String>();
// TODO - info[1] { mode, readonly }
if (info.Length() > 2 && !info[2].IsUndefined()) {
_flags = flags_to_case.at(info[2].As<Napi::String>());
}
if (info.Length() > 3 && !info[3].IsUndefined()) {
_mode = info[3].As<Napi::Number>().Uint32Value();
}
Begin(XSTR() << DVAL(_path));
}
virtual void Work()
{
_fd = open(_path.c_str(), O_RDONLY); // TODO mode
if (!_fd) SetSyscallError();
_fd = open(_path.c_str(), _flags, _mode);
if (_fd < 0) SetSyscallError();
}
virtual void OnOK()
{
DBG1("FS::DirOpen::OnOK: " << DVAL(_path));
DBG1("FS::FileOpen::OnOK: " << DVAL(_path));
Napi::Object res = FileWrap::constructor.New({});
FileWrap *w = FileWrap::Unwrap(res);
w->_path = _path;
Expand Down Expand Up @@ -534,6 +580,7 @@ struct FileRead : public FSWorker
, _offset(0)
, _len(0)
, _pos(0)
, _br(0)
{
_wrap = FileWrap::Unwrap(info.This().As<Napi::Object>());
auto buf = info[1].As<Napi::Buffer<uint8_t>>();
Expand Down Expand Up @@ -563,30 +610,40 @@ struct FileRead : public FSWorker
}
};

// struct FileWrite : public FSWorker
// {
// FileWrap *_wrap;
// const uint8_t* _buf;
// size_t _len;
// FileWrite(const Napi::CallbackInfo& info) : FSWorker(info)
// {
// _wrap = FileWrap::Unwrap(info.This().As<Napi::Object>());
// _buf = info[0].As<Napi::Buffer<uint8_t>>();
// _buf = info[1].As<Napi::Value<size_t>>();
// // TODO get buffer from info[0]
// }
// virtual void Work()
// {
// int fd = _wrap->_fd;
// std::string path = _wrap->_path;
// if (fd < 0) {
// SetError(XSTR() << "FS::FileWrite::Execute: ERROR not opened " << path);
// return;
// }

// // TODO - read(fd, buf...)
// }
// };

struct FileWrite : public FSWorker
{
FileWrap *_wrap;
const uint8_t* _buf;
size_t _len;
ssize_t _bw;
FileWrite(const Napi::CallbackInfo& info)
: FSWorker(info)
, _buf(0)
, _len(0)
{
_wrap = FileWrap::Unwrap(info.This().As<Napi::Object>());
auto buf = info[1].As<Napi::Buffer<uint8_t>>();
_buf = buf.Data();
_len = buf.Length();
}
virtual void Work()
{
int fd = _wrap->_fd;
std::string path = _wrap->_path;
if (fd < 0) {
SetError(XSTR() << "FS::FileWrite::Execute: ERROR not opened " << path);
return;
}
// TODO: Switch to pwrite when needed
_bw = write(fd, _buf, _len);
if (_bw < 0) {
SetSyscallError();
} else if ((size_t)_bw != _len) {
SetError(XSTR() << "FS::FileWrite::Execute: partial write error " << DVAL(_bw) << DVAL(_len));
}
}
};


Napi::Value FileWrap::close(const Napi::CallbackInfo& info)
Expand All @@ -599,10 +656,10 @@ Napi::Value FileWrap::read(const Napi::CallbackInfo& info)
return api<FileRead>(info);
}

// Napi::Value FileWrap::write(const Napi::CallbackInfo& info)
// {
// return api<FileWrite>(info);
// }
Napi::Value FileWrap::write(const Napi::CallbackInfo& info)
{
return api<FileWrite>(info);
}



Expand Down
60 changes: 41 additions & 19 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const fs = require('fs');
const path = require('path');
const util = require('util');
const mime = require('mime');
const events = require('events');
const { v4: uuidv4 } = require('uuid');

const config = require('../../config');
Expand Down Expand Up @@ -387,7 +386,7 @@ class NamespaceFS {
try {
await this._load_bucket(params);
const file_path = this._get_file_path(params);
file = await nb_native().fs.open(DEFAULT_FS_CONFIG, file_path); //, fs.constants.O_RDONLY);
file = await nb_native().fs.open(DEFAULT_FS_CONFIG, file_path);

const start = Number(params.start) || 0;
const end = isNaN(Number(params.end)) ? Infinity : Number(params.end);
Expand Down Expand Up @@ -496,15 +495,22 @@ class NamespaceFS {
}

async _upload_stream(source_stream, upload_path, write_options) {
return new Promise((resolve, reject) =>
source_stream
.once('error', reject)
.pipe(
fs.createWriteStream(upload_path, write_options)
.once('error', reject)
.once('finish', resolve)
)
);
let target_file;
try {
target_file = await nb_native().fs.open(DEFAULT_FS_CONFIG, upload_path, 'w');
for await (const data of source_stream) {
await target_file.write(DEFAULT_FS_CONFIG, data);
}
} catch (error) {
console.error('_upload_stream had error: ', error);
throw error;
} finally {
try {
if (target_file) await target_file.close(DEFAULT_FS_CONFIG);
} catch (err) {
console.warn('NamespaceFS: _upload_stream file close error', err);
}
}
}

//////////////////////
Expand Down Expand Up @@ -574,35 +580,51 @@ class NamespaceFS {
}

async complete_object_upload(params, object_sdk) {
let read_file;
let write_file;
try {
const { multiparts = [] } = params;
multiparts.sort((a, b) => a.num - b.num);
await this._load_multipart(params);
const file_path = this._get_file_path(params);
const upload_path = path.join(params.mpu_path, 'final');
const upload_stream = fs.createWriteStream(upload_path);
write_file = await nb_native().fs.open(DEFAULT_FS_CONFIG, upload_path, 'w');
for (const { num, etag } of multiparts) {
const part_path = path.join(params.mpu_path, `part-${num}`);
const part_stat = await nb_native().fs.stat(DEFAULT_FS_CONFIG, part_path);
if (etag !== this._get_etag(part_stat)) {
throw new Error('mismatch part etag: ' +
util.inspect({ num, etag, part_path, part_stat, params }));
}
for await (const data of fs.createReadStream(part_path, {
highWaterMark: config.NSFS_BUF_SIZE,
})) {
if (!upload_stream.write(data)) {
await events.once(upload_stream, 'drain');
}
read_file = await nb_native().fs.open(DEFAULT_FS_CONFIG, part_path);
const { buffer } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE);
let read_pos = 0;
for (;;) {
const bytesRead = await read_file.read(DEFAULT_FS_CONFIG, buffer, 0, config.NSFS_BUF_SIZE, read_pos);
if (!bytesRead) break;
read_pos += bytesRead;
const data = buffer.slice(0, bytesRead);
await write_file.write(DEFAULT_FS_CONFIG, data);
}
await read_file.close(DEFAULT_FS_CONFIG);
read_file = null;
}
upload_stream.end();
await write_file.close(DEFAULT_FS_CONFIG);
write_file = null;
const stat = await nb_native().fs.stat(DEFAULT_FS_CONFIG, upload_path);
await nb_native().fs.rename(DEFAULT_FS_CONFIG, upload_path, file_path);
await this._folder_delete(params.mpu_path);
return { etag: this._get_etag(stat) };
} catch (err) {
console.error(err);
throw this._translate_object_error_codes(err);
} finally {
try {
if (read_file) await read_file.close(DEFAULT_FS_CONFIG);
if (write_file) await write_file.close(DEFAULT_FS_CONFIG);
} catch (err) {
console.warn('NamespaceFS: complete_object_upload file close error', err);
}
}
}

Expand Down

0 comments on commit 7a31c5d

Please sign in to comment.