Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add serialization version in HNSW/sorted neighborhood after HNSW construction #77

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ test: libpecos
# Clean
clean:
rm ${VFLAG} -rf ./build ./dist ./*.egg-info
rm -f ./pecos/core/*.so
rm -f ./pecos/core/*.so .coverage*
python3 -Bc "import pathlib; [p.unlink() for p in pathlib.Path('.').rglob('*.py[co]')]"
python3 -Bc "import pathlib; [p.rmdir() for p in pathlib.Path('.').rglob('__pycache__')]"
136 changes: 89 additions & 47 deletions pecos/core/ann/hnsw.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ namespace ann {
// destructor
~HNSW() {}

void load_config(const std::string& filepath) const {
static nlohmann::json load_config(const std::string& filepath) {
std::ifstream loadfile(filepath);
std::string json_str;
if(loadfile.is_open()) {
Expand All @@ -381,11 +381,13 @@ namespace ann {
if(hnsw_t_cur != hnsw_t_inp) {
throw std::invalid_argument("Inconsistent HNSW_T: hnsw_t_cur = " + hnsw_t_cur + " hnsw_t_cur = " + hnsw_t_inp);
}
return j_param;
}

void save_config(const std::string& filepath) const {
nlohmann::json j_params = {
{"hnsw_t", pecos::type_util::full_name<HNSW>()},
{"version", "v1.0"},
{"train_params", {
{"num_node", this->num_node},
{"maxM", this->maxM},
Expand Down Expand Up @@ -426,23 +428,28 @@ namespace ann {
}

void load(const std::string& model_dir) {
load_config(model_dir + "/config.json");
auto config = load_config(model_dir + "/config.json");
std::string version = config.find("version") != config.end() ? config["version"] : "not found";
std::string index_path = model_dir + "/index.bin";
FILE *fp = fopen(index_path.c_str(), "rb");
pecos::file_util::fget_multiple<index_type>(&num_node, 1, fp);
pecos::file_util::fget_multiple<index_type>(&maxM, 1, fp);
pecos::file_util::fget_multiple<index_type>(&maxM0, 1, fp);
pecos::file_util::fget_multiple<index_type>(&efC, 1, fp);
pecos::file_util::fget_multiple<index_type>(&max_level, 1, fp);
pecos::file_util::fget_multiple<index_type>(&init_node, 1, fp);
graph_l0.load(fp);
graph_l1.load(fp);
if(version == "v1.0") {
pecos::file_util::fget_multiple<index_type>(&num_node, 1, fp);
pecos::file_util::fget_multiple<index_type>(&maxM, 1, fp);
pecos::file_util::fget_multiple<index_type>(&maxM0, 1, fp);
pecos::file_util::fget_multiple<index_type>(&efC, 1, fp);
pecos::file_util::fget_multiple<index_type>(&max_level, 1, fp);
pecos::file_util::fget_multiple<index_type>(&init_node, 1, fp);
graph_l0.load(fp);
graph_l1.load(fp);
} else {
throw std::runtime_error("Unable to load this binary with version = " + version);
}
fclose(fp);
}

// Algorithm 4 of HNSW paper
void get_neighbors_heuristic(max_heap_t &top_candidates, const index_type M) {
if (top_candidates.size() < M) { return; }
if(top_candidates.size() < M) { return; }

min_heap_t queue_closest;
std::vector<pair_t> return_list;
Expand All @@ -452,41 +459,41 @@ namespace ann {
}

while (queue_closest.size()) {
if (return_list.size() >= M) {
if(return_list.size() >= M) {
break;
}
auto curent_pair = queue_closest.top();
dist_t dist_to_query = curent_pair.dist;
queue_closest.pop();
bool good = true;

for (auto& second_pair : return_list) {
for(auto& second_pair : return_list) {
dist_t curdist = feat_vec_t::distance(
graph_l0.get_node_feat(second_pair.node_id),
graph_l0.get_node_feat(curent_pair.node_id)
);
if (curdist < dist_to_query) {
if(curdist < dist_to_query) {
good = false;
break;
}
}
if (good) {
if(good) {
return_list.push_back(curent_pair);
}
}

for (auto& curent_pair : return_list) {
for(auto& curent_pair : return_list) {
top_candidates.emplace(curent_pair);
}
}

// line 10-17, Algorithm 1 of HNSW paper
// it is the caller's responsibility to make sure top_candidates are available in the graph of this level.
template<bool lock_free=true>
index_type mutually_connect(index_type query_id, max_heap_t &top_candidates, index_type level, std::vector<std::mutex>* mtx_nodes=nullptr) {
index_type mutually_connect(index_type src_node_id, max_heap_t &top_candidates, index_type level, std::vector<std::mutex>* mtx_nodes=nullptr) {
index_type Mcurmax = level ? this->maxM : this->maxM0;
get_neighbors_heuristic(top_candidates, this->maxM);
if (top_candidates.size() > this->maxM) {
if(top_candidates.size() > this->maxM) {
throw std::runtime_error("Should be not be more than M_ candidates returned by the heuristic");
}

Expand All @@ -498,7 +505,7 @@ namespace ann {
}

GraphBase *G;
if (level == 0) {
if(level == 0) {
G = &graph_l0;
} else {
G = &graph_l1;
Expand All @@ -512,12 +519,12 @@ namespace ann {

auto neighbors = G->get_neighborhood(src, level);

if (neighbors.degree() > Mcurmax)
if(neighbors.degree() > Mcurmax)
throw std::runtime_error("Bad value of size of neighbors for this src node");
if (src == dst)
if(src == dst)
throw std::runtime_error("Trying to connect an element to itself");

if (neighbors.degree() < Mcurmax) {
if(neighbors.degree() < Mcurmax) {
neighbors.push_back(dst);
} else {
// finding the "weakest" element to replace it with the new one
Expand Down Expand Up @@ -551,9 +558,9 @@ namespace ann {
}
};

for (auto& dst : selected_neighbors) {
add_link(query_id, dst);
add_link(dst, query_id);
for(auto& dst : selected_neighbors) {
add_link(src_node_id, dst);
add_link(dst, src_node_id);
}

index_type next_closest_entry_point = selected_neighbors.back();
Expand Down Expand Up @@ -600,7 +607,7 @@ namespace ann {
auto max_level = hnsw.max_level;
auto curr_node = hnsw.init_node;

const feat_vec_t& query_feat_ptr = graph_l0.get_node_feat(query_id);
const feat_vec_t& query_feat = graph_l0.get_node_feat(query_id);

bool is_first_node = (query_id == 0);
if(is_first_node) {
Expand All @@ -610,7 +617,7 @@ namespace ann {
// find entrypoint with efS = 1 from level = local max_level to 1.
if(query_level < max_level) {
dist_t curr_dist = feat_vec_t::distance(
query_feat_ptr,
query_feat,
graph_l0.get_node_feat(curr_node)
);

Expand All @@ -622,10 +629,10 @@ namespace ann {
auto neighbors = graph_l1.get_neighborhood(curr_node, level);
for(auto& next_node : neighbors) {
dist_t next_dist = feat_vec_t::distance(
query_feat_ptr,
query_feat,
graph_l0.get_node_feat(next_node)
);
if (next_dist < curr_dist) {
if(next_dist < curr_dist) {
curr_dist = next_dist;
curr_node = next_node;
changed = true;
Expand All @@ -635,16 +642,16 @@ namespace ann {
}
}
if(lock_free) {
for (auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<true>(query_feat_ptr, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
for(auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<true>(query_feat, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
curr_node = mutually_connect<true>(query_id, top_candidates, level, &ws.mtx_nodes);
if (level == 0) { break; }
if(level == 0) { break; }
}
} else {
for (auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<false>(query_feat_ptr, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
for(auto level = std::min(query_level, max_level); ; level--) {
auto& top_candidates = search_level<false>(query_feat, curr_node, this->efC, level, searcher, &ws.mtx_nodes);
curr_node = mutually_connect<false>(query_id, top_candidates, level, &ws.mtx_nodes);
if (level == 0) { break; }
if(level == 0) { break; }
}
}

Expand Down Expand Up @@ -693,11 +700,46 @@ namespace ann {

bool lock_free = (threads == 1);
#pragma omp parallel for schedule(dynamic, 1)
for (index_type query_id = 0; query_id < num_node; query_id++) {
for(index_type node_id = 0; node_id < num_node; node_id++) {
int thread_id = omp_get_thread_num();
add_point(query_id, ws, thread_id, lock_free);
add_point(node_id, ws, thread_id, lock_free);
}

auto sort_neighbors_for_node = [&](index_type node_id, workspace_t& ws, int thread_id) {
auto& hnsw = ws.hnsw;
auto& graph_l0 = hnsw.graph_l0;
auto& graph_l1 = hnsw.graph_l1;
auto& queue = ws.searchers[thread_id].cand_queue;

const auto &src = graph_l0.get_node_feat(node_id);
for(index_type level = 0; level <= ws.node2level[node_id]; level++) {
GraphBase *G;
if(level == 0) {
G = &graph_l0;
} else {
G = &graph_l1;
}
auto neighbors = G->get_neighborhood(node_id, level);
if(neighbors.degree() == 0) {
return;
}
queue.clear();
for(index_type j = 0; j < neighbors.degree(); j++) {
const auto& dst = graph_l0.get_node_feat(neighbors[j]);
queue.emplace_back(feat_vec_t::distance(src, dst), neighbors[j]);
}
std::sort(queue.begin(), queue.end());
for(index_type j = 0; j < neighbors.degree(); j++) {
neighbors[j] = queue[j].node_id;
}
}
};

#pragma omp parallel for schedule(dynamic, 1)
for(index_type node_id = 0; node_id < num_node; node_id++) {
int thread_id = omp_get_thread_num();
sort_neighbors_for_node(node_id, ws, thread_id);
}
}

// Algorithm 2 of HNSW paper
Expand All @@ -723,7 +765,7 @@ namespace ann {
searcher.mark_visited(init_node);

const GraphBase *G;
if (level == 0) {
if(level == 0) {
G = &graph_l0;
} else {
G = &graph_l1;
Expand All @@ -732,7 +774,7 @@ namespace ann {
// Best First Search loop
while (!cand_queue.empty()) {
pair_t cand_pair = cand_queue.top();
if (cand_pair.dist > topk_ub_dist) {
if(cand_pair.dist > topk_ub_dist) {
break;
}
cand_queue.pop();
Expand All @@ -745,24 +787,24 @@ namespace ann {
// visiting neighbors of candidate node
const auto neighbors = G->get_neighborhood(cand_node, level);
graph_l0.prefetch_node_feat(neighbors[0]);
for (index_type j = 0; j < neighbors.degree(); j++) {
for(index_type j = 0; j < neighbors.degree(); j++) {
graph_l0.prefetch_node_feat(neighbors[j + 1]);
auto next_node = neighbors[j];
if (!searcher.is_visited(next_node)) {
if(!searcher.is_visited(next_node)) {
searcher.mark_visited(next_node);
dist_t next_lb_dist;
next_lb_dist = feat_vec_t::distance(
query,
graph_l0.get_node_feat(next_node)
);
if (topk_queue.size() < efS || next_lb_dist < topk_ub_dist) {
if(topk_queue.size() < efS || next_lb_dist < topk_ub_dist) {
cand_queue.emplace(next_lb_dist, next_node);
graph_l0.prefetch_node_feat(cand_queue.top().node_id);
topk_queue.emplace(next_lb_dist, next_node);
if (topk_queue.size() > efS) {
if(topk_queue.size() > efS) {
topk_queue.pop();
}
if (!topk_queue.empty()) {
if(!topk_queue.empty()) {
topk_ub_dist = topk_queue.top().dist;
}
}
Expand All @@ -786,20 +828,20 @@ namespace ann {
query,
G0.get_node_feat(init_node)
);
for (index_type curr_level = this->max_level; curr_level >= 1; curr_level--) {
for(index_type curr_level = this->max_level; curr_level >= 1; curr_level--) {
bool changed = true;
while (changed) {
changed = false;
const auto neighbors = G1.get_neighborhood(curr_node, curr_level);
graph_l0.prefetch_node_feat(neighbors[0]);
for (index_type j = 0; j < neighbors.degree(); j++) {
for(index_type j = 0; j < neighbors.degree(); j++) {
graph_l0.prefetch_node_feat(neighbors[j + 1]);
auto next_node = neighbors[j];
dist_t next_dist = feat_vec_t::distance(
query,
G0.get_node_feat(next_node)
);
if (next_dist < curr_dist) {
if(next_dist < curr_dist) {
curr_dist = next_dist;
curr_node = next_node;
changed = true;
Expand Down
8 changes: 5 additions & 3 deletions pecos/core/utils/tfidf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class Tokenizer {

Tokenizer(const string& load_dir) { load(load_dir); }

void load_config(const string& filepath) {
static nlohmann::json load_config(const string& filepath) {
std::ifstream loadfile(filepath);
string json_str;
if(loadfile.is_open()) {
Expand All @@ -333,7 +333,7 @@ class Tokenizer {
throw std::runtime_error("Unable to open config file at " + filepath);
}
auto j_param = nlohmann::json::parse(json_str);
tok_type = j_param["token_type"];
return j_param;
}

void save_config(const string& filepath) const {
Expand Down Expand Up @@ -370,7 +370,9 @@ class Tokenizer {
}

void load(const string& load_dir) {
load_config(load_dir + "/config.json");
auto config = load_config(load_dir + "/config.json");
this->tok_type = config["token_type"];

std::ifstream loadfile(load_dir + "/vocab.txt");
if(loadfile.is_open()) {
string line;
Expand Down
2 changes: 1 addition & 1 deletion test/pecos/apps/text2text/test_text2text.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def assert_json_string(str_a, str_b):
cmd += ["-q {}".format(item_file)]
cmd += ["-m {}".format(model_folder)]
cmd += ["--max-leaf-size {}".format(10)]
cmd += ['--vectorizer-config-json \{\\"type\\":\\"sklearntfidf\\",\\"kwargs\\":\{\}\}']
cmd += ["""--vectorizer-config-json '{"type":"sklearntfidf","kwargs":{}}'"""]
print(" ".join(cmd))
process = subprocess.run(
shlex.split(" ".join(cmd)), stdout=subprocess.PIPE, stderr=subprocess.PIPE
Expand Down
3 changes: 2 additions & 1 deletion test/tst-data/ann/hnsw-model-dense/c_model/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"hnsw_t": "pecos::ann::HNSW<float, pecos::ann::FeatVecDenseIPSimd<float>>",
"version": "v1.0",
"train_params": {
"efC": 100,
"init_node": 50,
Expand All @@ -8,4 +9,4 @@
"max_level": 1,
"num_node": 90
}
}
}
3 changes: 2 additions & 1 deletion test/tst-data/ann/hnsw-model-sparse/c_model/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"hnsw_t": "pecos::ann::HNSW<float, pecos::ann::FeatVecSparseIPSimd<uint32_t, float>>",
"version": "v1.0",
"train_params": {
"efC": 100,
"init_node": 50,
Expand All @@ -8,4 +9,4 @@
"max_level": 1,
"num_node": 90
}
}
}