diff --git a/src/binding.cpp b/src/binding.cpp index ff1bb59..60dffbc 100644 --- a/src/binding.cpp +++ b/src/binding.cpp @@ -237,6 +237,7 @@ struct MergeBaton : carmen::noncopyable { std::string pbf2; std::string pbf3; std::string method; + std::string error; Nan::Persistent callback; }; @@ -251,156 +252,166 @@ void mergeQueue(uv_work_t* req) { std::map ids2; std::string merged; - protozero::pbf_writer writer(merged); - - // Store ids from 1 - protozero::pbf_reader pre1(pbf1); - while (pre1.next(CACHE_MESSAGE)) { - protozero::pbf_reader item = pre1.get_message(); - while (item.next(CACHE_ITEM)) { - ids1.emplace(item.get_uint64(), true); + try { + + protozero::pbf_writer writer(merged); + + // Store ids from 1 + protozero::pbf_reader pre1(pbf1); + while (pre1.next(CACHE_MESSAGE)) { + protozero::pbf_reader item = pre1.get_message(); + while (item.next(CACHE_ITEM)) { + ids1.emplace(item.get_uint64(), true); + } } - } - // Store ids from 2 - protozero::pbf_reader pre2(pbf2); - while (pre2.next(CACHE_MESSAGE)) { - protozero::pbf_reader item = pre2.get_message(); - while (item.next(CACHE_ITEM)) { - ids2.emplace(item.get_uint64(), true); + // Store ids from 2 + protozero::pbf_reader pre2(pbf2); + while (pre2.next(CACHE_MESSAGE)) { + protozero::pbf_reader item = pre2.get_message(); + while (item.next(CACHE_ITEM)) { + ids2.emplace(item.get_uint64(), true); + } } - } - // No delta writes from message1 - protozero::pbf_reader message1(pbf1); - while (message1.next(CACHE_MESSAGE)) { - protozero::pbf_writer item_writer(writer,1); - protozero::pbf_reader item = message1.get_message(); - while (item.next(CACHE_ITEM)) { - uint64_t key_id = item.get_uint64(); + // No delta writes from message1 + protozero::pbf_reader message1(pbf1); + while (message1.next(CACHE_MESSAGE)) { + protozero::pbf_writer item_writer(writer,1); + protozero::pbf_reader item = message1.get_message(); + while (item.next(CACHE_ITEM)) { + uint64_t key_id = item.get_uint64(); - // Skip this id if also in message 2 - if (ids2.find(key_id) != ids2.end()) break; + // Skip this id if also in message 2 + if (ids2.find(key_id) != ids2.end()) break; - item_writer.add_uint64(1,key_id); - item.next(); - protozero::packed_field_uint64 field{item_writer, 2}; - auto vals = item.get_packed_uint64(); - for (auto it = vals.first; it != vals.second; ++it) { - field.add_element(static_cast(*it)); + item_writer.add_uint64(1,key_id); + item.next(); + protozero::packed_field_uint64 field{item_writer, 2}; + auto vals = item.get_packed_uint64(); + for (auto it = vals.first; it != vals.second; ++it) { + field.add_element(static_cast(*it)); + } } } - } - // No delta writes from message2 - protozero::pbf_reader message2(pbf2); - while (message2.next(CACHE_MESSAGE)) { - protozero::pbf_writer item_writer(writer,1); - protozero::pbf_reader item = message2.get_message(); - while (item.next(CACHE_ITEM)) { - uint64_t key_id = item.get_uint64(); + // No delta writes from message2 + protozero::pbf_reader message2(pbf2); + while (message2.next(CACHE_MESSAGE)) { + protozero::pbf_writer item_writer(writer,1); + protozero::pbf_reader item = message2.get_message(); + while (item.next(CACHE_ITEM)) { + uint64_t key_id = item.get_uint64(); - // Skip this id if also in message 2 - if (ids1.find(key_id) != ids1.end()) break; + // Skip this id if also in message 2 + if (ids1.find(key_id) != ids1.end()) break; - item_writer.add_uint64(1,key_id); - item.next(); - protozero::packed_field_uint64 field{item_writer, 2}; - auto vals = item.get_packed_uint64(); - for (auto it = vals.first; it != vals.second; ++it) { - field.add_element(static_cast(*it)); + item_writer.add_uint64(1,key_id); + item.next(); + protozero::packed_field_uint64 field{item_writer, 2}; + auto vals = item.get_packed_uint64(); + for (auto it = vals.first; it != vals.second; ++it) { + field.add_element(static_cast(*it)); + } } } - } - // Delta writes for ids in both message1 and message2 - protozero::pbf_reader overlap1(pbf1); - while (overlap1.next(CACHE_MESSAGE)) { - protozero::pbf_writer item_writer(writer,1); - protozero::pbf_reader item = overlap1.get_message(); - while (item.next(CACHE_ITEM)) { - uint64_t key_id = item.get_uint64(); - - // Skip ids that are only in one or the other lists - if (ids1.find(key_id) == ids1.end() || ids2.find(key_id) == ids2.end()) break; - - item_writer.add_uint64(1,key_id); - - item.next(); - uint64_t lastval = 0; - Cache::intarray varr; - - // Add values from pbf1 - auto vals = item.get_packed_uint64(); - for (auto it = vals.first; it != vals.second; ++it) { - if (method == "freq") { - varr.emplace_back(*it); - break; - } else if (lastval == 0) { - lastval = *it; - varr.emplace_back(lastval); - } else { - lastval = lastval - *it; - varr.emplace_back(lastval); + // Delta writes for ids in both message1 and message2 + protozero::pbf_reader overlap1(pbf1); + while (overlap1.next(CACHE_MESSAGE)) { + protozero::pbf_writer item_writer(writer,1); + protozero::pbf_reader item = overlap1.get_message(); + while (item.next(CACHE_ITEM)) { + uint64_t key_id = item.get_uint64(); + + // Skip ids that are only in one or the other lists + if (ids1.find(key_id) == ids1.end() || ids2.find(key_id) == ids2.end()) break; + + item_writer.add_uint64(1,key_id); + + item.next(); + uint64_t lastval = 0; + Cache::intarray varr; + + // Add values from pbf1 + auto vals = item.get_packed_uint64(); + for (auto it = vals.first; it != vals.second; ++it) { + if (method == "freq") { + varr.emplace_back(*it); + break; + } else if (lastval == 0) { + lastval = *it; + varr.emplace_back(lastval); + } else { + lastval = lastval - *it; + varr.emplace_back(lastval); + } } - } - // Check pbf2 for this id and merge its items if found - protozero::pbf_reader overlap2(pbf2); - while (overlap2.next(CACHE_MESSAGE)) { - protozero::pbf_reader item2 = overlap2.get_message(); - while (item2.next(CACHE_ITEM)) { - uint64_t key_id2 = item2.get_uint64(); - if (key_id2 != key_id) break; - item2.next(); - lastval = 0; - auto vals2 = item2.get_packed_uint64(); - for (auto it = vals2.first; it != vals2.second; ++it) { - if (method == "freq") { - if (key_id2 == 1) { - varr[0] = varr[0] > *it ? varr[0] : *it; + // Check pbf2 for this id and merge its items if found + protozero::pbf_reader overlap2(pbf2); + while (overlap2.next(CACHE_MESSAGE)) { + protozero::pbf_reader item2 = overlap2.get_message(); + while (item2.next(CACHE_ITEM)) { + uint64_t key_id2 = item2.get_uint64(); + if (key_id2 != key_id) break; + item2.next(); + lastval = 0; + auto vals2 = item2.get_packed_uint64(); + for (auto it = vals2.first; it != vals2.second; ++it) { + if (method == "freq") { + if (key_id2 == 1) { + varr[0] = varr[0] > *it ? varr[0] : *it; + } else { + varr[0] = varr[0] + *it; + } + break; + } else if (lastval == 0) { + lastval = *it; + varr.emplace_back(lastval); } else { - varr[0] = varr[0] + *it; + lastval = lastval - *it; + varr.emplace_back(lastval); } - break; - } else if (lastval == 0) { - lastval = *it; - varr.emplace_back(lastval); - } else { - lastval = lastval - *it; - varr.emplace_back(lastval); } } } - } - // Sort for proper delta encoding - std::sort(varr.begin(), varr.end(), std::greater()); - - // Write varr to merged protobuf - protozero::packed_field_uint64 field{item_writer, 2}; - lastval = 0; - for (auto const& vitem : varr) { - if (lastval == 0) { - field.add_element(static_cast(vitem)); - } else { - field.add_element(static_cast(lastval - vitem)); + // Sort for proper delta encoding + std::sort(varr.begin(), varr.end(), std::greater()); + + // Write varr to merged protobuf + protozero::packed_field_uint64 field{item_writer, 2}; + lastval = 0; + for (auto const& vitem : varr) { + if (lastval == 0) { + field.add_element(static_cast(vitem)); + } else { + field.add_element(static_cast(lastval - vitem)); + } + lastval = vitem; } - lastval = vitem; } } - } - baton->pbf3 = merged; + baton->pbf3 = merged; + } catch (std::exception const& ex) { + baton->error = ex.what(); + } } void mergeAfter(uv_work_t* req) { Nan::HandleScope scope; MergeBaton *baton = static_cast(req->data); - std::string const& merged = baton->pbf3; - Local buf = Nan::CopyBuffer((char*)merged.data(), merged.size()).ToLocalChecked(); - Local argv[2] = { Nan::Null(), buf }; - Nan::MakeCallback(Nan::GetCurrentContext()->Global(), Nan::New(baton->callback), 2, argv); + if (!baton->error.empty()) { + v8::Local argv[1] = { Nan::Error(baton->error.c_str()) }; + Nan::MakeCallback(Nan::GetCurrentContext()->Global(), Nan::New(baton->callback), 1, argv); + } else { + std::string const& merged = baton->pbf3; + Local buf = Nan::CopyBuffer((char*)merged.data(), merged.size()).ToLocalChecked(); + Local argv[2] = { Nan::Null(), buf }; + Nan::MakeCallback(Nan::GetCurrentContext()->Global(), Nan::New(baton->callback), 2, argv); + } baton->callback.Reset(); delete baton; } diff --git a/test/merge.test.js b/test/merge.test.js index 82cc098..d13aae3 100644 --- a/test/merge.test.js +++ b/test/merge.test.js @@ -56,6 +56,9 @@ tape('#merge invalid pbf throws JS error', function(assert) { var cacheA = new Cache('a'); cacheA.merge(new Buffer("phony protobuf"), new Buffer("phony protobuf"), 'freq', function(err, merged) { assert.ok(err); + if (err) { + assert.ok(err.message.indexOf("unknown pbf field type exception") > -1); + } assert.end(); }) });