diff --git a/comps/guardrails/pii_detection/.gitignore b/comps/guardrails/pii_detection/.gitignore
new file mode 100644
index 000000000..017a8659d
--- /dev/null
+++ b/comps/guardrails/pii_detection/.gitignore
@@ -0,0 +1,4 @@
+**/*pdf
+**/*csv
+**/*log
+**/*pyc
diff --git a/comps/guardrails/pii_detection/README.md b/comps/guardrails/pii_detection/README.md
new file mode 100644
index 000000000..dba386e38
--- /dev/null
+++ b/comps/guardrails/pii_detection/README.md
@@ -0,0 +1,78 @@
+# PII Detection Microservice
+
+PII detection is a method for identifying Personally Identifiable Information in text. This microservice exposes a unified API: upload files or send a list of links or text strings, and it returns a list of labels, in the original input order, indicating whether each item contains PII.
+
+# 🚀1. Start Microservice with Python (Option 1)
+
+## 1.1 Install Requirements
+
+```bash
+pip install -r requirements.txt
+```
+
+## 1.2 Start PII Detection Microservice with Python Script
+
+Start the PII detection microservice with the command below.
+
+```bash
+python pii_detection.py
+```
+
+# 🚀2. Start Microservice with Docker (Option 2)
+
+## 2.1 Prepare PII detection model
+
+```bash
+export HUGGINGFACEHUB_API_TOKEN=${HF_TOKEN}
+```
+
+## 2.1.1 Use LLM endpoint (will be added later)
+
+Instructions for the LLM-endpoint-based detector will be added later.
+
+## 2.2 Build Docker Image
+
+```bash
+cd ../../../ # back to GenAIComps/ folder
+docker build -t opea/guardrails-pii-detection:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/guardrails/pii_detection/docker/Dockerfile .
+```
+
+## 2.3 Run Docker with CLI
+
+```bash
+docker run -d --rm --runtime=runc --name="guardrails-pii-detection-endpoint" -p 6357:6357 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e HUGGINGFACEHUB_API_TOKEN=${HUGGINGFACEHUB_API_TOKEN} -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} opea/guardrails-pii-detection:latest
+```
+
+> debug mode
+
+```bash
+docker run --rm --runtime=runc --name="guardrails-pii-detection-endpoint" -p 6357:6357 -v ./comps/guardrails/pii_detection/:/home/user/comps/guardrails/pii_detection/ --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e HUGGINGFACEHUB_API_TOKEN=${HUGGINGFACEHUB_API_TOKEN} -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} opea/guardrails-pii-detection:latest
+```
+
+# 🚀3. Get Status of Microservice
+
+```bash
+docker container logs -f guardrails-pii-detection-endpoint
+```
+
+# 🚀4. Consume Microservice
+
+Once the microservice is running, you can invoke it by uploading files or by sending a list of links or raw text strings to the `/v1/piidetect` endpoint.
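+
+The simplest check is to send a list of raw text strings. Below is a minimal sketch (it assumes the service is reachable on `localhost:6357`; the sample sentence and the response shown in the comment are illustrative):
+
+```python
+import requests
+import json
+
+url = "http://localhost:6357/v1/piidetect"
+# text_list is a form field containing a JSON-encoded list of strings
+payload = {"text_list": json.dumps(["My name is John Doe and my email is jdoe@example.com"])}
+
+resp = requests.post(url=url, data=payload)
+print(resp.json())  # e.g. {"status": 200, "message": "[true]"} (one boolean label per input string)
+```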
+
+You can also send a list of web links. The script below posts a `link_list`; the service fetches each link, extracts its text, and returns one PII label per link:
+
+```python
+import requests
+import json
+
+proxies = {"http": ""}
+url = "http://localhost:6357/v1/piidetect"
+urls = [
+    "https://towardsdatascience.com/no-gpu-no-party-fine-tune-bert-for-sentiment-analysis-with-vertex-ai-custom-jobs-d8fc410e908b?source=rss----7f60cf5620c9---4"
+]
+payload = {"link_list": json.dumps(urls)}
+
+try:
+    resp = requests.post(url=url, data=payload, proxies=proxies)
+    print(resp.text)
+    resp.raise_for_status()  # Raise an exception for unsuccessful HTTP status codes
+    print("Request successful!")
+except requests.exceptions.RequestException as e:
+    print("An error occurred:", e)
+```
diff --git a/comps/guardrails/pii_detection/__init__.py b/comps/guardrails/pii_detection/__init__.py
new file mode 100644
index 000000000..916f3a44b
--- /dev/null
+++ b/comps/guardrails/pii_detection/__init__.py
@@ -0,0 +1,2 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
diff --git a/comps/guardrails/pii_detection/config.py b/comps/guardrails/pii_detection/config.py
new file mode 100644
index 000000000..430532a15
--- /dev/null
+++ b/comps/guardrails/pii_detection/config.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+import pathlib
+
+# Embedding model
+
+EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
+
+# Redis Connection Information
+REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
+REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
+
+
+def get_boolean_env_var(var_name, default_value=False):
+    """Retrieve the boolean value of an environment variable.
+
+    Args:
+        var_name (str): The name of the environment variable to retrieve.
+        default_value (bool): The default value to return if the variable
+        is not found.
+
+    Returns:
+        bool: The value of the environment variable, interpreted as a boolean.
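+
+    Example (illustrative; "ENABLE_RAY" is a hypothetical variable name):
+        os.environ["ENABLE_RAY"] = "yes"
+        get_boolean_env_var("ENABLE_RAY")  # -> True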
+ """ + true_values = {"true", "1", "t", "y", "yes"} + false_values = {"false", "0", "f", "n", "no"} + + # Retrieve the environment variable's value + value = os.getenv(var_name, "").lower() + + # Decide the boolean value based on the content of the string + if value in true_values: + return True + elif value in false_values: + return False + else: + return default_value + + +LLM_URL = os.getenv("LLM_ENDPOINT_URL", None) + +current_file_path = pathlib.Path(__file__).parent.resolve() +comps_path = os.path.join(current_file_path, "../../../") diff --git a/comps/guardrails/pii_detection/data_utils.py b/comps/guardrails/pii_detection/data_utils.py new file mode 100644 index 000000000..29e9c4196 --- /dev/null +++ b/comps/guardrails/pii_detection/data_utils.py @@ -0,0 +1,392 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import io +import json +import multiprocessing +import os +import re +import unicodedata +from urllib.parse import urlparse, urlunparse + +import easyocr +import fitz +import numpy as np +import pandas as pd +import requests +import yaml +from bs4 import BeautifulSoup +from docx import Document as DDocument +from langchain_community.document_loaders import ( + UnstructuredImageLoader, + UnstructuredMarkdownLoader, + UnstructuredPowerPointLoader, + UnstructuredXMLLoader, +) +from PIL import Image +from utils import timeout + + +def load_pdf(pdf_path): + """Load the pdf file.""" + doc = fitz.open(pdf_path) + reader = easyocr.Reader(["en"], gpu=False) + result = "" + for i in range(doc.page_count): + page = doc.load_page(i) + pagetext = page.get_text().strip() + if pagetext: + if pagetext.endswith("!") or pagetext.endswith("?") or pagetext.endswith("."): + result = result + pagetext + else: + result = result + pagetext + "." + if len(doc.get_page_images(i)) > 0: + for img in doc.get_page_images(i): + if img: + pageimg = "" + xref = img[0] + img_data = doc.extract_image(xref) + img_bytes = img_data["image"] + pil_image = Image.open(io.BytesIO(img_bytes)) + img = np.array(pil_image) + img_result = reader.readtext(img, paragraph=True, detail=0) + pageimg = pageimg + ", ".join(img_result).strip() + if pageimg.endswith("!") or pageimg.endswith("?") or pageimg.endswith("."): + pass + else: + pageimg = pageimg + "." + result = result + pageimg + return result + + +def load_html(html_path): + """Load the html file.""" + with open(html_path, "r", encoding="utf-8") as file: + html = file.read() + soup = BeautifulSoup(html, "html.parser") + text = soup.get_text(strip=True) + return text + + +def load_txt(txt_path): + """Load txt file.""" + with open(txt_path, "r") as file: + text = file.read() + return text + + +def load_doc(doc_path): + """Load doc file.""" + txt_path = doc_path.replace(".doc", ".txt") + try: + os.system(f'antiword "{doc_path}" > "{txt_path}"') + except: + raise AssertionError( + "antiword failed or not installed, if not installed," + + 'use "apt-get update && apt-get install -y antiword" to install it.' 
+ ) + text = load_txt(txt_path) + os.remove(txt_path) + return text + + +def load_docx(docx_path): + """Load docx file.""" + doc = DDocument(docx_path) + text = "" + for paragraph in doc.paragraphs: + text += paragraph.text + return text + + +def load_pptx(pptx_path): + """Load pptx file.""" + loader = UnstructuredPowerPointLoader(pptx_path) + text = loader.load()[0].page_content + return text + + +def load_md(md_path): + """Load md file.""" + loader = UnstructuredMarkdownLoader(md_path) + text = loader.load()[0].page_content + return text + + +def load_xml(xml_path): + """Load xml file.""" + loader = UnstructuredXMLLoader(xml_path) + text = loader.load()[0].page_content + return text + + +def load_json(json_path): + """Load and process json file.""" + with open(json_path, "r") as file: + data = json.load(file) + return json.dumps(data) + + +def load_yaml(yaml_path): + """Load and process yaml file.""" + with open(yaml_path, "r") as file: + data = yaml.safe_load(file) + return yaml.dump(data) + + +def load_xlsx(input_path): + """Load and process xlsx file.""" + df = pd.read_excel(input_path) + return df.to_string() + + +def load_csv(input_path): + """Load the csv file.""" + df = pd.read_csv(input_path) + return df.to_string() + + +def load_image(image_path): + """Load the image file.""" + loader = UnstructuredImageLoader(image_path) + text = loader.load()[0].page_content + return text + + +def load_svg(svg_path): + """Load the svg file.""" + import cairosvg + + png_path = svg_path.replace(".svg", ".png") + cairosvg.svg2png(url=svg_path, write_to=png_path) + text = load_image(png_path) + os.remove(png_path) + return text + + +def document_loader(doc_path): + if doc_path.endswith(".pdf"): + return load_pdf(doc_path) + elif doc_path.endswith(".html"): + return load_html(doc_path) + elif doc_path.endswith(".txt"): + return load_txt(doc_path) + elif doc_path.endswith(".doc"): + return load_doc(doc_path) + elif doc_path.endswith(".docx"): + return load_docx(doc_path) + elif doc_path.endswith(".pptx") or doc_path.endswith(".ppt"): + return load_pptx(doc_path) + elif doc_path.endswith(".md"): + return load_md(doc_path) + elif doc_path.endswith(".xml"): + return load_xml(doc_path) + elif doc_path.endswith(".json") or doc_path.endswith(".jsonl"): + return load_json(doc_path) + elif doc_path.endswith(".yaml"): + return load_yaml(doc_path) + elif doc_path.endswith(".xlsx") or doc_path.endswith(".xls"): + return load_xlsx(doc_path) + elif doc_path.endswith(".csv"): + return load_csv(doc_path) + elif doc_path.endswith(".tiff"): + return load_image(doc_path) + elif doc_path.endswith(".svg"): + return load_image(doc_path) + else: + raise NotImplementedError( + "Current only support pdf, html, txt, doc, docx, pptx, ppt, md, xml" + + ", json, jsonl, yaml, xlsx, xls, csv, tiff and svg format." 
+ ) + + +class Crawler: + def __init__(self, pool=None): + if pool: + assert isinstance(pool, (str, list, tuple)), "url pool should be str, list or tuple" + self.pool = pool + self.headers = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng, \ + */*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, \ + like Gecko) Chrome/113.0.0.0 Safari/537.36", + } + self.fetched_pool = set() + + def get_sublinks(self, soup): + sublinks = [] + for links in soup.find_all("a"): + sublinks.append(str(links.get("href"))) + return sublinks + + def get_hyperlink(self, soup, base_url): + sublinks = [] + for links in soup.find_all("a"): + link = str(links.get("href")) + if link.startswith("#") or link is None or link == "None": + continue + suffix = link.split("/")[-1] + if "." in suffix and suffix.split(".")[-1] not in ["html", "htmld"]: + continue + link_parse = urlparse(link) + base_url_parse = urlparse(base_url) + if link_parse.path == "": + continue + if link_parse.netloc != "": + # keep crawler works in the same domain + if link_parse.netloc != base_url_parse.netloc: + continue + sublinks.append(link) + else: + sublinks.append( + urlunparse( + ( + base_url_parse.scheme, + base_url_parse.netloc, + link_parse.path, + link_parse.params, + link_parse.query, + link_parse.fragment, + ) + ) + ) + return sublinks + + def fetch(self, url, headers=None, max_times=5): + if not headers: + headers = self.headers + while max_times: + if not url.startswith("http") or not url.startswith("https"): + url = "http://" + url + print("start fetch %s...", url) + try: + response = requests.get(url, headers=headers, verify=True) + if response.status_code != 200: + print("fail to fetch %s, response status code: %s", url, response.status_code) + else: + return response + except Exception as e: + print("fail to fetch %s, caused by %s", url, e) + raise Exception(e) + max_times -= 1 + return None + + def process_work(self, sub_url, work): + response = self.fetch(sub_url) + if response is None: + return [] + self.fetched_pool.add(sub_url) + soup = self.parse(response.text) + base_url = self.get_base_url(sub_url) + sublinks = self.get_hyperlink(soup, base_url) + if work: + work(sub_url, soup) + return sublinks + + def crawl(self, pool, work=None, max_depth=10, workers=10): + url_pool = set() + for url in pool: + base_url = self.get_base_url(url) + response = self.fetch(url) + soup = self.parse(response.text) + sublinks = self.get_hyperlink(soup, base_url) + self.fetched_pool.add(url) + url_pool.update(sublinks) + depth = 0 + while len(url_pool) > 0 and depth < max_depth: + print("current depth %s...", depth) + mp = multiprocessing.Pool(processes=workers) + results = [] + for sub_url in url_pool: + if sub_url not in self.fetched_pool: + results.append(mp.apply_async(self.process_work, (sub_url, work))) + mp.close() + mp.join() + url_pool = set() + for result in results: + sublinks = result.get() + url_pool.update(sublinks) + depth += 1 + + def parse(self, html_doc): + soup = BeautifulSoup(html_doc, "lxml") + return soup + + def download(self, url, file_name): + print("download %s into %s...", url, file_name) + try: + r = requests.get(url, stream=True, headers=self.headers, verify=True) + f = open(file_name, "wb") + for chunk in r.iter_content(chunk_size=512): + if chunk: + f.write(chunk) + except Exception 
as e: + print("fail to download %s, caused by %s", url, e) + + def get_base_url(self, url): + result = urlparse(url) + return urlunparse((result.scheme, result.netloc, "", "", "", "")) + + def clean_text(self, text): + text = text.strip().replace("\r", "\n") + text = re.sub(" +", " ", text) + text = re.sub("\n+", "\n", text) + text = text.split("\n") + return "\n".join([i for i in text if i and i != " "]) + + +def uni_pro(text): + """Check if the character is ASCII or falls in the category of non-spacing marks.""" + normalized_text = unicodedata.normalize("NFKD", text) + filtered_text = "" + for char in normalized_text: + if ord(char) < 128 or unicodedata.category(char) == "Mn": + filtered_text += char + return filtered_text + + +def load_html_data(url): + crawler = Crawler() + res = crawler.fetch(url) + if res is None: + return None + soup = crawler.parse(res.text) + all_text = crawler.clean_text(soup.select_one("body").text) + main_content = "" + for element_name in ["main", "container"]: + main_block = None + if soup.select(f".{element_name}"): + main_block = soup.select(f".{element_name}") + elif soup.select(f"#{element_name}"): + main_block = soup.select(f"#{element_name}") + if main_block: + for element in main_block: + text = crawler.clean_text(element.text) + if text not in main_content: + main_content += f"\n{text}" + main_content = crawler.clean_text(main_content) + main_content = all_text if main_content == "" else main_content + main_content = main_content.replace("\n", "") + main_content = main_content.replace("\n\n", "") + main_content = uni_pro(main_content) + main_content = re.sub(r"\s+", " ", main_content) + + return main_content + + +def parse_html(input): + """Parse the uploaded file.""" + chucks = [] + for link in input: + if re.match(r"^https?:/{2}\w.+$", link): + content = load_html_data(link) + if content is None: + continue + chuck = [[content.strip(), link]] + chucks += chuck + else: + print("The given link/str {} cannot be parsed.".format(link)) + + return chucks diff --git a/comps/guardrails/pii_detection/docker/Dockerfile b/comps/guardrails/pii_detection/docker/Dockerfile new file mode 100644 index 000000000..f097352cf --- /dev/null +++ b/comps/guardrails/pii_detection/docker/Dockerfile @@ -0,0 +1,38 @@ + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +FROM python:3.11-slim + +ENV LANG C.UTF-8 + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + build-essential \ + libgl1-mesa-glx \ + libjemalloc-dev \ + vim + +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +USER user + +COPY comps /home/user/comps + +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + pip install --no-cache-dir -r /home/user/comps/guardrails/pii_detection/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +USER root + +RUN mkdir -p /home/user/comps/guardrails/pii_detection/uploaded_files && chown -R user /home/user/comps/guardrails/pii_detection/uploaded_files +RUN mkdir -p /home/user/comps/guardrails/pii_detection/status && chown -R user /home/user/comps/guardrails/pii_detection/status + +USER user + +WORKDIR /home/user/comps/guardrails/pii_detection + +ENTRYPOINT ["python", "pii_detection.py"] + diff --git a/comps/guardrails/pii_detection/pii/__init__.py b/comps/guardrails/pii_detection/pii/__init__.py new file mode 100644 index 000000000..4505e776e --- /dev/null +++ b/comps/guardrails/pii_detection/pii/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2024 
Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +__all__ = ["pii_detection", "pii_redaction"] diff --git a/comps/guardrails/pii_detection/pii/detect/__init__.py b/comps/guardrails/pii_detection/pii/detect/__init__.py new file mode 100644 index 000000000..916f3a44b --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/guardrails/pii_detection/pii/detect/emails_detection.py b/comps/guardrails/pii_detection/pii/detect/emails_detection.py new file mode 100644 index 000000000..4c5704fbd --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/emails_detection.py @@ -0,0 +1,87 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +""" This code is adapted from BigScience PII detection +https://github.com/bigscience-workshop/data-preparation/blob/main/preprocessing/training/02_pii/bigscience_pii_detect_redact.py + +MST BigScience PII Code +Original colab that is a source of this file is located at + https://colab.research.google.com/drive/1086H3-LGMz3gX0pGy9ECgr8KflosSKso +# License +Copyright 2022 Authors of this Notebook +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import sys + +import regex + +# Note: to reduce false positives, a number of technically-valid-but-rarely-used +# email address patterns (e.g. with parenthesis or slashes) will not match +email_pattern = regex.compile( + r""" + (?<= ^ | [[({<\b\s@,?!;'"\p{Han}¿¡:.] | \\['"] ) # left delimiter + ( + (?: # local part + [^][(){}<>\b\s@,?!;'":#/\\=.\-] # arbitrary character + | + (?: [=.\-] (?! [.@]) ) # ".=-" not before ".@" + )+ + @ + (?: + (?: + \w # single-letter subdomain + | + [^.\b\s@?!;,/()>\-:] # subdomain (>=2 letter) + [^.\b\s@?!;,/()>]{0,62} + [^.\b\s@?!;,/()>\-:'"] + ) + \. + ){1,10} + (?: [\p{L}\p{M}]{2,63} | xn-- \w+ ) # TLD, including IDN + ) + (?= $ | [])}>\b\s@,?!;'"\p{Han}] | \\['"] | : (?! \d) | \. (?! \S)) # right delim +""", + flags=regex.MULTILINE | regex.VERBOSE, +) + + +def detect_email(content): + """Detects email addresses in a string using regex matching + Args: + content (str): A string containing the text to be analyzed. + Returns: + A list of dicts containing the tag type, the matched string, and the start and + end indices of the match. 
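+
+    Example (illustrative; start and end are the character offsets of the match):
+        detect_email("please reach me at jane.doe@example.com")
+        -> [{"tag": "EMAIL", "value": "jane.doe@example.com", "start": ..., "end": ...}]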
+ """ + + matches = [] + + # regex matching + matches_tmp = email_pattern.finditer(content) + for match in matches_tmp: + if match.groups(): + if len(match.groups()) > 1 and match.groups()[1]: + sys.stderr.write("Warning: Found substring matches in the main match.") + # setup outputs + value = match.group(1) + start, end = match.span(1) + if value: + matches.append( + { + "tag": "EMAIL", + "value": value, + "start": start, + "end": end, + } + ) + else: + raise ValueError("No match found inside groups") + return matches diff --git a/comps/guardrails/pii_detection/pii/detect/ip_detection.py b/comps/guardrails/pii_detection/pii/detect/ip_detection.py new file mode 100644 index 000000000..3616f1903 --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/ip_detection.py @@ -0,0 +1,131 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +""" This code is adapted from BigScience PII detection +https://github.com/bigscience-workshop/data-preparation/blob/main/preprocessing/training/02_pii/bigscience_pii_detect_redact.py + +MST BigScience PII Code +Original colab that is a source of this file is located at + https://colab.research.google.com/drive/1086H3-LGMz3gX0pGy9ECgr8KflosSKso +# License +Copyright 2022 Authors of this Notebook +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import ipaddress +import sys + +import regex + +year_patterns = [ + regex.compile( + r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([1-2][0-9]{3}[\p{Pd}/][1-2][0-9]{3})(?:$|[\s@,?!;:\'\"(.\p{Han}])" + ), # yyyy-yyyy or yyyy/yyyy + regex.compile( + r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([1-2][0-9]{3}[\p{Pd}/.][0-3][0-9][\p{Pd}/.][0-3][0-9])(?:$|[\s@,?!;:\'\"(.\p{Han}])" + ), # yyyy-mm-dd or yyyy-dd-mm or yyyy/mm/dd or yyyy/dd/mm or yyyy.mm.dd or yyyy.dd.mm + regex.compile( + r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([0-3][0-9][\p{Pd}/.][0-3][0-9][\p{Pd}/.](?:[0-9]{2}|[1-2][0-9]{3}))(?:$|[\s@,?!;:\'\"(.\p{Han}])" + ), + # mm-dd-yyyy or dd-mm-yyyy or mm/dd/yyyy or dd/mm/yyyy or mm.dd.yyyy or dd.mm.yyyy or the same but with yy instead of yyyy + regex.compile( + r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([0-3][0-9][\p{Pd}/](?:[0-9]{2}|[1-2][0-9]{3}))(?:$|[\s@,?!;:\'\"(.\p{Han}])" + ), # mm-yyyy or mm/yyyy or the same but with yy + regex.compile( + r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([1-2][0-9]{3}-[0-3][0-9])(?:$|[\s@,?!;:\'\"(.\p{Han}])" + ), # yyyy-mm or yyyy/mm +] + +ipv4_pattern = r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}" +ipv6_pattern = r"(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])" +ip_pattern = regex.compile( + r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])(" + r"|".join([ipv4_pattern, ipv6_pattern]) + ")(?:$|[\s@,?!;:'\"(.\p{Han}])", + flags=regex.MULTILINE, +) + + +def matches_date_pattern(matched_str): + # Screen out date false positives + for year_regex in year_patterns: + if year_regex.match(matched_str): + return True + return False + + +def ip_has_digit(matched_str): + """Checks to make sure the PII span is not just :: or whatever that may + accidentally be picked up by making sure there are digits.""" + return any(map(str.isdigit, matched_str)) + + +def filter_versions(matched_str, context): + """Filter addresses in this format x.x.x.x and the words dns/server + don't appear in the neighboring context, usually they are just versions.""" + # count occurrence of dots + dot_count = matched_str.count(".") + exclude = dot_count == 3 and len(matched_str) == 7 + if exclude: + if "dns" in context.lower() or "server" in context.lower(): + return False + return exclude + + +def not_ip_address(matched_str): + """make sure the string has a valid IP address format + e.g: 33.01.33.33 is not a valid IP address because of the 0 in front of 1 + TODO: fix this directly in the regex""" + try: + ipaddress.ip_address(matched_str) + return False + except ValueError: + return True + + +def detect_ip(content): + """Detects ip addresses in a string using regex matching + Args: + content (str): A string containing the text to be analyzed. + Returns: + A list of dicts containing the tag type, the matched string, and the start and + end indices of the match. 
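+
+    Example (illustrative; start and end are the character offsets of the match):
+        detect_ip("host unreachable at 203.0.113.7, please retry")
+        -> [{"tag": "IP_ADDRESS", "value": "203.0.113.7", "start": ..., "end": ...}]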
+ """ + + matches = [] + + # regex matching + matches_tmp = ip_pattern.finditer(content) + for match in matches_tmp: + if match.groups(): + if len(match.groups()) > 1 and match.groups()[1]: + sys.stderr.write("Warning: Found substring matches in the main match.") + # setup outputs + value = match.group(1) + start, end = match.span(1) + if value: + # Filter out false positive IPs + if not ip_has_digit(value): + continue + if matches_date_pattern(value): + continue + if filter_versions(value, content[start - 100 : end + 100]) or not_ip_address(value): + continue + # combine if conditions in one + + matches.append( + { + "tag": "IP_ADDRESS", + "value": value, + "start": start, + "end": end, + } + ) + else: + raise ValueError("No match found inside groups") + return matches diff --git a/comps/guardrails/pii_detection/pii/detect/keys_detection.py b/comps/guardrails/pii_detection/pii/detect/keys_detection.py new file mode 100755 index 000000000..796430a64 --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/keys_detection.py @@ -0,0 +1,153 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +""" This code is adapted from BigCode PII +https://github.com/bigcode-project/bigcode-dataset/blob/main/pii/utils/keys_detection.py +""" +import os + +# Secrets detection with detect-secrets tool + + +def get_detector_model(): + return os.path.join(os.path.dirname(__file__), "gibberish_data/big.model") + + +filters = [ + # some filters from [original list](https://github.com/Yelp/detect-secrets/blob/master/docs/filters.md#built-in-filters) + # were removed based on their targets + {"path": "detect_secrets.filters.heuristic.is_potential_uuid"}, + {"path": "detect_secrets.filters.heuristic.is_likely_id_string"}, + {"path": "detect_secrets.filters.heuristic.is_templated_secret"}, + {"path": "detect_secrets.filters.heuristic.is_sequential_string"}, + # {"path": "detect_secrets.filters.gibberish.should_exclude_secret", "model": get_detector_model(), "limit": 4.0}, +] +plugins = [ + {"name": "ArtifactoryDetector"}, + {"name": "AWSKeyDetector"}, + # the entropy detectors esp Base64 need the gibberish detector on top + {"name": "Base64HighEntropyString"}, + {"name": "HexHighEntropyString"}, + {"name": "AzureStorageKeyDetector"}, + {"name": "CloudantDetector"}, + {"name": "DiscordBotTokenDetector"}, + {"name": "GitHubTokenDetector"}, + {"name": "IbmCloudIamDetector"}, + {"name": "IbmCosHmacDetector"}, + {"name": "JwtTokenDetector"}, + {"name": "MailchimpDetector"}, + {"name": "NpmDetector"}, + {"name": "SendGridDetector"}, + {"name": "SlackDetector"}, + {"name": "SoftlayerDetector"}, + {"name": "StripeDetector"}, + {"name": "TwilioKeyDetector"}, +] + + +def is_gibberish(matched_str): + """Checks to make sure the PII span is gibberish and not word like.""" + # pip install gibberish-detector + # download the training corpora from https://raw.githubusercontent.com/domanchi/gibberish-detector/master/examples/big.txt + # run gibberish-detector train big.txt > big.model to generate the model (it takes 3 seconds) + # Detector = detector.create_from_model(os.path.abspath('utils/gibberish_data/big.model')) + detector = get_detector_model() + return detector.is_gibberish(matched_str.lower()) + + +def is_hash(content, value): + """Second check if the value is a hash (after gibberish detector)""" + # get the line where value occurred + try: + res = content.index(value) + except ValueError: + # TODO: fix this issue happened one for JS in the stack-smol, file did contain value + print("Value 
not found in content, why this happened?") + return False + lines = content[: content.index(value)].splitlines() + target_line = lines[-1] + if len(value) in [32, 40, 64]: + # if "sha" or "md5" are in content: + keywords = ["sha", "md5", "hash", "byte"] + if any(x in target_line.lower() for x in keywords): + return True + return False + + +def file_has_hashes(content, coeff=0.02): + """Checks if the file contains literals 'hash' or 'sha' for more than 2% nb_of_lines.""" + lines = content.splitlines() + count_sha = 0 + count_hash = 0 + nlines = content.count("\n") + threshold = int(coeff * nlines) + for line in lines: + count_sha += line.lower().count("sha") + count_hash += line.lower().count("hash") + if count_sha > threshold or count_hash > threshold: + return True + return False + + +def get_indexes(text, value): + string = text + indexes = [] + new_start = 0 + while True: + try: + start = string.index(value) + indexes.append(new_start + start) + new_start = new_start + start + len(value) + string = text[new_start:] + except ValueError: + break + indexes = [(x, x + len(value)) for x in indexes] + return indexes + + +def scan_secrets(line: str): + from detect_secrets.core.scan import _process_line_based_plugins + + lines = line.splitlines(keepends=True) + for secret in _process_line_based_plugins( + lines=list(enumerate(lines, start=1)), + filename="Adhoc String", + ): + yield secret + + +def detect_keys(content): + """Detect secret keys in content using detect-secrets tool + Args: + content (str): string containing the text to be analyzed. + suffix (str): suffix of the file + Returns: + A list of dicts containing the tag type, the matched string, and the start and + end indices of the match.""" + + # We initialize the `settings` variable here, but we can't save it to the global object + # yet, since the contextmanager will revert those changes. As such, we quit the context + # first, then set it to the global namespace. + try: + from detect_secrets.core import scan + except ImportError: + os.system("pip install detect-secrets") + os.system("pip install gibberish-detector") + + from detect_secrets.settings import transient_settings + + with transient_settings({"plugins_used": plugins, "filters_used": filters}) as settings: + matches = [] + for secret in scan_secrets(content): + if is_hash(content, secret.secret_value) or file_has_hashes(content): + continue + indexes = get_indexes(content, secret.secret_value) + for start, end in indexes: + matches.append( + { + "tag": "KEY", + "value": secret.secret_value, + "start": start, + "end": end, + } + ) + return matches diff --git a/comps/guardrails/pii_detection/pii/detect/name_password_detection.py b/comps/guardrails/pii_detection/pii/detect/name_password_detection.py new file mode 100644 index 000000000..91f51e9ec --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/name_password_detection.py @@ -0,0 +1,39 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from .utils import PIIEntityType + + +def detect_name_password(content, pipeline, entity_types=None): + """Detects name and password in a string using bigcode/starpii model + Args: + entity_types: detection types + pipeline: a transformer model + content (str): A string containing the text to be analyzed. + Returns: + A list of dicts containing the tag type, the matched string, and the start and + end indices of the match. 
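+
+    Example (illustrative; `pipeline` is the loaded transformers NER pipeline and the
+    reported spans depend entirely on the model's predictions):
+        detect_name_password("my name is John and my password is hunter2", pipeline)
+        -> [{"tag": "NAME", "value": "John", ...}, {"tag": "PASSWORD", "value": "hunter2", ...}]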
+ """ + if entity_types is None: + entity_types = [PIIEntityType.NAME, PIIEntityType.PASSWORD] + matches = [] + if pipeline is None: + return matches + try: + for entity in pipeline(content): + entity_group = entity["entity_group"] + if ("NAME" == entity_group and PIIEntityType.NAME in entity_types) or ( + "PASSWORD" == entity_group and PIIEntityType.PASSWORD in entity_types + ): + matches.append( + { + "tag": entity_group, + "value": entity["word"], + "start": entity["start"], + "end": entity["end"], + } + ) + except: + pass + + return matches diff --git a/comps/guardrails/pii_detection/pii/detect/phones_detection.py b/comps/guardrails/pii_detection/pii/detect/phones_detection.py new file mode 100644 index 000000000..0f6f770d3 --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/phones_detection.py @@ -0,0 +1,26 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os + + +def detect_phones(text): + """Detects phone in a string using phonenumbers library only detection the international phone number.""" + try: + import phonenumbers + except ImportError: + os.system("pip install phonenumbers") + import phonenumbers + + matches = [] + + for match in phonenumbers.PhoneNumberMatcher(text, "IN"): + matches.append( + { + "tag": "PHONE_NUMBER", + "value": match.raw_string, + "start": match.start, + "end": match.end, + } + ) + return matches diff --git a/comps/guardrails/pii_detection/pii/detect/utils.py b/comps/guardrails/pii_detection/pii/detect/utils.py new file mode 100644 index 000000000..60b8414fe --- /dev/null +++ b/comps/guardrails/pii_detection/pii/detect/utils.py @@ -0,0 +1,34 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from enum import Enum, auto + + +class PIIEntityType(Enum): + IP_ADDRESS = auto() + NAME = auto() + EMAIL = auto() + PHONE_NUMBER = auto() + PASSWORD = auto() + KEY = auto() + + @classmethod + def default(cls): + return [PIIEntityType.IP_ADDRESS, PIIEntityType.EMAIL, PIIEntityType.PHONE_NUMBER, PIIEntityType.KEY] + + @classmethod + def parse(cls, entity): + if "name" == entity: + return PIIEntityType.NAME + elif "password" == entity: + return PIIEntityType.PASSWORD + elif "email" == entity: + return PIIEntityType.EMAIL + elif "phone_number" == entity: + return PIIEntityType.PHONE_NUMBER + elif "ip" == entity: + return PIIEntityType.PHONE_NUMBER + elif "key" == entity: + return PIIEntityType.KEY + else: + raise NotImplementedError(f" entity type {entity} is not supported!") diff --git a/comps/guardrails/pii_detection/pii/pii_utils.py b/comps/guardrails/pii_detection/pii/pii_utils.py new file mode 100644 index 000000000..900cc8fbe --- /dev/null +++ b/comps/guardrails/pii_detection/pii/pii_utils.py @@ -0,0 +1,79 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +from typing import List + +from .detect.emails_detection import detect_email +from .detect.ip_detection import detect_ip +from .detect.keys_detection import detect_keys +from .detect.name_password_detection import detect_name_password +from .detect.phones_detection import detect_phones +from .detect.utils import PIIEntityType + + +class PIIDetector: + def __init__(strategy: str): + pass + + def detect_pii(self, data): + import random + + return random.choice([True, False]) + + +class PIIDetectorWithLLM(PIIDetector): + def __init__(self): + super().__init__() + + def detect_pii(self, text): + return True + + +class PIIDetectorWithNER(PIIDetector): + def __init__(self, model_path=None): 
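+        # Load the bigcode/starpii token-classification pipeline; when model_path is given,
+        # the model is resolved from <model_path>/bigcode/starpii locally instead of the Hugging Face Hub.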
+ super().__init__() + from transformers import AutoTokenizer, pipeline + + _model_key = "bigcode/starpii" + _model_key = _model_key if model_path is None else os.path.join(model_path, _model_key) + try: + tokenizer = AutoTokenizer.from_pretrained(_model_key, model_max_length=512) + self.pipeline = pipeline( + model=_model_key, task="token-classification", tokenizer=tokenizer, grouped_entities=True + ) + except Exception as e: + print("Failed to load model, skip NER classification", e) + self.pipeline = None + + def detect_pii(self, text): + result = [] + # use a regex to detect ip addresses + + entity_types = PIIEntityType.default() + + if PIIEntityType.IP_ADDRESS in entity_types: + result = result + detect_ip(text) + # use a regex to detect emails + if PIIEntityType.EMAIL in entity_types: + result = result + detect_email(text) + # for phone number use phonenumbers tool + if PIIEntityType.PHONE_NUMBER in entity_types: + result = result + detect_phones(text) + if PIIEntityType.KEY in entity_types: + result = result + detect_keys(text) + + if PIIEntityType.NAME in entity_types or PIIEntityType.PASSWORD in entity_types: + result = result + detect_name_password(text, self.pipeline, entity_types) + + print(result) + + return True if len(result) > 0 else False # Dummy function, replace with actual logic + + +class PIIDetectorWithML(PIIDetector): + def __init__(self): + super().__init__() + + def detect_pii(self, text): + return True diff --git a/comps/guardrails/pii_detection/pii_detection.py b/comps/guardrails/pii_detection/pii_detection.py new file mode 100644 index 000000000..b49ac7065 --- /dev/null +++ b/comps/guardrails/pii_detection/pii_detection.py @@ -0,0 +1,190 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import json +import os +import pathlib +import sys +from pathlib import Path + +from fastapi import File, Form, HTTPException, UploadFile +from langsmith import traceable + +cur_path = pathlib.Path(__file__).parent.resolve() +comps_path = os.path.join(cur_path, "../../../") +sys.path.append(comps_path) + +from typing import List + +from tqdm import tqdm + +from comps import DocPath, opea_microservices, register_microservice +from comps.guardrails.pii_detection.data_utils import document_loader, parse_html +from comps.guardrails.pii_detection.pii.pii_utils import ( + PIIDetector, + PIIDetectorWithLLM, + PIIDetectorWithML, + PIIDetectorWithNER, +) +from comps.guardrails.pii_detection.ray_utils import ray_execute, ray_runner_initialization, rayds_initialization +from comps.guardrails.pii_detection.utils import ( + Timer, + generate_log_name, + get_max_cpus, + prepare_env, + save_file_to_local_disk, +) + + +def get_pii_detection_inst(strategy="dummy", settings=None): + if strategy == "ner": + return PIIDetectorWithNER() + elif strategy == "ml": + return PIIDetectorWithML() + elif strategy == "llm": + return PIIDetectorWithLLM() + else: + # Default strategy - dummy + return PIIDetector() + + +def file_based_pii_detect(file_list: List[DocPath], strategy, enable_ray=False, debug=False): + """Ingest document to Redis.""" + file_list = [f.path for f in file_list] + pii_detector = get_pii_detection_inst(strategy=strategy) + + if enable_ray: + num_cpus = get_max_cpus(len(file_list)) + print(f"per task num_cpus: {num_cpus}") + + log_name = generate_log_name(file_list) + ds = rayds_initialization(file_list, document_loader, lazy_mode=True, num_cpus=num_cpus) + ds = ds.map(ray_runner_initialization(pii_detector.detect_pii, debug=debug), num_cpus=num_cpus) + 
ret = ray_execute(ds, log_name) + + else: + ret = [] + for file in tqdm(file_list, total=len(file_list)): + with Timer(f"read document {file}."): + data = document_loader(file) + with Timer(f"detect pii on document {file} to Redis."): + ret.append(pii_detector.detect_pii(data)) + return ret + + +def link_based_pii_detect(link_list: List[str], strategy, enable_ray=False, debug=False): + link_list = [str(f) for f in link_list] + pii_detector = get_pii_detection_inst(strategy=strategy) + + def _parse_html(link): + data = parse_html([link]) + return data[0][0] + + if enable_ray: + num_cpus = get_max_cpus(len(link_list)) + print(f"per task num_cpus: {num_cpus}") + + log_name = generate_log_name(link_list) + ds = rayds_initialization(link_list, _parse_html, lazy_mode=True, num_cpus=num_cpus) + ds = ds.map(ray_runner_initialization(pii_detector.detect_pii, debug=debug), num_cpus=num_cpus) + ret = ray_execute(ds, log_name) + else: + ret = [] + for link in tqdm(link_list, total=len(link_list)): + with Timer(f"read document {link}."): + data = _parse_html(link) + if debug: + print("content is: ", data) + with Timer(f"detect pii on document {link} to Redis."): + ret.append(pii_detector.detect_pii(data)) + return ret + + +def text_based_pii_detect(text_list: List[str], strategy, enable_ray=False, debug=False): + text_list = [str(f) for f in text_list] + pii_detector = get_pii_detection_inst(strategy=strategy) + + if enable_ray: + num_cpus = get_max_cpus(len(text_list)) + print(f"per task num_cpus: {num_cpus}") + + log_name = generate_log_name(text_list) + ds = rayds_initialization(text_list, None, lazy_mode=True, num_cpus=num_cpus) + ds = ds.map(ray_runner_initialization(pii_detector.detect_pii, debug=debug), num_cpus=num_cpus) + ret = ray_execute(ds, log_name) + else: + ret = [] + for data in tqdm(text_list, total=len(text_list)): + if debug: + print("content is: ", data) + with Timer(f"detect pii on document {data[:50]} to Redis."): + ret.append(pii_detector.detect_pii(data)) + return ret + + +@register_microservice( + name="opea_service@guardrails-pii-detection", endpoint="/v1/piidetect", host="0.0.0.0", port=6357 +) +async def pii_detection(files: List[UploadFile] = File(None), link_list: str = Form(None), text_list: str = Form(None)): + if not files and not link_list and not text_list: + raise HTTPException(status_code=400, detail="Either files, link_list, or text_list must be provided.") + + strategy = "ner" # Default strategy + pip_requirement = ["detect-secrets", "phonenumbers", "gibberish-detector"] + + if files: + saved_path_list = [] + try: + if not isinstance(files, list): + files = [files] + upload_folder = "./uploaded_files/" + if not os.path.exists(upload_folder): + Path(upload_folder).mkdir(parents=True, exist_ok=True) + + # TODO: use ray to parallelize the file saving + for file in files: + save_path = upload_folder + file.filename + await save_file_to_local_disk(save_path, file) + saved_path_list.append(DocPath(path=save_path)) + + enable_ray = False if len(saved_path_list) <= 10 else True + if enable_ray: + prepare_env(enable_ray=enable_ray, pip_requirements=pip_requirement, comps_path=comps_path) + ret = file_based_pii_detect(saved_path_list, strategy, enable_ray=enable_ray) + return {"status": 200, "message": json.dumps(ret)} + except Exception as e: + raise HTTPException(status_code=400, detail=f"An error occurred: {e}") + + if text_list: + try: + text_list = json.loads(text_list) # Parse JSON string to list + if not isinstance(text_list, list): + text_list = [text_list] + 
enable_ray = False if len(text_list) <= 10 else True + if enable_ray: + prepare_env(enable_ray=enable_ray, pip_requirements=pip_requirement, comps_path=comps_path) + ret = text_based_pii_detect(text_list, strategy, enable_ray=enable_ray) + return {"status": 200, "message": json.dumps(ret)} + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") + except Exception as e: + raise HTTPException(status_code=400, detail=f"An error occurred: {e}") + + if link_list: + try: + link_list = json.loads(link_list) # Parse JSON string to list + if not isinstance(link_list, list): + link_list = [link_list] + enable_ray = False if len(link_list) <= 10 else True + if enable_ray: + prepare_env(enable_ray=enable_ray, pip_requirements=pip_requirement, comps_path=comps_path) + ret = link_based_pii_detect(link_list, strategy, enable_ray=enable_ray) + return {"status": 200, "message": json.dumps(ret)} + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") + except Exception as e: + raise HTTPException(status_code=400, detail=f"An error occurred: {e}") + + +if __name__ == "__main__": + opea_microservices["opea_service@guardrails-pii-detection"].start() diff --git a/comps/guardrails/pii_detection/ray_utils.py b/comps/guardrails/pii_detection/ray_utils.py new file mode 100644 index 000000000..c4018d4c9 --- /dev/null +++ b/comps/guardrails/pii_detection/ray_utils.py @@ -0,0 +1,97 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional, Union + +import pyarrow +import ray +from ray.data.block import Block +from ray.data.datasource import FileBasedDatasource +from tqdm import tqdm +from utils import Timer, get_failable_with_time, save_logs, timeout + + +class RayDataLoader(FileBasedDatasource): + def __init__( + self, + paths: Union[str, List[str]], + dataloader_callable: Optional[Callable], + document_ld_args: Optional[Dict[str, Any]] = None, + **file_based_datasource_kwargs, + ): + super().__init__(paths, **file_based_datasource_kwargs) + self.dataloader_callable = dataloader_callable + self.args = document_ld_args or {} + + def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]: + from ray.data._internal.arrow_block import ArrowBlockBuilder + + builder = ArrowBlockBuilder() + path = f"{path}" + data, error, read_time = self.dataloader_callable(path) + item = {"data": data, "filename": path, "error": error, "read_time": f"{read_time} secs"} + builder.add(item) + yield builder.build() + + +def rayds_initialization(file_paths, dataloader_callable, lazy_mode=True, num_cpus=20): + if dataloader_callable is None: + text_list = [{"data": data, "filename": data[:50], "error": None, "read_time": "0 secs"} for data in file_paths] + return ray.data.from_items(text_list) + + decorated_dataloader_callable = get_failable_with_time(dataloader_callable) + if lazy_mode: + if num_cpus is None: + return ray.data.read_datasource(RayDataLoader(file_paths, decorated_dataloader_callable)) + else: + return ray.data.read_datasource( + RayDataLoader(file_paths, decorated_dataloader_callable), ray_remote_args={"num_cpus": num_cpus} + ) + else: + data = [] + for file in tqdm(file_paths, total=len(file_paths)): + content, error, elapse_time = decorated_dataloader_callable(file) + item = {"data": content, "filename": file, "error": error, "read_time": f"{elapse_time} secs"} + data.append(item) + 
return ray.data.from_items(data) + + +def ray_runner_initialization(func, debug=False): + @timeout(600) + def ray_runner(data): + content = data["data"] + if content is None: + return { + "filename": data["filename"], + "content": content, + "status": "failed", + "ret": -1, + "error": data["error"], + "read_time": data["read_time"], + "elaspe_time": "0.0 secs", + } + + decorated_callable = get_failable_with_time(func) + ret, error, elapse_time = decorated_callable(content) + status = "success" if not error else "failed" + if not debug: + content = None + return { + "filename": data["filename"], + "content": content, + "status": status, + "ret": ret, + "error": error, + "read_time": data["read_time"], + "elaspe_time": f"{elapse_time} secs", + } + + return ray_runner + + +def ray_execute(ds, log_name): + with Timer(f"execute with Ray, status log: {log_name}"): + ret_with_status = ds.take_all() + df = save_logs(log_name, ret_with_status) + ret = df["ret"].to_list() + return ret diff --git a/comps/guardrails/pii_detection/requirements.txt b/comps/guardrails/pii_detection/requirements.txt new file mode 100644 index 000000000..88690093f --- /dev/null +++ b/comps/guardrails/pii_detection/requirements.txt @@ -0,0 +1,26 @@ +beautifulsoup4 +detect_secrets +docarray[full] +easyocr +fastapi +gibberish-detector +huggingface_hub +langchain +langchain-community +langsmith +numpy +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +pandas +phonenumbers +Pillow +prometheus-fastapi-instrumentator +pyarrow +pymupdf +python-docx +ray +redis +sentence_transformers +shortuuid +virtualenv diff --git a/comps/guardrails/pii_detection/schema.yml b/comps/guardrails/pii_detection/schema.yml new file mode 100644 index 000000000..0c0ca9711 --- /dev/null +++ b/comps/guardrails/pii_detection/schema.yml @@ -0,0 +1,14 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +text: + - name: content + - name: source +numeric: + - name: start_index +vector: + - name: content_vector + algorithm: HNSW + datatype: FLOAT32 + dims: 384 + distance_metric: COSINE diff --git a/comps/guardrails/pii_detection/test.py b/comps/guardrails/pii_detection/test.py new file mode 100644 index 000000000..214c0d0b9 --- /dev/null +++ b/comps/guardrails/pii_detection/test.py @@ -0,0 +1,102 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import json +import os + +import requests +from utils import Timer + + +def test_html(ip_addr="localhost", batch_size=20): + import pandas as pd + + proxies = {"http": ""} + url = f"http://{ip_addr}:6357/v1/piidetect" + urls = pd.read_csv("data/ai_rss.csv")["Permalink"] + urls = urls[:batch_size].to_list() + payload = {"link_list": json.dumps(urls)} + + with Timer(f"send {len(urls)} link to pii detection endpoint"): + try: + resp = requests.post(url=url, data=payload, proxies=proxies) + print(resp.text) + resp.raise_for_status() # Raise an exception for unsuccessful HTTP status codes + print("Request successful!") + except requests.exceptions.RequestException as e: + print("An error occurred:", e) + + +def test_text(ip_addr="localhost", batch_size=20): + proxies = {"http": ""} + url = f"http://{ip_addr}:6357/v1/piidetect" + if os.path.exists("data/ai_rss.csv"): + import pandas as pd + + content = pd.read_csv("data/ai_rss.csv")["Description"] + content = content[:batch_size].to_list() + else: + content = ( + [ + """With new architectures, there comes a bit of a dilemma. 
After having spent billions of dollars training models with older architectures, companies rightfully wonder if it is worth spending billions more on a newer architecture that may itself be outmoded soon. +One possible solution to this dilemma is transfer learning. The idea here is to put noise into the trained model and then use the output given to then backpropagate on the new model. The idea here is that you don’t need to worry about generating huge amounts of novel data and potentially the number of epochs you have to train for is also significantly reduced. This idea has not been perfected yet, so it remains to be seen the role it will play in the future. +Nevertheless, as businesses become more invested in these architectures the potential for newer architectures that improve cost will only increase. Time will tell how quickly the industry moves to adopt them. +For those who are building apps that allow for a seamless transition between models, you can look at the major strives made in throughput and latency by YOCO and have hope that the major bottlenecks your app is having may soon be resolved. +It’s an exciting time to be building. +With special thanks to Christopher Taylor for his feedback on this blog post. +[1] Sun, Y., et al. “You Only Cache Once: Decoder-Decoder Architectures for Language Models” (2024), arXiv +[2] Sun, Y., et al. “Retentive Network: A Successor to Transformer for Large Language Models” (2023), arXiv +[3] Wikimedia Foundation, et al. “Hadamard product (matrices)” (2024), Wikipedia +[4] Sanderson, G. et al., “Attention in transformers, visually explained | Chapter 6, Deep Learning” (2024), YouTube +[5] A. Vaswani, et al., “Attention Is All You Need” (2017), arXiv +Understanding You Only Cache Once was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.""" + ] + * batch_size + ) + payload = {"text_list": json.dumps(content)} + + with Timer(f"send {len(content)} text to pii detection endpoint"): + try: + resp = requests.post(url=url, data=payload, proxies=proxies) + print(resp.text) + resp.raise_for_status() # Raise an exception for unsuccessful HTTP status codes + print("Request successful!") + except requests.exceptions.RequestException as e: + print("An error occurred:", e) + + +def test_pdf(ip_addr="localhost", batch_size=20): + proxies = {"http": ""} + url = f"http://{ip_addr}:6357/v1/piidetect" + dir_path = "data/pdf" + file_list = os.listdir(dir_path) + file_list = file_list[:batch_size] + files = [("files", (f, open(os.path.join(dir_path, f), "rb"), "application/pdf")) for f in file_list] + with Timer(f"send {len(files)} documents to pii detection endpoint"): + try: + resp = requests.request("POST", url=url, headers={}, files=files, proxies=proxies) + print(resp.text) + resp.raise_for_status() # Raise an exception for unsuccessful HTTP status codes + print("Request successful!") + except requests.exceptions.RequestException as e: + print("An error occurred:", e) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--test_html", action="store_true", help="Test HTML pii detection") + parser.add_argument("--test_pdf", action="store_true", help="Test PDF pii detection") + parser.add_argument("--test_text", action="store_true", help="Test Text pii detection") + parser.add_argument("--batch_size", type=int, default=20, help="Batch size for testing") + parser.add_argument("--ip_addr", type=str, default="localhost", help="IP 
address of the server") + + args = parser.parse_args() + if args.test_html: + test_html(ip_addr=args.ip_addr, batch_size=args.batch_size) + elif args.test_pdf: + test_pdf(ip_addr=args.ip_addr, batch_size=args.batch_size) + elif args.test_text: + test_text(ip_addr=args.ip_addr, batch_size=args.batch_size) + else: + print("Please specify the test type") diff --git a/comps/guardrails/pii_detection/utils.py b/comps/guardrails/pii_detection/utils.py new file mode 100644 index 000000000..0766bec70 --- /dev/null +++ b/comps/guardrails/pii_detection/utils.py @@ -0,0 +1,124 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import errno +import functools +import hashlib +import os +import signal +import timeit +from pathlib import Path + + +class Timer: + level = 0 + viewer = None + + def __init__(self, name): + self.name = name + if Timer.viewer: + Timer.viewer.display(f"{name} started ...") + else: + print(f"{name} started ...") + + def __enter__(self): + self.start = timeit.default_timer() + Timer.level += 1 + + def __exit__(self, *a, **kw): + Timer.level -= 1 + if Timer.viewer: + Timer.viewer.display(f'{" " * Timer.level}{self.name} took {timeit.default_timer() - self.start} sec') + else: + print(f'{" " * Timer.level}{self.name} took {timeit.default_timer() - self.start} sec') + + +class TimeoutError(Exception): + pass + + +def save_logs(log_name, data): + import pandas as pd + + df = pd.DataFrame.from_records(data) + try: + dir_path = os.path.dirname(log_name) + if not os.path.exists(dir_path): + os.makedirs(dir_path, exist_ok=True) + df.to_csv(log_name) + except: + pass + return df + + +def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): + def decorator(func): + def _handle_timeout(signum, frame): + raise TimeoutError(error_message) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + signal.signal(signal.SIGALRM, _handle_timeout) + signal.alarm(seconds) + try: + result = func(*args, **kwargs) + finally: + signal.alarm(0) + return result + + return wrapper + + return decorator + + +def generate_log_name(file_list): + file_set = f"{sorted(file_list)}" + # print(f"file_set: {file_set}") + md5_str = hashlib.md5(file_set.encode()).hexdigest() + return f"status/status_{md5_str}.log" + + +def get_failable_with_time(callable): + def failable_callable(*args, **kwargs): + start_time = timeit.default_timer() + try: + content = callable(*args, **kwargs) + error = None + except Exception as e: + content = None + error = str(e) + end_time = timeit.default_timer() + return content, error, f"{'%.3f' % (end_time - start_time)}" + + return failable_callable + + +def prepare_env(enable_ray=False, pip_requirements=None, comps_path=None): + if enable_ray: + import ray + + if ray.is_initialized(): + ray.shutdown() + if pip_requirements is not None: + ray.init(runtime_env={"pip": pip_requirements, "env_vars": {"PYTHONPATH": comps_path}}) + else: + ray.init(runtime_env={"env_vars": {"PYTHONPATH": comps_path}}) + + +def get_max_cpus(total_num_tasks): + num_cpus_available = os.cpu_count() + num_cpus_per_task = num_cpus_available // total_num_tasks + if num_cpus_per_task == 0: + return 8 + return num_cpus_per_task + + +async def save_file_to_local_disk(save_path: str, file): + save_path = Path(save_path) + with save_path.open("wb") as fout: + try: + content = await file.read() + fout.write(content) + except Exception as e: + print(f"Write file failed. Exception: {e}") + raise SystemError(f"Write file {save_path} failed. 
Exception: {e}") diff --git a/tests/test_guardrails_pii_detection.sh b/tests/test_guardrails_pii_detection.sh new file mode 100644 index 000000000..4466992b5 --- /dev/null +++ b/tests/test_guardrails_pii_detection.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -xe + +WORKPATH=$(dirname "$PWD") +ip_address=$(hostname -I | awk '{print $1}') + +function build_docker_images() { + echo "Start building docker images for microservice" + cd $WORKPATH + docker build -t opea/guardrails-pii-detection:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/guardrails/pii_detection/docker/Dockerfile . + echo "Docker images built" +} + +function start_service() { + echo "Starting microservice" + docker run -d --runtime=runc --name="test-guardrails-pii-detection-endpoint" -p 6357:6357 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy opea/guardrails-pii-detection:latest + sleep 5 + echo "Microservice started" +} + +function validate_microservice() { + echo "Validate microservice started" + export PATH="${HOME}/miniforge3/bin:$PATH" + source activate + echo "test 1 - single task" + python comps/guardrails/pii_detection/test.py --test_text --batch_size 1 --ip_addr $ip_address + echo "test 2 - 20 tasks in parallel" + python comps/guardrails/pii_detection/test.py --test_text --batch_size 20 --ip_addr $ip_address + echo "Validate microservice completed" +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=test-guardrails-pii-detection-endpoint") + echo "Shutdown legacy containers "$cid + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo "cleanup container images and volumes" + echo y | docker system prune 2>&1 > /dev/null + +} + +main