"""
PHI Annotation System for PDF documents.

This module provides tools to detect, annotate, and track protected health
information (PHI) in medical PDFs.
"""
|
|
|
|
|
import json
import re
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional

import cv2
import numpy as np
import pdf2image
from PIL import Image, ImageDraw, ImageFilter
|
|
|
|
|
|
|
|
@dataclass
class PHIAnnotation:
    """One detected piece of PHI and where it was found.

    Only ``category``, ``value`` and ``page`` are required; the remaining
    fields fall back to the defaults declared below.
    """

    # Category key of the detection (PHIAnnotator.PHI_CATEGORIES holds the
    # known keys and their descriptions).
    category: str

    # The detected text itself.
    value: str

    # Page number the detection belongs to (annotate_pdf counts from 1).
    page: int

    # Bounding box as (x1, y1, x2, y2) pixel coordinates, or None if unknown.
    bbox: Optional[Tuple[int, int, int, int]] = None

    # Detection confidence; defaults to full confidence.
    confidence: float = 1.0

    # Optional redacted stand-in for ``value``.
    masked_value: Optional[str] = None
|
|
|
|
|
|
|
|
@dataclass
class DocumentAnnotations:
    """Every PHI annotation collected for one document, plus run details.

    All five fields are required and keep their declaration order, so the
    class can be built positionally or by keyword.
    """

    # Path of the annotated document, stored as a string.
    document_path: str

    # The individual PHI detections.
    annotations: List[PHIAnnotation]

    # Number of pages in the document.
    total_pages: int

    # When the annotations were produced (annotate_pdf stores an ISO-8601
    # string from datetime.now()).
    timestamp: str

    # Free-form run settings (annotate_pdf records 'dpi' and
    # 'confidence_threshold').
    metadata: Dict[str, Any]
