Skip to content

Commit

Permalink
Merge pull request #18 from baloise/main
Browse files Browse the repository at this point in the history
Add replacer and new pickle files
  • Loading branch information
robbizbal authored Oct 30, 2024
2 parents a423286 + f270fb6 commit 3ca0f2a
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 45 deletions.
6 changes: 4 additions & 2 deletions src/api/mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from fastapi.responses import JSONResponse
from fastapi import Request, routing
from pydantic import BaseModel
from ..utils.llm import llm_find_entities
from src.utils.llm import llm_find_entities
from src.utils.replacer import replace_values

router = fastapi.APIRouter()

Expand All @@ -12,4 +13,5 @@ class MaskRequest(BaseModel):
@router.post("/mask", response_class=JSONResponse, include_in_schema=True)
async def mask(request: MaskRequest):
entities = llm_find_entities(request.text)
return {"original_text": request.text, "llm_entities": entities}
anontext = replace_values(request.text, entities)
return {"original_text": request.text, "llm_entities": entities, "anonymized_text": anontext}
1 change: 1 addition & 0 deletions src/static/scripts/form.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ document.getElementById('inputForm').addEventListener('submit', function(event)
// Display the response in the textarea
document.getElementById('responseFieldText').value = JSON.stringify(text.original_text, null, 2); // Format the JSON response
document.getElementById('responseFieldEntities').value = JSON.stringify(text.llm_entities, null, 2); // Format the JSON response
document.getElementById('responseFieldAnonText').value = text.anonymized_text; // Format the string response
})
.catch((error) => {
console.error('Error:', error); // Handle any errors
Expand Down
10 changes: 8 additions & 2 deletions src/templates/html/form.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
<header>
<div>
<h1>YoYo MaskЯ</h1>
<img class="logo" id="AppLogo" src="/static/public/yo-yo-maskr-logo.svg" alt="YoYo MaskR Logo" width="200" height="200">
<img class="logo" id="AppLogo" src="/static/public/yo-yo-maskr-logo.svg" alt="YoYo MaskR Logo">
</div>
</header>

<main>
<div class="center-block">
<h3>Input:</h3>
<form id="inputForm">
<input type="text" id="inputData" placeholder="Enter text to anonymize" required>
<input rows="10" cols="50" type="text" id="inputData" placeholder="Enter text to anonymize" required>
<input type="submit" value="Submit">
</form>
</div>
Expand All @@ -35,6 +35,11 @@ <h3>Entities found:</h3>
<textarea id="responseFieldEntities" rows="10" cols="50" readonly></textarea><br><br>
<button id="downloadBtn">Download</button>
</div>

<div class="center-block">
<h3>anonymized text:</h3>
<textarea id="responseFieldAnonText" rows="10" cols="50" readonly></textarea><br><br>
</div>
</main>

<footer>
Expand All @@ -44,6 +49,7 @@ <h3>Entities found:</h3>
</a>
</div>
</footer>

<!-- Link to the external JavaScript file -->
<script src="/static/scripts/form.js"></script>
<script src="/static/scripts/downloader.js"></script>
Expand Down
121 changes: 82 additions & 39 deletions src/utils/regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
COUNTRY_CODES_REGEX = r'41'
CURRENCY_REGEX = r'EUR|CHF|Fr\.|Franken|Francs'

def anonymize_regex(text, regex, by='ENTITY', tag='#', count=True, first=1, compare_lowercase=False, keep_regex=False):
def anonymize_regex(text, regex, by='ENTITY', start_tag='#', end_tag='#', count=True, first=1, compare_lowercase=False, keep_regex=False):
"""
:param text:
:param regex:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:pram count:
:param compare_lowercase:
:param keep_regex:
Expand All @@ -22,7 +23,7 @@ def anonymize_regex(text, regex, by='ENTITY', tag='#', count=True, first=1, comp
replace_dict = {}
counter = len(set(r.group() for r in matches)) + first - 1
for r in matches[::-1]:
replace_by = f'{tag}{by}_{counter}{tag}' if count else f'{tag}{by}{tag}'
replace_by = f'{start_tag}{by}_{counter}{end_tag}' if count else f'{start_tag}{by}{end_tag}'
if r.group() not in replace_dict.keys():
replace_dict.update({r.group(): replace_by})
counter -=1
Expand All @@ -31,16 +32,18 @@ def anonymize_regex(text, regex, by='ENTITY', tag='#', count=True, first=1, comp
return {'text': text, 'replace_dict': replace_dict} | ({'regex': regex, 'matches': matches} if keep_regex else {})


def create_names_regex(names_list):
def create_names_regex(names_list, boundary=r'\b'):
"""
:param names_list:
:return:
"""
return re.compile('|'.join(sorted(list(set(names_list)), key=len, reverse=True)))
if isinstance(names_list, re.Pattern):
return names_list
return re.compile(boundary + '|'.join(sorted(list(set(n for n in names_list if len(n) > 0)), key=len, reverse=True)) + boundary)


def update_result(result, new_result, keep_regex=False):
def _update_result(result, new_result, keep_regex=False):
"""
:param result:
Expand All @@ -54,28 +57,49 @@ def update_result(result, new_result, keep_regex=False):
return result


def anonymize_ahv(text, by='AHVNR', tag='#', count=True, first=1, keep_regex=False):
def _flip_replace_dict(replace_dict):
"""
:param replace_dict:
:return:
"""
flipped_dict = {}
for key, value in replace_dict.items():
if value not in flipped_dict:
flipped_dict[value] = {'matches': {key}, 'replacement': key}
else:
flipped_dict[value]['matches'].add(key)
# Determine the replacement based on length and lexicographical order
current_replacement = flipped_dict[value]['replacement']
if len(key) > len(current_replacement) or (len(key) == len(current_replacement) and key < current_replacement):
flipped_dict[value]['replacement'] = key
return flipped_dict


def anonymize_ahv(text, by='AHVNR', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False):
"""
:param text:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param keep_regex:
:return:
"""
regex = re.compile(r'\b756\.\s?\d{4}\.\s?\d{4}\.\s?\d{2}\b')
return anonymize_regex(text, regex, by, tag, count, first, False, keep_regex)
return anonymize_regex(text, regex, by, start_tag, end_tag, count, first, False, keep_regex)


def anonymize_currency(text, by='CURRENCY', tag='#', count=True, first=1, currency_regex=CURRENCY_REGEX,
def anonymize_currency(text, by='CURRENCY', start_tag='#', end_tag='#', count=True, first=1, currency_regex=CURRENCY_REGEX,
separator_regex=r"'|’| ", decimal_regex=r"\.", without_symbol=True, compare_lowercase=False,
keep_regex=False):
"""
:param text:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param currency_regex:
Expand All @@ -91,15 +115,19 @@ def anonymize_currency(text, by='CURRENCY', tag='#', count=True, first=1, curren
regex = (r"\b(?:(?:" + currency_regex + r")(?: )*-?" + num + r")" # '(?: )*' was '(?:\s)* '
+ r"|(?:-?" + num + "(?: )*(?:" + currency_regex + r")" # '(?: )*' was '(?:\s)* '
+ (r"|-?(?:" + num + sep + r")+" + num if without_symbol else "") + r")(\b)?")
return anonymize_regex(text, re.compile(regex.lower() if compare_lowercase else regex), by, tag, count, first,
return anonymize_regex(text, re.compile(regex.lower() if compare_lowercase else regex), by, start_tag, end_tag, count, first,
compare_lowercase, keep_regex)


def anonymize_dates(text, by='DATE', tag='#', count=True, first=1, lang='de', keep_regex=False):
def anonymize_dates(text, by='DATE', start_tag='#', end_tag='#', count=True, first=1, lang='de', keep_regex=False):
"""
:param text:
:param date_dict:
:param by:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param lang:
:param compare_lowercase:
:return:
Expand Down Expand Up @@ -149,27 +177,28 @@ def anonymize_dates(text, by='DATE', tag='#', count=True, first=1, lang='de', ke
match = regex.search(text, match.span()[1])
for k, v in matches.items():
text = re.sub(k, v, text)
result = anonymize_regex(text, r'\d{4}-\d{2}-\d{2}', by, tag, count, first, False, keep_regex)
result = anonymize_regex(text, r'\d{4}-\d{2}-\d{2}', by, start_tag, end_tag, count, first, False, keep_regex)
result.update({'replace_dict': {k: result.get('replace_dict').get(v) for k, v in matches.items()}})
return result


def anonymize_email(text, by='EMAIL', tag='#', count=True, first=1, keep_regex=False):
def anonymize_email(text, by='EMAIL', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False):
"""
:param text:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param keep_regex:
:return:
"""
regex = re.compile(r'\b[\w\.-]+@[\w\.-]+\b')
return anonymize_regex(text, regex, by, tag, count, first, False, keep_regex)
return anonymize_regex(text, regex, by, start_tag, end_tag, count, first, False, keep_regex)


def anonymize_names(text, first_names_regex, last_names_regex, sep=r'[\s,\n]', by='NAME', tag='#',
def anonymize_names(text, first_names_regex, last_names_regex, sep=r'[\s,\n]', by='NAME', start_tag='#', end_tag='#',
count=True, first=1, keep_regex=False):
"""
Expand All @@ -178,7 +207,8 @@ def anonymize_names(text, first_names_regex, last_names_regex, sep=r'[\s,\n]', b
:param last_names_regex:
:param sep:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param keep_regex:
Expand All @@ -196,31 +226,33 @@ def _find_names(text, names_regex):
rgx = rgx + [re.compile(r'\b(?:' + n + r')\b') for n in sorted(list(set(first_names + last_names)), key=len, reverse=True)]
result = {'text': text, 'replace_dict': {}} | ({'regex': [], 'matches': []} if keep_regex else {})
for r in rgx:
result = update_result(result, anonymize_regex(result.get('text'), r, by=by, tag=tag, count=count,
result = _update_result(result, anonymize_regex(result.get('text'), r, by=by, start_tag=start_tag, end_tag=end_tag, count=count,
first=first + len(result.get('replace_dict')), keep_regex=keep_regex))
return result


def anonymize_percentage(text, by='PERC', tag='#', count=True, first=1, keep_regex=False, decimal_regex=r'\.'):
def anonymize_percentage(text, by='PERC', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False, decimal_regex=r'\.'):
"""
:param text:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param keep_regex
:param decimal_regex:
:return:
"""
regex = r'(?:-?\d+(?:' + decimal_regex + r'\d+)? ?%)'
return anonymize_regex(text, re.compile(regex), by, tag, count, first, False, keep_regex)
return anonymize_regex(text, re.compile(regex), by, start_tag, end_tag, count, first, False, keep_regex)


def anonymize_phone(text, by='PHONE', tag='#', count=True, first=1, keep_regex=False, country_codes=COUNTRY_CODES_REGEX):
def anonymize_phone(text, by='PHONE', start_tag='#', end_tag='#', count=True, first=1, keep_regex=False, country_codes=COUNTRY_CODES_REGEX):
"""
:param text:
:param by:
:param tag:
:param start_tag:
:param end_tag:
:param count:
:param first:
:param keep_regex:
Expand All @@ -229,11 +261,11 @@ def anonymize_phone(text, by='PHONE', tag='#', count=True, first=1, keep_regex=F
"""
regex = re.compile(r'(\b(00(' + country_codes + r')|0)|\B(\+(' + country_codes
+ r')))(\s?\(0\))?(\s)?[1-9]{2}([\s\/\-])?[0-9]{3}([\s\/\-])?[0-9]{2}([\s\/\-])?[0-9]{2}\b')
return anonymize_regex(text, regex, by, tag, count, first, False, keep_regex)
return anonymize_regex(text, regex, by, start_tag, end_tag, count, first, False, keep_regex)


def anonymize_entities(text, by_ahv='AHV', by_phone='PHONE', by_currencies='CURRENCY', by_email='EMAIL', by_dates='DATE', by_names='NAME',
first_names=None, last_names=None, tag='#', count=True, first=1, keep_regex=False):
first_names=None, last_names=None, start_tag='#', end_tag='#', count=True, first=1, flip_replace_dict=True, keep_regex=False):
"""
:param text:
Expand All @@ -243,30 +275,40 @@ def anonymize_entities(text, by_ahv='AHV', by_phone='PHONE', by_currencies='CURR
:param by_email:
:param by_dates:
:param by_names:
:param first_names:
:param last_names:
:param tag:
:param first_names: list or precompiled regex
:param last_names: list or precompiled regex
:param start_tag:
:param end_tag:
:param count:
:param first:
:param flip_replace_dict:
:param keep_regex:
:return:
"""
result = {'text': text, 'replace_dict': {}} | ({'regex': [], 'matches': []} if keep_regex else {})
if by_ahv is not None:
update_result(result, anonymize_ahv(result.get('text'), by=by_ahv, tag=tag, count=count, first=first, keep_regex=keep_regex))
_update_result(result, anonymize_ahv(result.get('text'), by=by_ahv, start_tag=start_tag, end_tag=end_tag,
count=count, first=first, keep_regex=keep_regex))
if by_phone is not None:
update_result(result, anonymize_phone(result.get('text'), by=by_phone, tag=tag, count=count, first=first, keep_regex=keep_regex))
_update_result(result, anonymize_phone(result.get('text'), by=by_phone, start_tag=start_tag, end_tag=end_tag,
count=count, first=first, keep_regex=keep_regex))
if by_currencies is not None:
update_result(result, anonymize_currency(result.get('text'), by=by_currencies, tag=tag, count=count,
first=first, keep_regex=keep_regex))
_update_result(result, anonymize_currency(result.get('text'), by=by_currencies, start_tag=start_tag, end_tag=end_tag,
count=count, first=first, keep_regex=keep_regex))
if by_email is not None:
update_result(result, anonymize_email(result.get('text'), by=by_email, tag=tag, count=count, first=first, keep_regex=keep_regex))
_update_result(result, anonymize_email(result.get('text'), by=by_email, start_tag=start_tag, end_tag=end_tag,
count=count, first=first, keep_regex=keep_regex))
if by_dates is not None:
update_result(result, anonymize_dates(result.get('text'), by=by_dates, tag=tag, count=count, first=first, keep_regex=keep_regex))
_update_result(result, anonymize_dates(result.get('text'), by=by_dates, start_tag=start_tag, end_tag=end_tag,
count=count, first=first, keep_regex=keep_regex))
if (by_names is not None) & (first_names is not None) & (last_names is not None):
first_names_regex, last_names_regex = create_names_regex(first_names), create_names_regex(last_names)
update_result(result, anonymize_names(result.get('text'), first_names_regex=first_names_regex, last_names_regex=last_names_regex,
by=by_names, tag=tag, count=count, first=first, keep_regex=keep_regex))
first_names_regex = create_names_regex(first_names)
last_names_regex = create_names_regex(last_names)
_update_result(result, anonymize_names(result.get('text'), first_names_regex=first_names_regex, last_names_regex=last_names_regex,
by=by_names, start_tag=start_tag, end_tag=end_tag, count=count, first=first,
keep_regex=keep_regex))
if flip_replace_dict:
result['replace_dict'] = _flip_replace_dict(result.get('replace_dict'))
return result


Expand Down Expand Up @@ -299,3 +341,4 @@ def demo():
text = 'Der 4. Januar 2025, der 4.1.25 und der 5.7.2023 sind Datumswerte.'
for k, v in ({'original': text} | anonymize_dates(text)).items():
print(f'{k}: {v}')

9 changes: 9 additions & 0 deletions src/utils/replacer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import json

def replace_values(text, replacements) -> str:
# Iterate through the keys in the replacements dictionary
for key, values in replacements.items():
# Replace occurrences of each value with the corresponding key
for value in values:
text = text.replace(value, key)
return text
Loading

0 comments on commit 3ca0f2a

Please sign in to comment.