diff --git a/.flake8 b/.flake8 index e2ed01d9..5f3e84f1 100644 --- a/.flake8 +++ b/.flake8 @@ -3,4 +3,4 @@ ignore = E203, E266, E501, W503, B006, B007, B008, F401, C416, B950, B904 max-line-length = 88 max-complexity = 18 select = B,C,E,F,W,T4,B9 -exclude = venv, .venv, tests/.datafog_env, examples/venv \ No newline at end of file +exclude = venv, .venv, tests/.datafog_env, examples/venv, script.py \ No newline at end of file diff --git a/.gitignore b/.gitignore index e95d26b6..35a65cde 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,5 @@ error_log.txt docs/* !docs/*.rst !docs/conf.py -scratch.py \ No newline at end of file +scratch.py +script.py \ No newline at end of file diff --git a/datafog/__init__.py b/datafog/__init__.py index 7838dd31..6bea684e 100644 --- a/datafog/__init__.py +++ b/datafog/__init__.py @@ -1,7 +1,7 @@ from .__about__ import __version__ from .client import app from .config import OperationType, get_config -from .main import DataFog, TextPIIAnnotator +from .main import DataFog from .models.annotator import ( AnalysisExplanation, AnnotationResult, @@ -30,7 +30,6 @@ "ImageService", "OperationType", "SparkService", - "TextPIIAnnotator", "TextService", "SpacyPIIAnnotator", "ImageDownloader", diff --git a/datafog/main.py b/datafog/main.py index 58224e59..fec2a8c8 100644 --- a/datafog/main.py +++ b/datafog/main.py @@ -15,7 +15,7 @@ from .config import OperationType from .models.anonymizer import Anonymizer, AnonymizerType, HashType -from .processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator +from .models.spacy_nlp import SpacyAnnotator from .services.image_service import ImageService from .services.spark_service import SparkService from .services.text_service import TextService @@ -36,6 +36,7 @@ class DataFog: spark_service: Optional Spark service for distributed processing. operations: List of operations to perform. anonymizer: Anonymizer for PII redaction, replacement, or hashing. + annotator: SpacyAnnotator instance for text annotation. """ def __init__( @@ -54,6 +55,7 @@ def __init__( self.anonymizer = Anonymizer( hash_type=hash_type, anonymizer_type=anonymizer_type ) + self.annotator = SpacyAnnotator() self.logger = logging.getLogger(__name__) self.logger.info( "Initializing DataFog class with the following services and operations:" @@ -120,63 +122,51 @@ async def _process_text(self, text_list: List[str]): """ Internal method to process text based on enabled operations. """ - if OperationType.SCAN in self.operations: - annotated_text = await self.text_service.batch_annotate_text_async( - text_list - ) - self.logger.info( - f"Text annotation completed with {len(annotated_text)} annotations." - ) - - if OperationType.REDACT in self.operations: - return [ - self.anonymizer.anonymize( - text, annotations, AnonymizerType.REDACT - ).anonymized_text - for text, annotations in zip(text_list, annotated_text, strict=True) - ] - elif OperationType.REPLACE in self.operations: - return [ - self.anonymizer.anonymize( - text, annotations, AnonymizerType.REPLACE - ).anonymized_text - for text, annotations in zip(text_list, annotated_text, strict=True) - ] - elif OperationType.HASH in self.operations: - return [ - self.anonymizer.anonymize( - text, annotations, AnonymizerType.HASH - ).anonymized_text - for text, annotations in zip(text_list, annotated_text, strict=True) + try: + if OperationType.SCAN in self.operations: + annotated_text = [ + self.annotator.annotate_text(text) for text in text_list ] - else: + + if OperationType.REDACT in self.operations: + self.anonymizer.anonymizer_type = AnonymizerType.REDACT + elif OperationType.REPLACE in self.operations: + self.anonymizer.anonymizer_type = AnonymizerType.REPLACE + elif OperationType.HASH in self.operations: + self.anonymizer.anonymizer_type = AnonymizerType.HASH + + if any( + op in self.operations + for op in [ + OperationType.REDACT, + OperationType.REPLACE, + OperationType.HASH, + ] + ): + return [ + self.anonymizer.anonymize(text, annotations).anonymized_text + for text, annotations in zip( + text_list, annotated_text, strict=True + ) + ] return annotated_text - self.logger.info( - "No annotation or anonymization operation found; returning original texts." - ) - return text_list + return text_list + except Exception as e: + self.logger.error(f"Error in _process_text: {str(e)}") + raise def run_text_pipeline_sync(self, str_list: List[str]) -> List[str]: """ Run the text pipeline synchronously on a list of input text. - - Args: - str_list (List[str]): A list of text strings to be processed. - - Returns: - List[str]: Processed text results based on the enabled operations. - - Raises: - Exception: Any error encountered during the text processing. """ try: - self.logger.info(f"Starting text pipeline with {len(str_list)} texts.") + self.logger.info(f"Starting text pipeline with {len(str_list)} texts") + if OperationType.SCAN in self.operations: - annotated_text = self.text_service.batch_annotate_text_sync(str_list) - self.logger.info( - f"Text annotation completed with {len(annotated_text)} annotations." - ) + annotated_text = [ + self.annotator.annotate_text(text) for text in str_list + ] if any( op in self.operations @@ -192,12 +182,8 @@ def run_text_pipeline_sync(self, str_list: List[str]) -> List[str]: str_list, annotated_text, strict=True ) ] - else: - return annotated_text + return annotated_text - self.logger.info( - "No annotation or anonymization operation found; returning original texts." - ) return str_list except Exception as e: self.logger.error(f"Error in run_text_pipeline_sync: {str(e)}") @@ -222,35 +208,3 @@ def _add_attributes(self, attributes: dict): """ for key, value in attributes.items(): setattr(self, key, value) - - -class TextPIIAnnotator: - """ - Class for annotating PII in text. - - Provides functionality to detect and annotate Personally Identifiable Information (PII) in text. - - Attributes: - text_annotator: SpacyPIIAnnotator instance for text annotation. - spark_processor: Optional SparkService for distributed processing. - """ - - def __init__(self): - self.text_annotator = SpacyPIIAnnotator.create() - self.spark_processor: SparkService = None - - def run(self, text, output_path=None): - try: - annotated_text = self.text_annotator.annotate(text) - - # Optionally, output the results to a JSON file - if output_path: - with open(output_path, "w") as f: - json.dump(annotated_text, f) - - return annotated_text - - finally: - # Ensure Spark resources are released - if self.spark_processor: - self.spark_processor.stop() diff --git a/script.py b/script.py new file mode 100644 index 00000000..b130cbd3 --- /dev/null +++ b/script.py @@ -0,0 +1,38 @@ +from datafog import DataFog +from datafog.config import OperationType +from datafog.models.anonymizer import Anonymizer, AnonymizerType +from datafog.models.spacy_nlp import SpacyAnnotator + +client = DataFog(operations=[OperationType.SCAN, OperationType.REDACT]) + +text = "Tim Cook is the CEO of Apple and is based out of Cupertino, California" + +# README Implementation +redacted_text = client.run_text_pipeline_sync([text])[0] +print(redacted_text) + +# Correct Implementation +# annotator = SpacyAnnotator() +# anonymizer = Anonymizer(anonymizer_type=AnonymizerType.REDACT) +# annotations = annotator.annotate_text(text) +# result = anonymizer.anonymize(text, annotations) +# print(result.anonymized_text) + + +# Sample redaction using DataFog main.py implementation +sample_texts = [ + "John Smith lives at 123 Main St in New York", + "Contact Sarah Jones at sarah.jones@email.com or (555) 123-4567", + "SSN: 123-45-6789 belongs to Michael Wilson", +] + +# Initialize DataFog with SCAN and REDACT operations +datafog_client = DataFog(operations=[OperationType.SCAN, OperationType.REDACT]) + +# Process multiple texts synchronously +redacted_results = datafog_client.run_text_pipeline_sync(sample_texts) + +print("Original vs Redacted texts:") +for original, redacted in zip(sample_texts, redacted_results): + print("\nOriginal:", original) + print("Redacted:", redacted)