diff --git a/src/api/mask.py b/src/api/mask.py index 34ef652..503d9bd 100644 --- a/src/api/mask.py +++ b/src/api/mask.py @@ -2,7 +2,8 @@ from fastapi.responses import JSONResponse from fastapi import Request, routing from pydantic import BaseModel -from ..utils.llm import llm_find_entities +from src.utils.llm import llm_find_entities +from src.utils.replacer import replace_values router = fastapi.APIRouter() @@ -12,4 +13,5 @@ class MaskRequest(BaseModel): @router.post("/mask", response_class=JSONResponse, include_in_schema=True) async def mask(request: MaskRequest): entities = llm_find_entities(request.text) - return {"original_text": request.text, "llm_entities": entities} \ No newline at end of file + anontext = replace_values(request.text, entities) + return {"original_text": request.text, "llm_entities": entities, "anonymized_text": anontext} \ No newline at end of file diff --git a/src/static/scripts/form.js b/src/static/scripts/form.js index be17906..2b0d5bf 100644 --- a/src/static/scripts/form.js +++ b/src/static/scripts/form.js @@ -24,6 +24,7 @@ document.getElementById('inputForm').addEventListener('submit', function(event) // Display the response in the textarea document.getElementById('responseFieldText').value = JSON.stringify(text.original_text, null, 2); // Format the JSON response document.getElementById('responseFieldEntities').value = JSON.stringify(text.llm_entities, null, 2); // Format the JSON response + document.getElementById('responseFieldAnonText').value = text.anonymized_text; // Format the string response }) .catch((error) => { console.error('Error:', error); // Handle any errors diff --git a/src/templates/html/form.html b/src/templates/html/form.html index 0806476..8aa9368 100644 --- a/src/templates/html/form.html +++ b/src/templates/html/form.html @@ -12,7 +12,7 @@

YoYo MaskЯ

- +
@@ -20,7 +20,7 @@

YoYo MaskЯ

Input:

- +
@@ -35,6 +35,11 @@

Entities found:



+ +
+

anonymized text:

+

+
+ diff --git a/src/utils/regex.py b/src/utils/regex.py index afcd1ce..e5b3d46 100644 --- a/src/utils/regex.py +++ b/src/utils/regex.py @@ -7,12 +7,13 @@ COUNTRY_CODES_REGEX = r'41' CURRENCY_REGEX = r'EUR|CHF|Fr\.|Franken|Francs' -def anonymize_regex(text, regex, by='ENTITY', tag='#', count=True, first=1, compare_lowercase=False, keep_regex=False): +def anonymize_regex(text, regex, by='ENTITY', start_tag='#', end_tag='#', count=True, first=1, compare_lowercase=False, keep_regex=False): """ :param text: :param regex: :param by: - :param tag: + :param start_tag: + :param end_tag: :pram count: :param compare_lowercase: :param keep_regex: @@ -22,7 +23,7 @@ def anonymize_regex(text, regex, by='ENTITY', tag='#', count=True, first=1, comp replace_dict = {} counter = len(set(r.group() for r in matches)) + first - 1 for r in matches[::-1]: - replace_by = f'{tag}{by}_{counter}{tag}' if count else f'{tag}{by}{tag}' + replace_by = f'{start_tag}{by}_{counter}{end_tag}' if count else f'{start_tag}{by}{end_tag}' if r.group() not in replace_dict.keys(): replace_dict.update({r.group(): replace_by}) counter -=1 @@ -31,16 +32,18 @@ def anonymize_regex(text, regex, by='ENTITY', tag='#', count=True, first=1, comp return {'text': text, 'replace_dict': replace_dict} | ({'regex': regex, 'matches': matches} if keep_regex else {}) -def create_names_regex(names_list): +def create_names_regex(names_list, boundary=r'\b'): """ :param names_list: :return: """ - return re.compile('|'.join(sorted(list(set(names_list)), key=len, reverse=True))) + if isinstance(names_list, re.Pattern): + return names_list + return re.compile(boundary + '|'.join(sorted(list(set(n for n in names_list if len(n) > 0)), key=len, reverse=True)) + boundary) -def update_result(result, new_result, keep_regex=False): +def _update_result(result, new_result, keep_regex=False): """ :param result: @@ -54,28 +57,49 @@ def update_result(result, new_result, keep_regex=False): return result -def anonymize_ahv(text, by='AHVNR', tag='#', count=True, first=1, keep_regex=False): +def _flip_replace_dict(replace_dict): + """ + + :param replace_dict: + :return: + """ + flipped_dict = {} + for key, value in replace_dict.items(): + if value not in flipped_dict: + flipped_dict[value] = {'matches': {key}, 'replacement': key} + else: + flipped_dict[value]['matches'].add(key) + # Determine the replacement based on length and lexicographical order + current_replacement = flipped_dict[value]['replacement'] + if len(key) > len(current_replacement) or (len(key) == len(current_replacement) and key < current_replacement): + flipped_dict[value]['replacement'] = key + return flipped_dict + + +def anonymize_ahv(text, by='AHVNR', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False): """ :param text: :param by: - :param tag: + :param start_tag: + :param end_tag: :param count: :param first: :param keep_regex: :return: """ regex = re.compile(r'\b756\.\s?\d{4}\.\s?\d{4}\.\s?\d{2}\b') - return anonymize_regex(text, regex, by, tag, count, first, False, keep_regex) + return anonymize_regex(text, regex, by, start_tag, end_tag, count, first, False, keep_regex) -def anonymize_currency(text, by='CURRENCY', tag='#', count=True, first=1, currency_regex=CURRENCY_REGEX, +def anonymize_currency(text, by='CURRENCY', start_tag='#', end_tag='#', count=True, first=1, currency_regex=CURRENCY_REGEX, separator_regex=r"'|’| ", decimal_regex=r"\.", without_symbol=True, compare_lowercase=False, keep_regex=False): """ :param text: :param by: - :param tag: + :param start_tag: + :param end_tag: :param count: :param first: :param currency_regex: @@ -91,15 +115,19 @@ def anonymize_currency(text, by='CURRENCY', tag='#', count=True, first=1, curren regex = (r"\b(?:(?:" + currency_regex + r")(?: )*-?" + num + r")" # '(?: )*' was '(?:\s)* ' + r"|(?:-?" + num + "(?: )*(?:" + currency_regex + r")" # '(?: )*' was '(?:\s)* ' + (r"|-?(?:" + num + sep + r")+" + num if without_symbol else "") + r")(\b)?") - return anonymize_regex(text, re.compile(regex.lower() if compare_lowercase else regex), by, tag, count, first, + return anonymize_regex(text, re.compile(regex.lower() if compare_lowercase else regex), by, start_tag, end_tag, count, first, compare_lowercase, keep_regex) -def anonymize_dates(text, by='DATE', tag='#', count=True, first=1, lang='de', keep_regex=False): +def anonymize_dates(text, by='DATE', start_tag='#', end_tag='#', count=True, first=1, lang='de', keep_regex=False): """ :param text: :param date_dict: :param by: + :param start_tag: + :param end_tag: + :param count: + :param first: :param lang: :param compare_lowercase: :return: @@ -149,27 +177,28 @@ def anonymize_dates(text, by='DATE', tag='#', count=True, first=1, lang='de', ke match = regex.search(text, match.span()[1]) for k, v in matches.items(): text = re.sub(k, v, text) - result = anonymize_regex(text, r'\d{4}-\d{2}-\d{2}', by, tag, count, first, False, keep_regex) + result = anonymize_regex(text, r'\d{4}-\d{2}-\d{2}', by, start_tag, end_tag, count, first, False, keep_regex) result.update({'replace_dict': {k: result.get('replace_dict').get(v) for k, v in matches.items()}}) return result -def anonymize_email(text, by='EMAIL', tag='#', count=True, first=1, keep_regex=False): +def anonymize_email(text, by='EMAIL', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False): """ :param text: :param by: - :param tag: + :param start_tag: + :param end_tag: :param count: :param first: :param keep_regex: :return: """ regex = re.compile(r'\b[\w\.-]+@[\w\.-]+\b') - return anonymize_regex(text, regex, by, tag, count, first, False, keep_regex) + return anonymize_regex(text, regex, by, start_tag, end_tag, count, first, False, keep_regex) -def anonymize_names(text, first_names_regex, last_names_regex, sep=r'[\s,\n]', by='NAME', tag='#', +def anonymize_names(text, first_names_regex, last_names_regex, sep=r'[\s,\n]', by='NAME', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False): """ @@ -178,7 +207,8 @@ def anonymize_names(text, first_names_regex, last_names_regex, sep=r'[\s,\n]', b :param last_names_regex: :param sep: :param by: - :param tag: + :param start_tag: + :param end_tag: :param count: :param first: :param keep_regex: @@ -196,16 +226,17 @@ def _find_names(text, names_regex): rgx = rgx + [re.compile(r'\b(?:' + n + r')\b') for n in sorted(list(set(first_names + last_names)), key=len, reverse=True)] result = {'text': text, 'replace_dict': {}} | ({'regex': [], 'matches': []} if keep_regex else {}) for r in rgx: - result = update_result(result, anonymize_regex(result.get('text'), r, by=by, tag=tag, count=count, + result = _update_result(result, anonymize_regex(result.get('text'), r, by=by, start_tag=start_tag, end_tag=end_tag, count=count, first=first + len(result.get('replace_dict')), keep_regex=keep_regex)) return result -def anonymize_percentage(text, by='PERC', tag='#', count=True, first=1, keep_regex=False, decimal_regex=r'\.'): +def anonymize_percentage(text, by='PERC', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False, decimal_regex=r'\.'): """ :param text: :param by: - :param tag: + :param start_tag: + :param end_tag: :param count: :param first: :param keep_regex @@ -213,14 +244,15 @@ def anonymize_percentage(text, by='PERC', tag='#', count=True, first=1, keep_reg :return: """ regex = r'(?:-?\d+(?:' + decimal_regex + r'\d+)? ?%)' - return anonymize_regex(text, re.compile(regex), by, tag, count, first, False, keep_regex) + return anonymize_regex(text, re.compile(regex), by, start_tag, end_tag, count, first, False, keep_regex) -def anonymize_phone(text, by='PHONE', tag='#', count=True, first=1, keep_regex=False, country_codes=COUNTRY_CODES_REGEX): +def anonymize_phone(text, by='PHONE', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False, country_codes=COUNTRY_CODES_REGEX): """ :param text: :param by: - :param tag: + :param start_tag: + :param end_tag: :param count: :param first: :param keep_regex: @@ -229,11 +261,11 @@ def anonymize_phone(text, by='PHONE', tag='#', count=True, first=1, keep_regex=F """ regex = re.compile(r'(\b(00(' + country_codes + r')|0)|\B(\+(' + country_codes + r')))(\s?\(0\))?(\s)?[1-9]{2}([\s\/\-])?[0-9]{3}([\s\/\-])?[0-9]{2}([\s\/\-])?[0-9]{2}\b') - return anonymize_regex(text, regex, by, tag, count, first, False, keep_regex) + return anonymize_regex(text, regex, by, start_tag, end_tag, count, first, False, keep_regex) def anonymize_entities(text, by_ahv='AHV', by_phone='PHONE', by_currencies='CURRENCY', by_email='EMAIL', by_dates='DATE', by_names='NAME', - first_names=None, last_names=None, tag='#', count=True, first=1, keep_regex=False): + first_names=None, last_names=None, start_tag='#', end_tag='#', count=True, first=1, flip_replace_dict=True, keep_regex=False): """ :param text: @@ -243,30 +275,40 @@ def anonymize_entities(text, by_ahv='AHV', by_phone='PHONE', by_currencies='CURR :param by_email: :param by_dates: :param by_names: - :param first_names: - :param last_names: - :param tag: + :param first_names: list or precompiled regex + :param last_names: list or precompiled regex + :param start_tag: + :param end_tag: :param count: :param first: + :param flip_replace_dict: :param keep_regex: :return: """ result = {'text': text, 'replace_dict': {}} | ({'regex': [], 'matches': []} if keep_regex else {}) if by_ahv is not None: - update_result(result, anonymize_ahv(result.get('text'), by=by_ahv, tag=tag, count=count, first=first, keep_regex=keep_regex)) + _update_result(result, anonymize_ahv(result.get('text'), by=by_ahv, start_tag=start_tag, end_tag=end_tag, + count=count, first=first, keep_regex=keep_regex)) if by_phone is not None: - update_result(result, anonymize_phone(result.get('text'), by=by_phone, tag=tag, count=count, first=first, keep_regex=keep_regex)) + _update_result(result, anonymize_phone(result.get('text'), by=by_phone, start_tag=start_tag, end_tag=end_tag, + count=count, first=first, keep_regex=keep_regex)) if by_currencies is not None: - update_result(result, anonymize_currency(result.get('text'), by=by_currencies, tag=tag, count=count, - first=first, keep_regex=keep_regex)) + _update_result(result, anonymize_currency(result.get('text'), by=by_currencies, start_tag=start_tag, end_tag=end_tag, + count=count, first=first, keep_regex=keep_regex)) if by_email is not None: - update_result(result, anonymize_email(result.get('text'), by=by_email, tag=tag, count=count, first=first, keep_regex=keep_regex)) + _update_result(result, anonymize_email(result.get('text'), by=by_email, start_tag=start_tag, end_tag=end_tag, + count=count, first=first, keep_regex=keep_regex)) if by_dates is not None: - update_result(result, anonymize_dates(result.get('text'), by=by_dates, tag=tag, count=count, first=first, keep_regex=keep_regex)) + _update_result(result, anonymize_dates(result.get('text'), by=by_dates, start_tag=start_tag, end_tag=end_tag, + count=count, first=first, keep_regex=keep_regex)) if (by_names is not None) & (first_names is not None) & (last_names is not None): - first_names_regex, last_names_regex = create_names_regex(first_names), create_names_regex(last_names) - update_result(result, anonymize_names(result.get('text'), first_names_regex=first_names_regex, last_names_regex=last_names_regex, - by=by_names, tag=tag, count=count, first=first, keep_regex=keep_regex)) + first_names_regex = create_names_regex(first_names) + last_names_regex = create_names_regex(last_names) + _update_result(result, anonymize_names(result.get('text'), first_names_regex=first_names_regex, last_names_regex=last_names_regex, + by=by_names, start_tag=start_tag, end_tag=end_tag, count=count, first=first, + keep_regex=keep_regex)) + if flip_replace_dict: + result['replace_dict'] = _flip_replace_dict(result.get('replace_dict')) return result @@ -299,3 +341,4 @@ def demo(): text = 'Der 4. Januar 2025, der 4.1.25 und der 5.7.2023 sind Datumswerte.' for k, v in ({'original': text} | anonymize_dates(text)).items(): print(f'{k}: {v}') + \ No newline at end of file diff --git a/src/utils/replacer.py b/src/utils/replacer.py new file mode 100644 index 0000000..a7c4330 --- /dev/null +++ b/src/utils/replacer.py @@ -0,0 +1,9 @@ +import json + +def replace_values(text, replacements) -> str: + # Iterate through the keys in the replacements dictionary + for key, values in replacements.items(): + # Replace occurrences of each value with the corresponding key + for value in values: + text = text.replace(value, key) + return text \ No newline at end of file diff --git a/src/utils/trie.py b/src/utils/trie.py new file mode 100644 index 0000000..c8888aa --- /dev/null +++ b/src/utils/trie.py @@ -0,0 +1,63 @@ +import regex as re + + +class Trie(): + """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern. + The corresponding Regex should match much faster than a simple Regex union.""" + + def __init__(self): + self.data = {} + + def add(self, word): + ref = self.data + for char in word: + ref[char] = char in ref and ref[char] or {} + ref = ref[char] + ref[''] = 1 + + def dump(self): + return self.data + + def quote(self, char): + return re.escape(char) + + def _pattern(self, pData): + data = pData + if "" in data and len(data.keys()) == 1: + return None + + alt = [] + cc = [] + q = 0 + for char in sorted(data.keys()): + if isinstance(data[char], dict): + try: + recurse = self._pattern(data[char]) + alt.append(self.quote(char) + recurse) + except: + cc.append(self.quote(char)) + else: + q = 1 + cconly = not len(alt) > 0 + + if len(cc) > 0: + if len(cc) == 1: + alt.append(cc[0]) + else: + alt.append('[' + ''.join(cc) + ']') + + if len(alt) == 1: + result = alt[0] + else: + result = "(?:" + "|".join(alt) + ")" + + if q: + if cconly: + result += "?" + else: + result = "(?:%s)?" % result + return result + + def pattern(self): + return self._pattern(self.dump()) + \ No newline at end of file diff --git a/yoyomaskr.ipynb b/yoyomaskr.ipynb index 6a316f8..612de4c 100644 --- a/yoyomaskr.ipynb +++ b/yoyomaskr.ipynb @@ -10,7 +10,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -73,7 +73,7 @@ "last_trie_regex = re.compile(r'\\b' + last_trie.pattern() + r'\\b')\n", "first_trie_regex = re.compile(r'\\b' + first_trie.pattern() + r'\\b')\n", "\n", - "result = [anonymize_entities(text, by_names='NAME', first_names=first_names, last_names=last_names) for text in tqdm(sample_texts[:])]\n", + "result = [anonymize_entities(text, by_names='NAME', first_names=first_trie_regex, last_names=last_trie_regex) for text in tqdm(sample_texts[:])]\n", "for i in range(len(result[:30])):\n", " print(result[i])" ]