Merge pull request #20 from baloise/main
PR for new image build
Showing 6 changed files with 303 additions and 49 deletions.
@@ -1,3 +1,5 @@
#!/bin/sh
export PATH="$HOME/.local/bin:$PATH"
pipx install poetry
pipx ensurepath
. ~/.bashrc
@@ -0,0 +1,61 @@
import spacy
import spacy.cli
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
#from presidio_anonymizer import AnonymizerEngine
from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpEngineProvider
from presidio_analyzer.predefined_recognizers import SpacyRecognizer
from presidio_analyzer import PatternRecognizer


class Anon_Spacy:
    def __init__(self):
        languages = ['en', 'de', 'fr', 'it']
        size = "lg"
        # spaCy publishes 'web' pipelines for English and 'news' pipelines for de/fr/it.
        genres = {lang: "web" if lang == 'en' else "news" for lang in languages}
        self.models = {lang: f"{lang}_core_{genres[lang]}_{size}" for lang in languages}
        self.models_loaded = []

    def analyze_text(self, text, language='de', entities=['PERSON']):
        if language not in self.models:
            print(f"WARN: language '{language}' not supported. Supported languages are {list(self.models)}.")
            return []
        return self.get_analyzer(language, entities).analyze(text=text, language=language, entities=entities)

    def get_analyzer(self, language='de', entities=['PERSON']):
        # Only needs the model to be installed; SpacyNlpEngine loads it itself.
        self.ensure_model_loaded(self.models[language])
        nlp_engine = SpacyNlpEngine(models=[{"lang_code": language, "model_name": self.models[language]}])
        analyzer = AnalyzerEngine(nlp_engine=nlp_engine, supported_languages=[language])
        analyzer.registry.add_recognizer(SpacyRecognizer(supported_language=language, supported_entities=entities))
        return analyzer

    def ensure_model_loaded(self, model_name):
        if model_name in self.models_loaded:
            print(f"Model '{model_name}' already loaded.")
            return
        print(f"Loading model '{model_name}'.")
        try:
            # Try to load the model
            return spacy.load(model_name)
        except OSError:
            # If the model is not found, download it
            print(f"Model '{model_name}' not found. Downloading...")
            spacy.cli.download(model_name)
            print(f"Model '{model_name}' downloaded successfully.")
            return spacy.load(model_name)
        finally:
            # Mark as loaded (note: this also runs if the download raised).
            self.models_loaded.append(model_name)
            print(f"Model '{model_name}' loaded.")


# Add custom recognizers if needed
# Example: Adding a custom recognizer for French phone numbers
# fr_phone_recognizer = PatternRecognizer(supported_entity="FR_PHONE_NUMBER",
#                                         patterns=[{"name": "FR_PHONE",
#                                                    "regex": r"(\+33|0)[1-9]\d{8}",
#                                                    "score": 0.9}])
# analyzer.registry.add_recognizer(fr_phone_recognizer)

# Initialize the anonymizer engine
#anonymizer = AnonymizerEngine()

# def anonymize_text(text, language):
#     return anonymizer.anonymize(text=text, analyzer_results=analyze_text(text,language))
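A minimal usage sketch for the class above (the sample sentence is made up; assumes presidio-analyzer and spaCy are installed and the model download succeeds):

anon = Anon_Spacy()
# The first call downloads de_core_news_lg on demand, then runs NER over the text.
results = anon.analyze_text("Hans Muster wohnt in Basel.", language='de')
for res in results:
    # Each result is a Presidio RecognizerResult with entity type, span, and score.
    print(res.entity_type, res.start, res.end, res.score)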
@@ -0,0 +1,119 @@
import regex as re
import dill
import ahocorasick
from multiprocessing import Pool


class Anon:
    def __init__(self, names):
        self.pattern = re.compile(r'\b(' + '|'.join(map(re.escape, names)) + r')\b')
        self.automaton = ahocorasick.Automaton()
        for index, name in enumerate(names):
            self.automaton.add_word(name, (index, name))
        self.automaton.make_automaton()
        self.name_set = set(names)
        self._init_trie()

    def find_regex(self, text):
        return [(match.group(), match.start(), match.end()) for match in self.pattern.finditer(text)]

    def find_ahocorasick(self, text):
        occurrences = []
        for end_index, (idx, name) in self.automaton.iter(text):
            start_index = end_index - len(name) + 1
            # Check if the match is an entire word using word boundaries
            if (start_index == 0 or not text[start_index - 1].isalnum()) and \
               (end_index == len(text) - 1 or not text[end_index + 1].isalnum()):
                occurrences.append((name, start_index, end_index))
        return occurrences

    def find_trie(self, text):
        firstnames = list(re.finditer(self.first_trie_regex, text, overlapped=True))
        lastnames = list(re.finditer(self.last_trie_regex, text, overlapped=True))
        return [(match.group(), match.start(), match.end()) for match in firstnames + lastnames]

    def _init_trie(self):
        # Imported so dill can resolve the pickled objects when loading.
        from src.utils.ano_regex import create_names_regex
        from src.utils.trie import Trie

        with open('./data/first_names_trie_regex.pkl', 'rb') as f:
            self.first_trie_regex = dill.load(f)
        with open('./data/last_names_trie_regex.pkl', 'rb') as f:
            self.last_trie_regex = dill.load(f)

    def find_set(self, text):
        occurrences = []
        for match in re.finditer(r'\b\w+\b', text):
            word = match.group()
            if word in self.name_set:
                occurrences.append((word, match.start(), match.end()))
        return occurrences

    def run_parallel(self, method, text, num_workers=4):
        # Split text into lines so no line straddles two chunks
        lines = text.splitlines(keepends=True)
        total_lines = len(lines)
        chunk_size = total_lines // num_workers

        # Create chunks ensuring each line is entirely in one block
        chunks = []
        for i in range(num_workers):
            start_index = i * chunk_size
            end_index = (i + 1) * chunk_size if i != num_workers - 1 else total_lines
            chunks.append(''.join(lines[start_index:end_index]))

        with Pool(num_workers) as pool:
            results = pool.map(method, chunks)

        # Shift the chunk-relative offsets back to positions in the full text
        merged = []
        offset = 0
        for chunk, sublist in zip(chunks, results):
            merged.extend((name, start + offset, end + offset) for name, start, end in sublist)
            offset += len(chunk)
        return merged


if __name__ == "__main__":
    with open('data/_all_orig.txt', 'r') as file:
        text = file.read()

    with open('data/first_names.txt', 'r') as names_file:
        names = {line.strip() for line in names_file}

    with open('data/last_names.txt', 'r') as names_file:
        lnames = {line.strip() for line in names_file}

    names.update(lnames)
    names.discard('')

    anon = Anon(names)

    def write_matches(matches, name):
        with open(f"tmp/{name}.txt", 'w') as out:
            out.write('\n'.join(repr(match) for match in matches))

    matches_trie = anon.find_trie(text)
    print(len(matches_trie))
    write_matches(matches_trie, 'matches_trie')

    matches_set = anon.find_set(text)
    print(len(matches_set))
    write_matches(matches_set, 'matches_set')

    matches_regex = anon.find_regex(text)
    print(len(matches_regex))
    write_matches(matches_regex, 'matches_regex')

    matches_aho = anon.find_ahocorasick(text)
    print(len(matches_aho))
    write_matches(matches_aho, 'matches_aho')
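The pickled trie regexes that _init_trie loads are produced elsewhere; the helpers it imports (create_names_regex and Trie from src/utils) are not part of this diff. Below is a hedged sketch of how such a pickle could be built. The TrieRegex class here is a hypothetical stand-in, not the repository's implementation: it packs a name list into a single prefix-sharing pattern, which regex engines match far faster than a flat name1|name2|... alternation.

import dill
import regex as re

# Hypothetical stand-in for src.utils.trie.Trie (not the repo's code):
# builds a character trie, then emits one regex sharing common prefixes.
class TrieRegex:
    def __init__(self, words):
        self.root = {}
        for word in words:
            node = self.root
            for ch in word:
                node = node.setdefault(ch, {})
            node[''] = True  # end-of-word marker

    def pattern(self):
        return self._node_pattern(self.root)

    def _node_pattern(self, node):
        alternatives = []
        word_ends_here = False
        for ch in sorted(node):
            if ch == '':
                word_ends_here = True
            else:
                alternatives.append(re.escape(ch) + self._node_pattern(node[ch]))
        if not alternatives:
            return ''
        body = alternatives[0] if len(alternatives) == 1 else '(?:' + '|'.join(alternatives) + ')'
        # If a word ends at this node but longer words continue, the tail is optional.
        return '(?:' + body + ')?' if word_ends_here else body


if __name__ == "__main__":
    # Hypothetical sample; the real lists live in data/first_names.txt etc.
    first_names = ['Anna', 'Annika', 'Hans']
    # e.g. yields \b(?:Ann(?:a|ika)|Hans)\b
    trie_pattern = re.compile(r'\b' + TrieRegex(first_names).pattern() + r'\b')
    with open('./data/first_names_trie_regex.pkl', 'wb') as f:
        dill.dump(trie_pattern, f)  # compiled regex patterns support pickling

Serializing the compiled pattern with dill, rather than rebuilding it at startup, trades a little disk space for faster initialization when the name lists are large.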