
rebase master onto production
baixiac committed Nov 4, 2021
2 parents ed2ec93 + 7df3422 commit 5b9b7ec
Showing 11 changed files with 171 additions and 118 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -113,7 +113,7 @@ that data is not publicaly available.)
If you have access to UMLS or SNOMED-CT and can provide some proof (a screenshot of the [UMLS profile page](https://uts.nlm.nih.gov//uts.html#profile) is perfect, feel free to redact all information you do not want to share), contact us - we are happy to share the pre-built CDB and Vocab for those databases.


## Acknowledgement
## Acknowledgements
Entity extraction was trained on [MedMentions](https://github.com/chanzuckerberg/MedMentions). In total it has ~ 35K entities from UMLS.

The vocabulary was compiled from [Wiktionary](https://en.wiktionary.org/wiki/Wiktionary:Main_Page). In total ~ 800K unique words.
162 changes: 92 additions & 70 deletions medcat/cat.py
@@ -6,9 +6,11 @@
import logging
import math
import types
from copy import deepcopy
from multiprocessing import Process, Manager, Queue, cpu_count
import time
import psutil
from time import sleep
from copy import deepcopy
from multiprocess import Process, Manager, cpu_count
from typing import Union, List, Tuple, Optional, Dict, Iterable, Generator
from tqdm.autonotebook import tqdm
from spacy.tokens import Span, Doc
@@ -93,11 +95,10 @@ def __init__(self, cdb, config, vocab, meta_cats=[]):
self.linker = Linker(self.cdb, vocab, self.config)
self.pipe.add_linker(self.linker)

self._meta_cats = meta_cats
# Add meta_annotation classes if they exist
self._meta_annotations = False
for meta_cat in meta_cats:
self.pipe.add_meta_cat(meta_cat, meta_cat.config.general['category_name'])
self._meta_annotations = True

# Set max document length
self.pipe.nlp.max_length = self.config.preprocessing.get('max_document_length')
@@ -121,15 +122,12 @@ def create_model_pack(self, save_dir_path, model_pack_name='medcat_model_pack'):
os.makedirs(save_dir_path, exist_ok=True)

# Save the used spacy model
spacy_path = os.path.join(save_dir_path, 'spacy_model')
spacy_path = os.path.join(save_dir_path, os.path.basename(self.config.general['spacy_model']))
if str(self.pipe.nlp._path) != spacy_path:
# First remove if something is there
shutil.rmtree(spacy_path, ignore_errors=True)
shutil.copytree(self.pipe.nlp._path, spacy_path)

# Change the name of the spacy model in the config
self.config.general['spacy_model'] = 'spacy_model'

# Save the CDB
cdb_path = os.path.join(save_dir_path, "cdb.dat")
self.cdb.save(cdb_path)
@@ -839,7 +837,7 @@ def _batch_generator(self, data, batch_size_chars, skip_ids=set()):
def _save_docs_to_file(self, docs, annotated_ids, save_dir_path, annotated_ids_path, part_counter=0):
path = os.path.join(save_dir_path, 'part_{}.pickle'.format(part_counter))
pickle.dump(docs, open(path, "wb"))
self.log.info("Saved part: %s, to: %s", (part_counter, path))
self.log.info("Saved part: %s, to: %s", part_counter, path)
part_counter = part_counter + 1 # Increment after saving, so it points to the next part
pickle.dump((annotated_ids, part_counter), open(annotated_ids_path, 'wb'))
return part_counter
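
`_save_docs_to_file` pickles each flushed batch as `part_<n>.pickle` alongside a bookkeeping pickle of already-annotated ids, so a run can resume where it stopped. Rebuilding the full result afterwards is a matter of unpickling the parts; a minimal sketch (the directory name is illustrative, standing in for `save_dir_path`):

```python
# Sketch: reassemble {id: doc_json} from the part files written above.
import glob
import os
import pickle

docs = {}
for path in sorted(glob.glob(os.path.join('annotated_parts', 'part_*.pickle'))):
    with open(path, 'rb') as f:
        docs.update(pickle.load(f))  # each part is itself an {id: doc_json} dict
```
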
@@ -851,9 +849,10 @@ def multiprocessing(self,
only_cui: bool = False,
addl_info: List[str] = [],
separate_nn_components: bool = True,
out_split_size: int = None,
save_dir_path: str = None,) -> Dict:
r''' Run multiprocessing for inference; if out_save_path and out_split_size are used this will also continue annotating
out_split_size_chars: int = None,
save_dir_path: str = None,
min_free_memory=0.1) -> Dict:
r''' Run multiprocessing for inference; if out_save_path and out_split_size_chars are used this will also continue annotating
documents if something is saved in that directory.
Args:
@@ -868,18 +867,26 @@ def multiprocessing(self,
they will be run sequentially. This is useful as the NN components
have batching and like to process many docs at once, while the rest of the pipeline
runs the documents one by one.
out_split_size (`int`, None):
If set once more than out_split_size documents are annotated
they will be saved to a file (save_dir_path) and the memory cleared.
out_split_size_chars (`int`, None):
If set, once more than out_split_size_chars characters are annotated
they will be saved to a file (save_dir_path) and the memory cleared. Recommended
value is 20*batch_size_chars.
save_dir_path(`str`, None):
Where to save the annotated documents if splitting.
min_free_memory(`float`, defaults to 0.1):
If set, a process will not start unless at least this fraction of RAM is free;
the value should be in the range [0, 1]. Helps when annotating very large
datasets because spacy is not the best with memory management and multiprocessing.
Returns:
A dictionary: {id: doc_json, id2: doc_json2, ...}, in case out_split_size is used
A dictionary: {id: doc_json, id2: doc_json2, ...}, in case out_split_size_chars is used
the last batch will be returned, while it and all previous batches will be
written to disk (out_save_dir).
'''
if self._meta_annotations and not separate_nn_components:
# Set max document length
self.pipe.nlp.max_length = self.config.preprocessing.get('max_document_length')

if self._meta_cats and not separate_nn_components:
# Hack for torch using multithreading, which causes problems when
#separate_nn_components is False; needed for CPU runs only
import torch
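
The docstring above introduces the two new knobs for long annotation runs. A minimal usage sketch; the model-pack path, input texts, and sizes are hypothetical placeholders, not recommendations, and the keyword names follow the docstring:

```python
# Hypothetical usage of CAT.multiprocessing with the options added here;
# 'medcat_model_pack.zip' and my_texts are placeholders.
from medcat.cat import CAT

cat = CAT.load_model_pack('medcat_model_pack.zip')
data = [(str(i), text) for i, text in enumerate(my_texts)]  # (id, text) pairs

docs = cat.multiprocessing(
    data,
    nproc=8,
    batch_size_chars=1_000_000,
    out_split_size_chars=20 * 1_000_000,  # docstring recommends 20*batch_size_chars
    save_dir_path='annotated_parts/',     # flushed parts are pickled here
    min_free_memory=0.1,                  # workers stop if <10% of RAM is free
)
```
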
@@ -902,19 +909,26 @@ part_counter = 0
part_counter = 0

docs = {}
_start_time = time.time()
_batch_counter = 0 # Used for splitting the output, counts batches in between saves
for batch in self._batch_generator(data, batch_size_chars, skip_ids=set(annotated_ids)):
self.log.info("Annotated until now: %s docs; Current BS: %s docs", (len(annotated_ids), len(batch)))
self.log.info("Annotated until now: %s docs; Current BS: %s docs; Elapsed time: %.2f minutes",
len(annotated_ids),
len(batch),
(time.time() - _start_time)/60)
try:
_docs = self._multiprocessing_batch(data=batch,
nproc=nproc,
batch_size_chars=internal_batch_size_chars,
only_cui=only_cui,
batch_size_chars=internal_batch_size_chars,
addl_info=addl_info,
nn_components=nn_components)
nn_components=nn_components,
min_free_memory=min_free_memory)
docs.update(_docs)
annotated_ids.extend(_docs.keys())
_batch_counter += 1
del _docs
if out_split_size is not None and len(docs) > out_split_size:
if out_split_size_chars is not None and (_batch_counter * batch_size_chars) > out_split_size_chars:
# Save to file and reset the docs
part_counter = self._save_docs_to_file(docs=docs,
annotated_ids=annotated_ids,
Expand All @@ -923,12 +937,13 @@ def multiprocessing(self,
part_counter=part_counter)
del docs
docs = {}
_batch_counter = 0
except Exception as e:
self.log.warning("Failed an outer batch in the multiprocessing script")
self.log.warning(e, exc_info=True, stack_info=True)

# Save the last batch
if out_split_size is not None and len(docs) > 0:
if out_split_size_chars is not None and len(docs) > 0:
# Save to file and reset the docs
self._save_docs_to_file(docs=docs,
annotated_ids=annotated_ids,
@@ -950,7 +965,8 @@ def _multiprocessing_batch(self,
batch_size_chars: int = 1000000,
only_cui: bool = False,
addl_info: List[str] = [],
nn_components=[]) -> Dict:
nn_components=[],
min_free_memory=0) -> Dict:
r''' Run multiprocessing on one batch
Args:
@@ -965,47 +981,44 @@
A dictionary: {id: doc_json, id2: doc_json2, ...}
'''
# Create the input output for MP
in_q = Queue(maxsize=4*nproc)
manager = Manager()
out_dict = manager.dict()
out_dict['processed'] = []

# Create processes
procs = []
for i in range(nproc):
p = Process(target=self._mp_cons,
kwargs={'in_q': in_q,
'out_dict': out_dict,
'pid': i,
'only_cui': only_cui,
'addl_info': addl_info})
p.start()
procs.append(p)

id2text = {}
for batch in self._batch_generator(data, batch_size_chars):
if nn_components:
# We need this for the json_to_fake_spacy
id2text.update({k:v for k,v in batch})
in_q.put(batch)

# Final data point for workers
for _ in range(nproc):
in_q.put(None)
# Join processes
for p in procs:
p.join()

docs = {}
for key in out_dict.keys():
if 'pid' in key:
# Converts a list of (id, doc) tuples into a dict
docs.update({k:v for k,v in out_dict[key]})

# Cleanup - to prevent memory leaks, maybe
out_dict.clear()
del out_dict
in_q.close()
with Manager() as manager:
out_list = manager.list()
lock = manager.Lock()
in_q = manager.Queue(maxsize=10*nproc)

id2text = {}
for batch in self._batch_generator(data, batch_size_chars):
if nn_components:
# We need this for the json_to_fake_spacy
id2text.update({k:v for k,v in batch})
in_q.put(batch)

# Final data point for workers
for _ in range(nproc):
in_q.put(None)
sleep(2)

# Create processes
procs = []
for i in range(nproc):
p = Process(target=self._mp_cons,
kwargs={'in_q': in_q,
'out_list': out_list,
'pid': i,
'only_cui': only_cui,
'addl_info': addl_info,
'min_free_memory': min_free_memory,
'lock': lock})
p.start()
procs.append(p)

# Join processes
for p in procs:
p.join()

docs = {}
# Converts a list of (id, doc) tuples into a dict
docs.update({k:v for k,v in out_list})

# If we have separate GPU components now we pipe that
if nn_components:
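
The rewrite above moves the queue, output list, and lock inside a `with Manager() as manager:` block, so all shared state is owned by a single manager process and released on exit. A stripped-down sketch of the same pattern with a generic worker; it uses the stdlib `multiprocessing` module, while MedCAT itself switched to the `multiprocess` fork in this commit:

```python
# Minimal sketch of the Manager-owned producer/consumer pattern; the worker
# squares numbers instead of annotating documents.
from multiprocessing import Manager, Process

def worker(in_q, out_list, lock):
    while True:
        item = in_q.get()
        if item is None:              # sentinel: no more work for this worker
            break
        with lock:                    # serialize writes to the shared list
            out_list.append((item, item * item))

if __name__ == '__main__':
    nproc = 4
    with Manager() as manager:
        out_list = manager.list()
        lock = manager.Lock()
        in_q = manager.Queue(maxsize=10 * nproc)  # bounded queue for back-pressure

        procs = [Process(target=worker, args=(in_q, out_list, lock))
                 for _ in range(nproc)]
        for p in procs:
            p.start()

        for item in range(100):       # producer: enqueue the work
            in_q.put(item)
        for _ in range(nproc):        # one sentinel per worker
            in_q.put(None)

        for p in procs:
            p.join()
        results = dict(list(out_list))  # copy out before the manager exits
```

Starting the consumers before filling the bounded queue keeps the producer from blocking; the sentinel-per-worker convention is the same one the code above uses to shut workers down cleanly.
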
@@ -1038,7 +1051,7 @@ def multiprocessing_pipe(self,
if nproc == 0:
raise ValueError("nproc cannot be set to zero")

if self._meta_annotations:
if self._meta_cats:
# Hack for torch using multithreading, which is not good here
import torch
torch.set_num_threads(1)
@@ -1061,13 +1074,21 @@

return out

def _mp_cons(self, in_q, out_dict, pid=0, only_cui=False, addl_info=[]):
def _mp_cons(self, in_q, out_list, min_free_memory, lock, pid=0, only_cui=False, addl_info=[]):
out = []

while True:
if not in_q.empty():
if psutil.virtual_memory().available / psutil.virtual_memory().total < min_free_memory:
with lock:
out_list.extend(out)
# Stop a process if there is not enough memory left
break

data = in_q.get()
if data is None:
out_dict['pid: {}'.format(pid)] = out
with lock:
out_list.extend(out)
break

for i_text, text in data:
Expand All @@ -1076,10 +1097,11 @@ def _mp_cons(self, in_q, out_dict, pid=0, only_cui=False, addl_info=[]):
doc = self.get_entities(text=text, only_cui=only_cui, addl_info=addl_info)
out.append((i_text, doc))
except Exception as e:
self.log.warning("Exception in _mp_cons")
self.log.warning(e, exc_info=True, stack_info=True)
self.log.warning("PID: %s failed one document in _mp_cons, running will continue normally. \n" +
"Document length in chars: %s, and ID: %s", pid, len(str(text)), i_text)
self.log.warning(str(e))
sleep(2)

sleep(1)

def _doc_to_out(self,
doc: Doc,
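
The worker-side guard in `_mp_cons` reduces to a single psutil expression: a worker stops taking new batches once available RAM drops below the `min_free_memory` fraction. In isolation, the guard looks like this; the loop around it is schematic and `flush_partial_results` is a hypothetical helper:

```python
# Sketch of the free-memory guard used by the workers above; min_free_memory
# is the fraction of total RAM that must still be available to continue.
import psutil

def enough_memory(min_free_memory: float = 0.1) -> bool:
    vm = psutil.virtual_memory()
    return vm.available / vm.total >= min_free_memory

# Schematic worker loop: flush what we have and stop, instead of taking
# more work, when memory runs low.
# while True:
#     if not enough_memory(0.1):
#         flush_partial_results()  # hypothetical helper
#         break
#     ...
```
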
8 changes: 4 additions & 4 deletions medcat/cdb.py
@@ -331,11 +331,11 @@ def update_context_vector(self, cui, vectors, negative=False, lr=None, cui_count

# DEBUG
self.log.debug("Updated vector embedding.\n" +
"CUI: %s, Context Type: %s, Similarity: %.2f, Is Negative: %s, LR: %.5f, b: %.3f", (cui, context_type,
similarity, negative, lr, b))
"CUI: %s, Context Type: %s, Similarity: %.2f, Is Negative: %s, LR: %.5f, b: %.3f", cui, context_type,
similarity, negative, lr, b)
cv = self.cui2context_vectors[cui][context_type]
similarity_after = np.dot(unitvec(cv), unitvec(vector))
self.log.debug("Similarity before vs after: %.5f vs %.5f", (similarity, similarity_after))
self.log.debug("Similarity before vs after: %.5f vs %.5f", similarity, similarity_after)
else:
if negative:
self.cui2context_vectors[cui][context_type] = -1 * vector
Expand All @@ -344,7 +344,7 @@ def update_context_vector(self, cui, vectors, negative=False, lr=None, cui_count

# DEBUG
self.log.debug("Added new context type with vectors.\n" +
"CUI: %s, Context Type: %s, Is Negative: %s", (cui, context_type, negative))
"CUI: %s, Context Type: %s, Is Negative: %s", cui, context_type, negative)

if not negative:
# Increase counter only for positive examples
2 changes: 1 addition & 1 deletion medcat/cdb_maker.py
@@ -166,7 +166,7 @@ def prepare_csvs(self, csv_paths, sep=',', encoding=None, escapechar=None, index
description=description, full_build=full_build)
# DEBUG
self.log.debug("\n\n**** Added\n CUI: %s\n Names: %s\n Ontologies: %s\n Name status: %s\n Type IDs: %s\n Description: %s\n Is full build: %s",
(cui, names, ontologies, name_status, type_ids, description, full_build))
cui, names, ontologies, name_status, type_ids, description, full_build)

return self.cdb

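
A recurring change in cat.py, cdb.py, and cdb_maker.py above fixes the same logging bug: the values were wrapped in a single tuple, so %-style lazy formatting received one argument for several placeholders. A minimal illustration:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger('medcat')

part_counter, path = 3, 'out/part_3.pickle'

# Before: one tuple argument for two %s placeholders; logging prints
# "--- Logging error ---" with "not enough arguments for format string"
# instead of the intended message.
log.info("Saved part: %s, to: %s", (part_counter, path))

# After: arguments passed separately, one per placeholder.
log.info("Saved part: %s, to: %s", part_counter, path)
```
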
6 changes: 3 additions & 3 deletions medcat/config_meta_cat.py
@@ -19,15 +19,15 @@ def __init__(self):
'cntx_left': 15, # Number of tokens to take from the left of the concept
'cntx_right': 10, # Number of tokens to take from the right of the concept
'replace_center': None, # If set the center (concept) will be replaced with this string
'batch_size_eval': 5000,
'batch_size_eval': 5000, # Number of annotations to be meta-annotated at once in eval
'annotate_overlapping': False, # If set meta_anns will be calculated for doc._.ents, otherwise for doc.ents
'tokenizer_name': 'bbpe',
'tokenizer_name': 'bbpe', # Tokenizer name used with MetaCAT
# This is a dangerous option, if not sure ALWAYS set to False. If set, it will try to share the pre-calculated
#context tokens between MetaCAT models when serving. It will ignore differences in tokenizer and context size,
#so you need to be sure that the models for which this is turned on have the same tokenizer and context size, during
#a deployment.
'save_and_reuse_tokens': False,
'pipe_batch_size_in_chars': 20000000,
'pipe_batch_size_in_chars': 20000000, # How many characters are piped at once into the meta_cat class
}
self.model = {
'model_name': 'lstm',
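
The newly documented keys all live in the `general` section of the MetaCAT config; a sketch of overriding them, where the `ConfigMetaCAT` name is assumed from the module path and the values are illustrative:

```python
# Hypothetical sketch: tuning the options documented above. ConfigMetaCAT is
# assumed to be the class defined in medcat/config_meta_cat.py.
from medcat.config_meta_cat import ConfigMetaCAT

config = ConfigMetaCAT()
config.general['batch_size_eval'] = 2500                  # annotations per eval batch
config.general['annotate_overlapping'] = True             # meta-annotate doc._.ents
config.general['pipe_batch_size_in_chars'] = 10_000_000   # chars piped per batch
```
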
