Skip to content

Commit

Permalink
add serialization version in HNSW / sorted neighborhood after HNSW construction
Browse files Browse the repository at this point in the history
  • Loading branch information
rofuyu committed Oct 13, 2021
1 parent f403266 commit 3270534
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 54 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ test: libpecos
# Clean
clean:
rm ${VFLAG} -rf ./build ./dist ./*.egg-info
rm -f ./pecos/core/*.so
rm -f ./pecos/core/*.so .coverage*
python3 -Bc "import pathlib; [p.unlink() for p in pathlib.Path('.').rglob('*.py[co]')]"
python3 -Bc "import pathlib; [p.rmdir() for p in pathlib.Path('.').rglob('__pycache__')]"
136 changes: 89 additions & 47 deletions pecos/core/ann/hnsw.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ namespace ann {
// destructor
~HNSW() {}

void load_config(const std::string& filepath) const {
static nlohmann::json load_config(const std::string& filepath) {
std::ifstream loadfile(filepath);
std::string json_str;
if(loadfile.is_open()) {
Expand All @@ -381,11 +381,13 @@ namespace ann {
if(hnsw_t_cur != hnsw_t_inp) {
throw std::invalid_argument("Inconsistent HNSW_T: hnsw_t_cur = " + hnsw_t_cur + " hnsw_t_cur = " + hnsw_t_inp);
}
return j_param;
}

void save_config(const std::string& filepath) const {
nlohmann::json j_params = {
{"hnsw_t", pecos::type_util::full_name<HNSW>()},
{"version", "v1.0"},
{"train_params", {
{"num_node", this->num_node},
{"maxM", this->maxM},
Expand Down Expand Up @@ -426,23 +428,28 @@ namespace ann {
}

// Load a serialized HNSW model from model_dir.
// Reads config.json (validated by load_config) to determine the on-disk
// serialization version, then deserializes the binary index accordingly.
// Throws std::runtime_error if the index file cannot be opened or the
// version is unknown.
void load(const std::string& model_dir) {
    auto config = load_config(model_dir + "/config.json");
    // Older models written before versioning have no "version" key;
    // the sentinel makes the mismatch error message explicit.
    std::string version = config.find("version") != config.end() ? config["version"] : "not found";
    std::string index_path = model_dir + "/index.bin";
    FILE *fp = fopen(index_path.c_str(), "rb");
    if(fp == nullptr) {
        throw std::runtime_error("Unable to open index file at " + index_path);
    }
    if(version == "v1.0") {
        pecos::file_util::fget_multiple<index_type>(&num_node, 1, fp);
        pecos::file_util::fget_multiple<index_type>(&maxM, 1, fp);
        pecos::file_util::fget_multiple<index_type>(&maxM0, 1, fp);
        pecos::file_util::fget_multiple<index_type>(&efC, 1, fp);
        pecos::file_util::fget_multiple<index_type>(&max_level, 1, fp);
        pecos::file_util::fget_multiple<index_type>(&init_node, 1, fp);
        graph_l0.load(fp);
        graph_l1.load(fp);
    } else {
        fclose(fp);  // close before throwing so the handle is not leaked
        throw std::runtime_error("Unable to load this binary with version = " + version);
    }
    fclose(fp);
}

// Algorithm 4 of HNSW paper
void get_neighbors_heuristic(max_heap_t &top_candidates, const index_type M) {
if (top_candidates.size() < M) { return; }
if(top_candidates.size() < M) { return; }

min_heap_t queue_closest;
std::vector<pair_t> return_list;
Expand All @@ -452,41 +459,41 @@ namespace ann {
}

while (queue_closest.size()) {
if (return_list.size() >= M) {
if(return_list.size() >= M) {
break;
}
auto curent_pair = queue_closest.top();
dist_t dist_to_query = curent_pair.dist;
queue_closest.pop();
bool good = true;

for (auto& second_pair : return_list) {
for(auto& second_pair : return_list) {
dist_t curdist = feat_vec_t::distance(
graph_l0.get_node_feat(second_pair.node_id),
graph_l0.get_node_feat(curent_pair.node_id)
);
if (curdist < dist_to_query) {
if(curdist < dist_to_query) {
good = false;
break;
}
}
if (good) {
if(good) {
return_list.push_back(curent_pair);
}
}

for (auto& curent_pair : return_list) {
for(auto& curent_pair : return_list) {
top_candidates.emplace(curent_pair);
}
}

// line 10-17, Algorithm 1 of HNSW paper
// it is the caller's responsibility to make sure top_candidates are available in the graph of this level.
template<bool lock_free=true>
index_type mutually_connect(index_type query_id, max_heap_t &top_candidates, index_type level, std::vector<std::mutex>* mtx_nodes=nullptr) {
index_type mutually_connect(index_type src_node_id, max_heap_t &top_candidates, index_type level, std::vector<std::mutex>* mtx_nodes=nullptr) {
index_type Mcurmax = level ? this->maxM : this->maxM0;
get_neighbors_heuristic(top_candidates, this->maxM);
if (top_candidates.size() > this->maxM) {
if(top_candidates.size() > this->maxM) {
throw std::runtime_error("Should be not be more than M_ candidates returned by the heuristic");
}

Expand All @@ -498,7 +505,7 @@ namespace ann {
}

GraphBase *G;
if (level == 0) {
if(level == 0) {
G = &graph_l0;
} else {
G = &graph_l1;
Expand All @@ -512,12 +519,12 @@ namespace ann {

auto neighbors = G->get_neighborhood(src, level);

if (neighbors.degree() > Mcurmax)
if(neighbors.degree() > Mcurmax)
throw std::runtime_error("Bad value of size of neighbors for this src node");
if (src == dst)
if(src == dst)
throw std::runtime_error("Trying to connect an element to itself");

if (neighbors.degree() < Mcurmax) {
if(neighbors.degree() < Mcurmax) {
neighbors.push_back(dst);
} else {
// finding the "weakest" element to replace it with the new one
Expand Down Expand Up @@ -551,9 +558,9 @@ namespace ann {
}
};

for (auto& dst : selected_neighbors) {
add_link(query_id, dst);
add_link(dst, query_id);
for(auto& dst : selected_neighbors) {
add_link(src_node_id, dst);
add_link(dst, src_node_id);
}

index_type next_closest_entry_point = selected_neighbors.back();
Expand Down Expand Up @@ -600,7 +607,7 @@ namespace ann {
auto max_level = hnsw.max_level;
auto curr_node = hnsw.init_node;

const feat_vec_t& query_feat_ptr = graph_l0.get_node_feat(query_id);
const feat_vec_t& query_feat = graph_l0.get_node_feat(query_id);

bool is_first_node = (query_id == 0);
if(is_first_node) {
Expand All @@ -610,7 +617,7 @@ namespace ann {
// find entrypoint with efS = 1 from level = local max_level to 1.
if(query_level < max_level) {
dist_t curr_dist = feat_vec_t::distance(
query_feat_ptr,
query_feat,
graph_l0.get_node_feat(curr_node)
);

Expand All @@ -622,10 +629,10 @@ namespace ann {
auto neighbors = graph_l1.get_neighborhood(curr_node, level);
for(auto& next_node : neighbors) {
dist_t next_dist = feat_vec_t::distance(
query_feat_ptr,
query_feat,
graph_l0.get_node_feat(next_node)
);
if (next_dist < curr_dist) {
if(next_dist < curr_dist) {
curr_dist = next_dist;
curr_node = next_node;
changed = true;
Expand All @@ -635,16 +642,16 @@ namespace ann {
}
}
if(lock_free) {
for (auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<true>(query_feat_ptr, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
for(auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<true>(query_feat, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
curr_node = mutually_connect<true>(query_id, top_candidates, level, &ws.mtx_nodes);
if (level == 0) { break; }
if(level == 0) { break; }
}
} else {
for (auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<false>(query_feat_ptr, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
for(auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<false>(query_feat, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
curr_node = mutually_connect<false>(query_id, top_candidates, level, &ws.mtx_nodes);
if (level == 0) { break; }
if(level == 0) { break; }
}
}

Expand Down Expand Up @@ -693,11 +700,46 @@ namespace ann {

bool lock_free = (threads == 1);
#pragma omp parallel for schedule(dynamic, 1)
for (index_type query_id = 0; query_id < num_node; query_id++) {
for(index_type node_id = 0; node_id < num_node; node_id++) {
int thread_id = omp_get_thread_num();
add_point(query_id, ws, thread_id, lock_free);
add_point(node_id, ws, thread_id, lock_free);
}

// Re-sorts the neighbor list of node_id at every level it participates in,
// ordering neighbors by ascending distance to node_id's feature vector.
// Run after graph construction so serialized neighborhoods are distance-ordered.
// Reuses the per-thread searcher's cand_queue as scratch space to avoid
// per-call allocation.
auto sort_neighbors_for_node = [&](index_type node_id, workspace_t& ws, int thread_id) {
    auto& hnsw = ws.hnsw;
    auto& graph_l0 = hnsw.graph_l0;
    auto& graph_l1 = hnsw.graph_l1;
    // scratch buffer of (distance, node_id) pairs owned by this thread
    auto& queue = ws.searchers[thread_id].cand_queue;

    // feature vectors always live in the level-0 graph, regardless of level
    const auto &src = graph_l0.get_node_feat(node_id);
    for(index_type level = 0; level <= ws.node2level[node_id]; level++) {
        GraphBase *G;
        if(level == 0) {
            G = &graph_l0;
        } else {
            G = &graph_l1;  // a single upper-level graph stores all levels >= 1
        }
        auto neighbors = G->get_neighborhood(node_id, level);
        if(neighbors.degree() == 0) {
            // NOTE(review): this returns from the lambda, abandoning any
            // remaining (higher) levels for this node, not just this level —
            // presumably an empty neighborhood implies higher levels are
            // empty too; confirm against construction invariants.
            return;
        }
        queue.clear();
        for(index_type j = 0; j < neighbors.degree(); j++) {
            const auto& dst = graph_l0.get_node_feat(neighbors[j]);
            queue.emplace_back(feat_vec_t::distance(src, dst), neighbors[j]);
        }
        // pairs compare by distance first, so this orders nearest-first
        std::sort(queue.begin(), queue.end());
        // write the sorted order back into the graph's neighbor storage in place
        for(index_type j = 0; j < neighbors.degree(); j++) {
            neighbors[j] = queue[j].node_id;
        }
    }
};

#pragma omp parallel for schedule(dynamic, 1)
for(index_type node_id = 0; node_id < num_node; node_id++) {
int thread_id = omp_get_thread_num();
sort_neighbors_for_node(node_id, ws, thread_id);
}
}

// Algorithm 2 of HNSW paper
Expand All @@ -723,7 +765,7 @@ namespace ann {
searcher.mark_visited(init_node);

const GraphBase *G;
if (level == 0) {
if(level == 0) {
G = &graph_l0;
} else {
G = &graph_l1;
Expand All @@ -732,7 +774,7 @@ namespace ann {
// Best First Search loop
while (!cand_queue.empty()) {
pair_t cand_pair = cand_queue.top();
if (cand_pair.dist > topk_ub_dist) {
if(cand_pair.dist > topk_ub_dist) {
break;
}
cand_queue.pop();
Expand All @@ -745,24 +787,24 @@ namespace ann {
// visiting neighbors of candidate node
const auto neighbors = G->get_neighborhood(cand_node, level);
graph_l0.prefetch_node_feat(neighbors[0]);
for (index_type j = 0; j < neighbors.degree(); j++) {
for(index_type j = 0; j < neighbors.degree(); j++) {
graph_l0.prefetch_node_feat(neighbors[j + 1]);
auto next_node = neighbors[j];
if (!searcher.is_visited(next_node)) {
if(!searcher.is_visited(next_node)) {
searcher.mark_visited(next_node);
dist_t next_lb_dist;
next_lb_dist = feat_vec_t::distance(
query,
graph_l0.get_node_feat(next_node)
);
if (topk_queue.size() < efS || next_lb_dist < topk_ub_dist) {
if(topk_queue.size() < efS || next_lb_dist < topk_ub_dist) {
cand_queue.emplace(next_lb_dist, next_node);
graph_l0.prefetch_node_feat(cand_queue.top().node_id);
topk_queue.emplace(next_lb_dist, next_node);
if (topk_queue.size() > efS) {
if(topk_queue.size() > efS) {
topk_queue.pop();
}
if (!topk_queue.empty()) {
if(!topk_queue.empty()) {
topk_ub_dist = topk_queue.top().dist;
}
}
Expand All @@ -786,20 +828,20 @@ namespace ann {
query,
G0.get_node_feat(init_node)
);
for (index_type curr_level = this->max_level; curr_level >= 1; curr_level--) {
for(index_type curr_level = this->max_level; curr_level >= 1; curr_level--) {
bool changed = true;
while (changed) {
changed = false;
const auto neighbors = G1.get_neighborhood(curr_node, curr_level);
graph_l0.prefetch_node_feat(neighbors[0]);
for (index_type j = 0; j < neighbors.degree(); j++) {
for(index_type j = 0; j < neighbors.degree(); j++) {
graph_l0.prefetch_node_feat(neighbors[j + 1]);
auto next_node = neighbors[j];
dist_t next_dist = feat_vec_t::distance(
query,
G0.get_node_feat(next_node)
);
if (next_dist < curr_dist) {
if(next_dist < curr_dist) {
curr_dist = next_dist;
curr_node = next_node;
changed = true;
Expand Down
8 changes: 5 additions & 3 deletions pecos/core/utils/tfidf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class Tokenizer {

Tokenizer(const string& load_dir) { load(load_dir); }

void load_config(const string& filepath) {
static nlohmann::json load_config(const string& filepath) {
std::ifstream loadfile(filepath);
string json_str;
if(loadfile.is_open()) {
Expand All @@ -333,7 +333,7 @@ class Tokenizer {
throw std::runtime_error("Unable to open config file at " + filepath);
}
auto j_param = nlohmann::json::parse(json_str);
tok_type = j_param["token_type"];
return j_param;
}

void save_config(const string& filepath) const {
Expand Down Expand Up @@ -370,7 +370,9 @@ class Tokenizer {
}

void load(const string& load_dir) {
load_config(load_dir + "/config.json");
auto config = load_config(load_dir + "/config.json");
this->tok_type = config["token_type"];

std::ifstream loadfile(load_dir + "/vocab.txt");
if(loadfile.is_open()) {
string line;
Expand Down
2 changes: 1 addition & 1 deletion test/pecos/apps/text2text/test_text2text.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def assert_json_string(str_a, str_b):
cmd += ["-q {}".format(item_file)]
cmd += ["-m {}".format(model_folder)]
cmd += ["--max-leaf-size {}".format(10)]
cmd += ['--vectorizer-config-json \{\\"type\\":\\"sklearntfidf\\",\\"kwargs\\":\{\}\}']
cmd += ["""--vectorizer-config-json '{"type":"sklearntfidf","kwargs":{}}'"""]
print(" ".join(cmd))
process = subprocess.run(
shlex.split(" ".join(cmd)), stdout=subprocess.PIPE, stderr=subprocess.PIPE
Expand Down
3 changes: 2 additions & 1 deletion test/tst-data/ann/hnsw-model-dense/c_model/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"hnsw_t": "pecos::ann::HNSW<float, pecos::ann::FeatVecDenseIPSimd<float>>",
"version": "v1.0",
"train_params": {
"efC": 100,
"init_node": 50,
Expand All @@ -8,4 +9,4 @@
"max_level": 1,
"num_node": 90
}
}
}
3 changes: 2 additions & 1 deletion test/tst-data/ann/hnsw-model-sparse/c_model/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"hnsw_t": "pecos::ann::HNSW<float, pecos::ann::FeatVecSparseIPSimd<uint32_t, float>>",
"version": "v1.0",
"train_params": {
"efC": 100,
"init_node": 50,
Expand All @@ -8,4 +9,4 @@
"max_level": 1,
"num_node": 90
}
}
}

0 comments on commit 3270534

Please sign in to comment.