Batch processing in training of NN ensemble - base project suggest calls #676

Draft: wants to merge 4 commits into base: main
annif/backend/nn_ensemble.py (16 additions, 11 deletions)
@@ -4,6 +4,7 @@

 import os.path
 import shutil
+from collections import defaultdict
 from io import BytesIO

 import joblib
@@ -210,18 +211,22 @@ def _corpus_to_vectors(self, corpus, seq, n_jobs):

         self.info("Processing training documents...")
         with pool_class(jobs) as pool:
-            for hits, subject_set in pool.imap_unordered(
-                psmap.suggest, corpus.documents
+            for hit_sets, subject_sets in pool.imap_unordered(
+                psmap.suggest_batch, corpus.doc_batches
             ):
-                doc_scores = []
-                for project_id, p_hits in hits.items():
-                    vector = p_hits.as_vector(len(self.project.subjects))
-                    doc_scores.append(
-                        np.sqrt(vector) * sources[project_id] * len(sources)
-                    )
-                score_vector = np.array(doc_scores, dtype=np.float32).transpose()
-                true_vector = subject_set.as_vector(len(self.project.subjects))
-                seq.add_sample(score_vector, true_vector)
+                score_vectors = defaultdict(list)
+                for project_id, p_hit_sets in hit_sets.items():
+                    for doc_ind, p_hits in enumerate(p_hit_sets):
+                        vector = p_hits.as_vector(len(self.project.subjects))
+                        scaled_vector = (
+                            np.sqrt(vector) * sources[project_id] * len(sources)
+                        )
+                        score_vectors[doc_ind].append(scaled_vector)
+                true_vectors = [
+                    ss.as_vector(len(self.project.subjects)) for ss in subject_sets
+                ]
+                for sv, tv in zip(score_vectors.values(), true_vectors):
+                    seq.add_sample(np.array(sv, dtype=np.float32).transpose(), tv)

     def _open_lmdb(self, cached, lmdb_map_size):
         lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
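The regrouping in this hunk is the heart of the change: suggest_batch returns hits grouped per source project, while seq.add_sample needs one score matrix per document, so the loop transposes the grouping by document index. A minimal sketch of that transposition with toy data (the project ids, scores, and the two-document batch are made up, and the sqrt/weight scaling is omitted for brevity):

import numpy as np
from collections import defaultdict

n_subjects = 4  # toy vocabulary size

# Batched results as suggest_batch groups them: per project, one vector per document
hit_sets = {
    "proj-a": [np.array([0.0, 0.5, 0.0, 0.2]), np.array([0.9, 0.0, 0.0, 0.0])],
    "proj-b": [np.array([0.1, 0.4, 0.0, 0.0]), np.array([0.8, 0.0, 0.1, 0.0])],
}

# Regroup by document index, collecting each document's vectors across projects
score_vectors = defaultdict(list)
for project_id, vectors in hit_sets.items():
    for doc_ind, vector in enumerate(vectors):
        score_vectors[doc_ind].append(vector)

# One (n_subjects, n_projects) matrix per document, the shape add_sample receives
for per_project in score_vectors.values():
    matrix = np.array(per_project, dtype=np.float32).transpose()
    assert matrix.shape == (n_subjects, len(hit_sets))

Relying on score_vectors.values() for document order works because dicts (including defaultdicts) preserve insertion order in Python 3.7+ and doc_ind is filled in ascending order for every project, so the zip with true_vectors lines up document by document.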
annif/parallel.py (0 additions, 10 deletions)
@@ -39,16 +39,6 @@ def __init__(self, registry, project_ids, backend_params, limit, threshold):
         self.limit = limit
         self.threshold = threshold

-    def suggest(self, doc):
-        filtered_hits = {}
-        for project_id in self.project_ids:
-            project = self.registry.get_project(project_id)
-            hits = project.suggest([doc.text], self.backend_params)[0]
-            filtered_hits[project_id] = hits.filter(
-                project.subjects, self.limit, self.threshold
-            )
-        return (filtered_hits, doc.subject_set)
-
     def suggest_batch(self, batch):
         filtered_hit_sets = defaultdict(list)
         texts, subject_sets = zip(*[(doc.text, doc.subject_set) for doc in batch])
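With the per-document suggest removed, suggest_batch is the only entry point left. Its return shape, as far as it can be inferred from the code above, is a dict mapping each source project id to a list of filtered hit sets in document order, plus the batch's subject sets. A toy stand-in illustrating just that shape (Doc, the project ids, and the hit strings are placeholders, not Annif APIs):

from collections import defaultdict
from dataclasses import dataclass

@dataclass
class Doc:  # stand-in for an Annif document
    text: str
    subject_set: frozenset

def suggest_batch_demo(batch):
    """Mirror the return shape of suggest_batch: per-project hit lists
    in document order, plus the batch's subject sets."""
    texts, subject_sets = zip(*[(doc.text, doc.subject_set) for doc in batch])
    hit_sets = defaultdict(list)
    for project_id in ("proj-a", "proj-b"):  # placeholder project ids
        for text in texts:
            hit_sets[project_id].append(f"<hits of {project_id} for {text!r}>")
    return hit_sets, subject_sets

batch = [Doc("first", frozenset({1})), Doc("second", frozenset({2}))]
hit_sets, subject_sets = suggest_batch_demo(batch)
assert len(hit_sets["proj-a"]) == len(batch) == len(subject_sets)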
tests/test_backend_nn_ensemble.py (24 additions, 0 deletions)
@@ -144,6 +144,30 @@ def test_nn_ensemble_train_cached(registry):
     assert datadir.join("nn-model.h5").size() > 0


+def test_nn_ensemble_train_two_sources(registry, tmpdir):
+    project = registry.get_project("dummy-en")
+    nn_ensemble_type = annif.backend.get_backend("nn_ensemble")
+    nn_ensemble = nn_ensemble_type(
+        backend_id="nn_ensemble",
+        config_params={"sources": "dummy-en,dummy-fi", "epochs": 1},
+        project=project,
+    )
+
+    tmpfile = tmpdir.join("document.tsv")
+    tmpfile.write(
+        "dummy\thttp://example.org/dummy\n"
+        + "another\thttp://example.org/dummy\n"
+        + "none\thttp://example.org/none\n" * 40
+    )
+    document_corpus = annif.corpus.DocumentFile(str(tmpfile), project.subjects)
+
+    nn_ensemble.train(document_corpus)
+
+    datadir = py.path.local(project.datadir)
+    assert datadir.join("nn-model.h5").exists()
+    assert datadir.join("nn-model.h5").size() > 0
+
+
 def test_nn_ensemble_train_and_learn_params(registry, tmpdir, capfd):
     project = registry.get_project("dummy-en")
     nn_ensemble_type = annif.backend.get_backend("nn_ensemble")
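One detail of the new fixture worth calling out: * binds tighter than +, so only the "none" line is repeated 40 times, giving 42 documents in total; presumably the point is to make the corpus span more than one document batch (assuming the default batch size is smaller than 42). A quick check of that arithmetic:

content = (
    "dummy\thttp://example.org/dummy\n"
    + "another\thttp://example.org/dummy\n"
    + "none\thttp://example.org/none\n" * 40
)
assert content.count("\n") == 42  # 2 distinct lines + 40 repeats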