Skip to content
This repository has been archived by the owner on Oct 30, 2021. It is now read-only.

Commit

Permalink
correctly catch and rethrow expected protozero C++ exception
Browse files Browse the repository at this point in the history
  • Loading branch information
Dane Springmeyer committed Sep 28, 2016
1 parent 5434c57 commit fbadd77
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 119 deletions.
249 changes: 130 additions & 119 deletions src/binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ struct MergeBaton : carmen::noncopyable {
std::string pbf2;
std::string pbf3;
std::string method;
std::string error;
Nan::Persistent<v8::Function> callback;
};

Expand All @@ -251,156 +252,166 @@ void mergeQueue(uv_work_t* req) {
std::map<uint64_t,bool> ids2;

std::string merged;
protozero::pbf_writer writer(merged);

// Store ids from 1
protozero::pbf_reader pre1(pbf1);
while (pre1.next(CACHE_MESSAGE)) {
protozero::pbf_reader item = pre1.get_message();
while (item.next(CACHE_ITEM)) {
ids1.emplace(item.get_uint64(), true);
try {

protozero::pbf_writer writer(merged);

// Store ids from 1
protozero::pbf_reader pre1(pbf1);
while (pre1.next(CACHE_MESSAGE)) {
protozero::pbf_reader item = pre1.get_message();
while (item.next(CACHE_ITEM)) {
ids1.emplace(item.get_uint64(), true);
}
}
}

// Store ids from 2
protozero::pbf_reader pre2(pbf2);
while (pre2.next(CACHE_MESSAGE)) {
protozero::pbf_reader item = pre2.get_message();
while (item.next(CACHE_ITEM)) {
ids2.emplace(item.get_uint64(), true);
// Store ids from 2
protozero::pbf_reader pre2(pbf2);
while (pre2.next(CACHE_MESSAGE)) {
protozero::pbf_reader item = pre2.get_message();
while (item.next(CACHE_ITEM)) {
ids2.emplace(item.get_uint64(), true);
}
}
}

// No delta writes from message1
protozero::pbf_reader message1(pbf1);
while (message1.next(CACHE_MESSAGE)) {
protozero::pbf_writer item_writer(writer,1);
protozero::pbf_reader item = message1.get_message();
while (item.next(CACHE_ITEM)) {
uint64_t key_id = item.get_uint64();
// No delta writes from message1
protozero::pbf_reader message1(pbf1);
while (message1.next(CACHE_MESSAGE)) {
protozero::pbf_writer item_writer(writer,1);
protozero::pbf_reader item = message1.get_message();
while (item.next(CACHE_ITEM)) {
uint64_t key_id = item.get_uint64();

// Skip this id if also in message 2
if (ids2.find(key_id) != ids2.end()) break;
// Skip this id if also in message 2
if (ids2.find(key_id) != ids2.end()) break;

item_writer.add_uint64(1,key_id);
item.next();
protozero::packed_field_uint64 field{item_writer, 2};
auto vals = item.get_packed_uint64();
for (auto it = vals.first; it != vals.second; ++it) {
field.add_element(static_cast<uint64_t>(*it));
item_writer.add_uint64(1,key_id);
item.next();
protozero::packed_field_uint64 field{item_writer, 2};
auto vals = item.get_packed_uint64();
for (auto it = vals.first; it != vals.second; ++it) {
field.add_element(static_cast<uint64_t>(*it));
}
}
}
}

// No delta writes from message2
protozero::pbf_reader message2(pbf2);
while (message2.next(CACHE_MESSAGE)) {
protozero::pbf_writer item_writer(writer,1);
protozero::pbf_reader item = message2.get_message();
while (item.next(CACHE_ITEM)) {
uint64_t key_id = item.get_uint64();
// No delta writes from message2
protozero::pbf_reader message2(pbf2);
while (message2.next(CACHE_MESSAGE)) {
protozero::pbf_writer item_writer(writer,1);
protozero::pbf_reader item = message2.get_message();
while (item.next(CACHE_ITEM)) {
uint64_t key_id = item.get_uint64();

// Skip this id if also in message 2
if (ids1.find(key_id) != ids1.end()) break;
// Skip this id if also in message 2
if (ids1.find(key_id) != ids1.end()) break;

item_writer.add_uint64(1,key_id);
item.next();
protozero::packed_field_uint64 field{item_writer, 2};
auto vals = item.get_packed_uint64();
for (auto it = vals.first; it != vals.second; ++it) {
field.add_element(static_cast<uint64_t>(*it));
item_writer.add_uint64(1,key_id);
item.next();
protozero::packed_field_uint64 field{item_writer, 2};
auto vals = item.get_packed_uint64();
for (auto it = vals.first; it != vals.second; ++it) {
field.add_element(static_cast<uint64_t>(*it));
}
}
}
}

// Delta writes for ids in both message1 and message2
protozero::pbf_reader overlap1(pbf1);
while (overlap1.next(CACHE_MESSAGE)) {
protozero::pbf_writer item_writer(writer,1);
protozero::pbf_reader item = overlap1.get_message();
while (item.next(CACHE_ITEM)) {
uint64_t key_id = item.get_uint64();

// Skip ids that are only in one or the other lists
if (ids1.find(key_id) == ids1.end() || ids2.find(key_id) == ids2.end()) break;

item_writer.add_uint64(1,key_id);

item.next();
uint64_t lastval = 0;
Cache::intarray varr;

// Add values from pbf1
auto vals = item.get_packed_uint64();
for (auto it = vals.first; it != vals.second; ++it) {
if (method == "freq") {
varr.emplace_back(*it);
break;
} else if (lastval == 0) {
lastval = *it;
varr.emplace_back(lastval);
} else {
lastval = lastval - *it;
varr.emplace_back(lastval);
// Delta writes for ids in both message1 and message2
protozero::pbf_reader overlap1(pbf1);
while (overlap1.next(CACHE_MESSAGE)) {
protozero::pbf_writer item_writer(writer,1);
protozero::pbf_reader item = overlap1.get_message();
while (item.next(CACHE_ITEM)) {
uint64_t key_id = item.get_uint64();

// Skip ids that are only in one or the other lists
if (ids1.find(key_id) == ids1.end() || ids2.find(key_id) == ids2.end()) break;

item_writer.add_uint64(1,key_id);

item.next();
uint64_t lastval = 0;
Cache::intarray varr;

// Add values from pbf1
auto vals = item.get_packed_uint64();
for (auto it = vals.first; it != vals.second; ++it) {
if (method == "freq") {
varr.emplace_back(*it);
break;
} else if (lastval == 0) {
lastval = *it;
varr.emplace_back(lastval);
} else {
lastval = lastval - *it;
varr.emplace_back(lastval);
}
}
}

// Check pbf2 for this id and merge its items if found
protozero::pbf_reader overlap2(pbf2);
while (overlap2.next(CACHE_MESSAGE)) {
protozero::pbf_reader item2 = overlap2.get_message();
while (item2.next(CACHE_ITEM)) {
uint64_t key_id2 = item2.get_uint64();
if (key_id2 != key_id) break;
item2.next();
lastval = 0;
auto vals2 = item2.get_packed_uint64();
for (auto it = vals2.first; it != vals2.second; ++it) {
if (method == "freq") {
if (key_id2 == 1) {
varr[0] = varr[0] > *it ? varr[0] : *it;
// Check pbf2 for this id and merge its items if found
protozero::pbf_reader overlap2(pbf2);
while (overlap2.next(CACHE_MESSAGE)) {
protozero::pbf_reader item2 = overlap2.get_message();
while (item2.next(CACHE_ITEM)) {
uint64_t key_id2 = item2.get_uint64();
if (key_id2 != key_id) break;
item2.next();
lastval = 0;
auto vals2 = item2.get_packed_uint64();
for (auto it = vals2.first; it != vals2.second; ++it) {
if (method == "freq") {
if (key_id2 == 1) {
varr[0] = varr[0] > *it ? varr[0] : *it;
} else {
varr[0] = varr[0] + *it;
}
break;
} else if (lastval == 0) {
lastval = *it;
varr.emplace_back(lastval);
} else {
varr[0] = varr[0] + *it;
lastval = lastval - *it;
varr.emplace_back(lastval);
}
break;
} else if (lastval == 0) {
lastval = *it;
varr.emplace_back(lastval);
} else {
lastval = lastval - *it;
varr.emplace_back(lastval);
}
}
}
}

// Sort for proper delta encoding
std::sort(varr.begin(), varr.end(), std::greater<uint64_t>());

// Write varr to merged protobuf
protozero::packed_field_uint64 field{item_writer, 2};
lastval = 0;
for (auto const& vitem : varr) {
if (lastval == 0) {
field.add_element(static_cast<uint64_t>(vitem));
} else {
field.add_element(static_cast<uint64_t>(lastval - vitem));
// Sort for proper delta encoding
std::sort(varr.begin(), varr.end(), std::greater<uint64_t>());

// Write varr to merged protobuf
protozero::packed_field_uint64 field{item_writer, 2};
lastval = 0;
for (auto const& vitem : varr) {
if (lastval == 0) {
field.add_element(static_cast<uint64_t>(vitem));
} else {
field.add_element(static_cast<uint64_t>(lastval - vitem));
}
lastval = vitem;
}
lastval = vitem;
}
}
}

baton->pbf3 = merged;
baton->pbf3 = merged;
} catch (std::exception const& ex) {
baton->error = ex.what();
}
}

void mergeAfter(uv_work_t* req) {
Nan::HandleScope scope;
MergeBaton *baton = static_cast<MergeBaton *>(req->data);
std::string const& merged = baton->pbf3;
Local<Object> buf = Nan::CopyBuffer((char*)merged.data(), merged.size()).ToLocalChecked();
Local<Value> argv[2] = { Nan::Null(), buf };
Nan::MakeCallback(Nan::GetCurrentContext()->Global(), Nan::New(baton->callback), 2, argv);
if (!baton->error.empty()) {
v8::Local<v8::Value> argv[1] = { Nan::Error(baton->error.c_str()) };
Nan::MakeCallback(Nan::GetCurrentContext()->Global(), Nan::New(baton->callback), 1, argv);
} else {
std::string const& merged = baton->pbf3;
Local<Object> buf = Nan::CopyBuffer((char*)merged.data(), merged.size()).ToLocalChecked();
Local<Value> argv[2] = { Nan::Null(), buf };
Nan::MakeCallback(Nan::GetCurrentContext()->Global(), Nan::New(baton->callback), 2, argv);
}
baton->callback.Reset();
delete baton;
}
Expand Down
3 changes: 3 additions & 0 deletions test/merge.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ tape('#merge invalid pbf throws JS error', function(assert) {
var cacheA = new Cache('a');
cacheA.merge(new Buffer("phony protobuf"), new Buffer("phony protobuf"), 'freq', function(err, merged) {
assert.ok(err);
if (err) {
assert.ok(err.message.indexOf("unknown pbf field type exception") > -1);
}
assert.end();
})
});
Expand Down

0 comments on commit fbadd77

Please sign in to comment.