|
|
|
|
|
|
|
|
class PHIAnnotator:
    """Annotate PHI in medical documents.

    The class renders PDF pages to images, finds candidate text regions with
    OpenCV, attaches PHI annotations to them, and can redact, persist and
    summarise the result.

    NOTE: region analysis currently relies on ``_simulate_phi_detection``,
    which produces random placeholder detections; ``apply_pattern_matching``
    is available for real text but is not yet wired into ``annotate_pdf``.
    """

    # Category key -> human-readable description (used in reports).
    PHI_CATEGORIES = {
        'name': 'Names (patients, physicians, family)',
        'date': 'Dates (except year alone)',
        'address': 'Geographic subdivisions smaller than state',
        'phone': 'Phone and fax numbers',
        'email': 'Email addresses',
        'ssn': 'Social Security Numbers',
        'mrn': 'Medical Record Numbers',
        'insurance_id': 'Health plan beneficiary numbers',
        'account': 'Account numbers',
        'license': 'Certificate/license numbers',
        'vehicle': 'Vehicle identifiers and license plates',
        'device_id': 'Device identifiers and serial numbers',
        'url': 'Web URLs',
        'ip': 'IP addresses',
        'biometric': 'Biometric identifiers',
        'unique_id': 'Any unique identifying number',
        'geo_small': 'Geographic subdivisions < state',
        'institution': 'Healthcare facility names',
    }

    # Category key -> regex source; applied case-insensitively by
    # apply_pattern_matching.
    PHI_PATTERNS = {
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b|\b\d{9}\b',
        'phone': r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
        # The TLD class used to be [A-Z|a-z], which also accepted a literal
        # '|' inside the top-level domain.
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        # The separator is [:\s]* rather than [:\s]? so that the common
        # "LABEL: value" form (colon followed by a space) is matched too.
        'mrn': r'\b(?:MRN|Medical Record Number)[:\s]*[\w-]+\b',
        'date': r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b',
        'url': r'https?://[^\s]+',
        'ip': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        'insurance_id': r'\b(?:INS|Policy|Member ID)[:\s]*[\w-]+\b',
        'license': r'\b(?:License|DEA|NPI)[:\s]*[\w-]+\b',
    }

    # Masking strategies understood by create_masked_image.
    _MASK_TYPES = ('black_box', 'blur', 'pixelate')

    def __init__(self, confidence_threshold: float = 0.85):
        """
        Initialize PHI Annotator.

        Args:
            confidence_threshold: Minimum confidence for PHI detection
        """
        self.confidence_threshold = confidence_threshold

    def annotate_pdf(self, pdf_path: Path, dpi: int = 150) -> DocumentAnnotations:
        """
        Annotate PHI in a PDF document.

        Each page is rendered to an image, scanned for text regions, and
        every region is analysed for PHI. Detections whose confidence is
        below ``self.confidence_threshold`` are dropped.

        Args:
            pdf_path: Path to PDF file
            dpi: DPI for PDF to image conversion

        Returns:
            DocumentAnnotations object
        """
        pages = pdf2image.convert_from_path(pdf_path, dpi=dpi)

        annotations: List[PHIAnnotation] = []
        for page_num, page_image in enumerate(pages, 1):
            for region in self._detect_text_regions(np.array(page_image)):
                # The threshold is recorded in the metadata below, so the
                # returned annotations must actually honour it.
                annotations.extend(
                    found
                    for found in self._analyze_region_for_phi(region, page_num)
                    if found.confidence >= self.confidence_threshold
                )

        return DocumentAnnotations(
            document_path=str(pdf_path),
            annotations=annotations,
            total_pages=len(pages),
            timestamp=datetime.now().isoformat(),
            metadata={
                'dpi': dpi,
                'confidence_threshold': self.confidence_threshold,
            },
        )

    def _detect_text_regions(self, image: np.ndarray) -> List[Dict]:
        """
        Detect text regions in an image using computer vision.

        The image is converted to grayscale, binarised with an inverted Otsu
        threshold, and the bounding rectangles of its external contours are
        collected. Rectangles that are not wider than 20 px and taller than
        10 px are discarded.

        Args:
            image: Image array (converted with COLOR_RGB2GRAY, so presumably
                a 3-channel RGB array - TODO confirm for non-RGB inputs)

        Returns:
            List of dicts with 'bbox' as (x1, y1, x2, y2) and 'area' in
            pixels
        """
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        _, binary = cv2.threshold(
            gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        contours, _ = cv2.findContours(
            binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )

        rects = (cv2.boundingRect(contour) for contour in contours)
        return [
            {'bbox': (x, y, x + w, y + h), 'area': w * h}
            for x, y, w, h in rects
            if w > 20 and h > 10
        ]

    def _analyze_region_for_phi(self, region: Dict, page_num: int) -> List[PHIAnnotation]:
        """
        Analyze a text region for PHI.

        Detection is delegated to ``_simulate_phi_detection``; every item it
        returns becomes an annotation located at the region's bounding box.

        Args:
            region: Text region dictionary (its 'bbox' entry is used)
            page_num: Page number

        Returns:
            List of PHI annotations found
        """
        bbox = region['bbox']
        return [
            PHIAnnotation(
                category=item['category'],
                value=item['value'],
                page=page_num,
                bbox=bbox,
                confidence=item['confidence'],
            )
            for item in self._simulate_phi_detection(bbox)
        ]

    def _simulate_phi_detection(self, bbox: Tuple) -> List[Dict]:
        """
        Simulate PHI detection for testing.

        In production, this would be replaced with actual OCR and pattern
        matching. With probability 0.3 a single placeholder detection is
        returned, with a random category from ``PHI_CATEGORIES`` and a
        confidence drawn from [0.85, 1.0]; otherwise the list is empty.

        Args:
            bbox: Bounding box of the region (currently unused)

        Returns:
            List holding zero or one detection dicts with the keys
            'category', 'value' and 'confidence'
        """
        import random

        if random.random() >= 0.3:
            return []

        category = random.choice(list(self.PHI_CATEGORIES.keys()))
        return [{
            'category': category,
            'value': f"SIMULATED_{category.upper()}_VALUE",
            'confidence': random.uniform(0.85, 1.0),
        }]

    def apply_pattern_matching(self, text: str) -> List[Dict]:
        """
        Apply regex patterns to detect PHI in text.

        Every pattern in ``PHI_PATTERNS`` is searched case-insensitively.
        Results are grouped by pattern in ``PHI_PATTERNS`` order and are not
        de-duplicated, so overlapping hits from different categories are all
        reported.

        Args:
            text: Text to analyze

        Returns:
            List of detected PHI items, each a dict with 'category', 'value',
            'start', 'end' and a fixed 'confidence' of 0.9
        """
        detections = []
        for category, pattern in self.PHI_PATTERNS.items():
            detections.extend(
                {
                    'category': category,
                    'value': match.group(),
                    'start': match.start(),
                    'end': match.end(),
                    'confidence': 0.9,
                }
                for match in re.finditer(pattern, text, re.IGNORECASE)
            )
        return detections

    def create_masked_image(
        self,
        image: Image.Image,
        annotations: List[PHIAnnotation],
        mask_type: str = 'black_box'
    ) -> Image.Image:
        """
        Create a masked version of the image with PHI redacted.

        Annotations without a bounding box are skipped. The input image is
        not modified.

        Args:
            image: Original image
            annotations: PHI annotations
            mask_type: Type of masking ('black_box', 'blur', 'pixelate')

        Returns:
            Masked image

        Raises:
            ValueError: If ``mask_type`` is not one of the supported types.
        """
        # Fail loudly up front: an unrecognised mask type used to fall
        # through every branch and hand back an unredacted copy.
        if mask_type not in self._MASK_TYPES:
            raise ValueError(
                f"Unknown mask_type {mask_type!r}; "
                f"expected one of {', '.join(self._MASK_TYPES)}"
            )

        masked_image = image.copy()
        draw = ImageDraw.Draw(masked_image)

        for annotation in annotations:
            if not annotation.bbox:
                continue
            x1, y1, x2, y2 = annotation.bbox

            if mask_type == 'black_box':
                draw.rectangle([x1, y1, x2, y2], fill='black')
                continue

            # Both remaining modes replace the region with a degraded copy
            # taken from the untouched original.
            region = image.crop((x1, y1, x2, y2))
            if mask_type == 'blur':
                replacement = region.filter(ImageFilter.GaussianBlur(radius=10))
            else:  # 'pixelate'
                small = region.resize((10, 10), Image.NEAREST)
                replacement = small.resize(region.size, Image.NEAREST)
            masked_image.paste(replacement, (x1, y1))

        return masked_image

    def save_annotations(self, annotations: DocumentAnnotations, output_path: Path):
        """
        Save annotations to JSON file.

        Args:
            annotations: Document annotations
            output_path: Path to save JSON file
        """
        data = {
            'document_path': annotations.document_path,
            'total_pages': annotations.total_pages,
            'timestamp': annotations.timestamp,
            'metadata': annotations.metadata,
            'annotations': [asdict(ann) for ann in annotations.annotations],
        }

        # Explicit encoding so the file does not depend on the locale.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

    def load_annotations(self, json_path: Path) -> DocumentAnnotations:
        """
        Load annotations from JSON file.

        Args:
            json_path: Path to JSON file

        Returns:
            DocumentAnnotations object
        """
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        annotations = []
        for ann_dict in data['annotations']:
            fields = dict(ann_dict)
            # JSON has no tuple type, so a saved bbox comes back as a list;
            # restore the tuple declared on PHIAnnotation.
            if fields.get('bbox') is not None:
                fields['bbox'] = tuple(fields['bbox'])
            annotations.append(PHIAnnotation(**fields))

        return DocumentAnnotations(
            document_path=data['document_path'],
            annotations=annotations,
            total_pages=data['total_pages'],
            timestamp=data['timestamp'],
            metadata=data['metadata'],
        )

    def calculate_statistics(self, annotations: DocumentAnnotations) -> Dict:
        """
        Calculate statistics about PHI in the document.

        Args:
            annotations: Document annotations

        Returns:
            Dictionary with 'total_phi_items', 'pages_with_phi',
            'phi_by_category' (category -> count) and 'average_confidence'
            (0.0 when there are no annotations)
        """
        items = annotations.annotations

        by_category: Dict[str, int] = {}
        for ann in items:
            by_category[ann.category] = by_category.get(ann.category, 0) + 1

        average = sum(ann.confidence for ann in items) / len(items) if items else 0.0

        return {
            'total_phi_items': len(items),
            'pages_with_phi': len({ann.page for ann in items}),
            'phi_by_category': by_category,
            'average_confidence': average,
        }

    def create_annotation_report(
        self,
        annotations: DocumentAnnotations,
        output_path: Path
    ):
        """
        Create a human-readable report of PHI annotations.

        The report lists document details, summary statistics, counts per
        category and one entry per annotation; values longer than 30
        characters are truncated.

        Args:
            annotations: Document annotations
            output_path: Path to save report
        """
        stats = self.calculate_statistics(annotations)
        banner = "=" * 60
        rule = "-" * 40

        report = [
            banner,
            "PHI ANNOTATION REPORT",
            banner,
            f"Document: {annotations.document_path}",
            f"Timestamp: {annotations.timestamp}",
            f"Total Pages: {annotations.total_pages}",
            "",
            "STATISTICS",
            rule,
            f"Total PHI Items: {stats['total_phi_items']}",
            f"Pages with PHI: {stats['pages_with_phi']}/{annotations.total_pages}",
            f"Average Confidence: {stats['average_confidence']:.2%}",
            "",
            "PHI BY CATEGORY",
            rule,
        ]

        for category, count in sorted(stats['phi_by_category'].items()):
            description = self.PHI_CATEGORIES.get(category, 'Unknown')
            report.append(f"{category:15} {count:3} items - {description}")

        report.extend(["", "DETAILED ANNOTATIONS", rule])

        for i, ann in enumerate(annotations.annotations, 1):
            report.append(f"\n{i}. Category: {ann.category}")
            if len(ann.value) > 30:
                report.append(f" Value: {ann.value[:30]}...")
            else:
                report.append(f" Value: {ann.value}")
            report.append(f" Page: {ann.page}")
            report.append(f" Confidence: {ann.confidence:.2%}")
            if ann.bbox:
                report.append(f" Location: {ann.bbox}")

        report.extend(["", banner, "END OF REPORT", banner])

        # Detected values may hold non-ASCII text, so do not rely on the
        # locale's default encoding.
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
|
|
|
|
|
|
|
|
def main():
    """Example usage of PHI Annotator.

    Command-line entry point: annotates the PDF given by ``--pdf``, writes
    the annotations as JSON next to it (or into ``--output``), optionally
    writes a text report (``--report``) and a redacted copy of the PDF
    (``--mask``), then prints summary statistics.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Annotate PHI in PDF documents')
    parser.add_argument('--pdf', type=str, required=True, help='Path to PDF file')
    parser.add_argument('--output', type=str, help='Output directory for annotations')
    parser.add_argument('--report', action='store_true', help='Generate annotation report')
    parser.add_argument('--mask', action='store_true', help='Create masked version of PDF')
    args = parser.parse_args()

    annotator = PHIAnnotator()

    pdf_path = Path(args.pdf)
    print(f"Annotating PHI in {pdf_path}...")
    annotations = annotator.annotate_pdf(pdf_path)

    output_dir = Path(args.output) if args.output else pdf_path.parent
    # parents=True so a nested --output directory that does not exist yet is
    # created instead of raising FileNotFoundError.
    output_dir.mkdir(parents=True, exist_ok=True)

    json_path = output_dir / f"{pdf_path.stem}_annotations.json"
    annotator.save_annotations(annotations, json_path)
    print(f"Annotations saved to {json_path}")

    if args.report:
        report_path = output_dir / f"{pdf_path.stem}_report.txt"
        annotator.create_annotation_report(annotations, report_path)
        print(f"Report saved to {report_path}")

    if args.mask:
        # --mask used to be accepted and silently ignored. Re-render the
        # pages at the DPI the annotations were produced with so that the
        # stored bounding boxes line up with the pixels being redacted.
        dpi = annotations.metadata['dpi']
        pages = pdf2image.convert_from_path(pdf_path, dpi=dpi)

        by_page = {}
        for ann in annotations.annotations:
            by_page.setdefault(ann.page, []).append(ann)

        masked_pages = [
            annotator.create_masked_image(page, by_page.get(page_num, []))
            for page_num, page in enumerate(pages, 1)
        ]
        if masked_pages:
            masked_path = output_dir / f"{pdf_path.stem}_masked.pdf"
            # Pillow writes a multi-page PDF when save_all is set and the
            # remaining pages are passed as append_images.
            masked_pages[0].save(
                masked_path,
                "PDF",
                resolution=float(dpi),
                save_all=True,
                append_images=masked_pages[1:],
            )
            print(f"Masked PDF saved to {masked_path}")

    stats = annotator.calculate_statistics(annotations)
    print("\nPHI Statistics:")
    print(f" Total PHI items: {stats['total_phi_items']}")
    print(f" Pages with PHI: {stats['pages_with_phi']}")
    print(f" Categories found: {', '.join(stats['phi_by_category'].keys())}")


if __name__ == "__main__":
    main()