← back to invincible-jha__aumai-pii-redactor

Function bodies 18 total

All specs Real LLM only Function bodies
_load_config function · python · L23-L39 (17 LOC)
src/aumai_pii_redactor/cli.py
def _load_config(config_path: str) -> RedactionConfig:
    """Load a :class:`RedactionConfig` from a YAML or JSON file.

    Files whose suffix is ``.yaml`` or ``.yml`` are parsed with PyYAML's
    ``safe_load``; every other suffix is parsed as JSON.

    Args:
        config_path: Path to the configuration file (read as UTF-8).

    Returns:
        The config built by passing the parsed top-level mapping as keyword
        arguments to :class:`RedactionConfig`.

    Raises:
        SystemExit: With status 1 when PyYAML is needed but not installed, or
            when the parsed document is not a mapping at the top level.
    """
    file_path = Path(config_path)
    raw = file_path.read_text(encoding="utf-8")
    if file_path.suffix in (".yaml", ".yml"):
        # Keep the try body limited to the import so that only a missing
        # PyYAML installation is reported as such.
        try:
            import yaml  # type: ignore[import-untyped]
        except ImportError:
            click.echo(
                "PyYAML required for YAML config. Install: pip install pyyaml",
                err=True,
            )
            sys.exit(1)
        data: Any = yaml.safe_load(raw)
    else:
        data = json.loads(raw)

    # An empty YAML document parses to None and a JSON/YAML file may hold a
    # list or scalar; unpacking any of those with ** would fail with an
    # opaque TypeError, so report the problem explicitly instead.
    if not isinstance(data, dict):
        click.echo(
            f"Config file {config_path} must contain a mapping at the top "
            f"level, got {type(data).__name__}.",
            err=True,
        )
        sys.exit(1)
    return RedactionConfig(**data)
_default_config function · python · L42-L47 (6 LOC)
src/aumai_pii_redactor/cli.py
def _default_config() -> RedactionConfig:
    """Build the config used when no config file is supplied.

    The result has no rules, no custom patterns, and masking as the default
    strategy.
    """
    settings = {
        "rules": [],
        "default_strategy": RedactionStrategy.mask,
        "custom_patterns": {},
    }
    return RedactionConfig(**settings)
scan_command function · python · L72-L97 (26 LOC)
src/aumai_pii_redactor/cli.py
def scan_command(input_path: str, config_path: str | None, json_output: bool) -> None:
    """Scan a text file for PII and report all matches."""
    # Fall back to the built-in defaults when no config file was given.
    if config_path:
        config = _load_config(config_path)
    else:
        config = _default_config()
    detector = PIIDetector(config)

    found = detector.detect(Path(input_path).read_text(encoding="utf-8"))

    if json_output:
        # Machine-readable mode: dump every match, even when there are none.
        serialised = json.dumps(
            [hit.model_dump(mode="json") for hit in found],
            indent=2,
        )
        click.echo(serialised)
    elif not found:
        click.echo("No PII detected.")
    else:
        click.echo(f"Found {len(found)} PII match(es):\n")
        for hit in found:
            # Only the first 40 characters of the matched text are shown.
            preview = hit.original_text[:40]
            report_line = (
                f"  [{hit.pii_type.value}]  "
                f"pos={hit.start}-{hit.end}  "
                f"confidence={hit.confidence:.2f}  "
                f'"{preview}"'
            )
            click.echo(report_line)
redact_command function · python · L120-L141 (22 LOC)
src/aumai_pii_redactor/cli.py
def redact_command(
    input_path: str,
    output_path: str,
    config_path: str | None,
    strategy: str,
) -> None:
    """Redact PII from a text file and write the result to a new file."""
    # A config file, when given, takes the place of the ``strategy`` argument;
    # otherwise ``strategy`` becomes the default strategy of a fresh config.
    config = (
        _load_config(config_path)
        if config_path
        else RedactionConfig(default_strategy=RedactionStrategy(strategy))
    )

    redactor = PIIRedactor(config)
    outcome = redactor.redact(Path(input_path).read_text(encoding="utf-8"))

    destination = Path(output_path)
    destination.write_text(outcome.redacted_text, encoding="utf-8")

    summary_lines = (
        f"Redacted {outcome.redactions_applied} PII instance(s).",
        f"Output written to: {output_path}",
    )
    for summary_line in summary_lines:
        click.echo(summary_line)
configure_command function · python · L152-L208 (57 LOC)
src/aumai_pii_redactor/cli.py
def configure_command(output: str) -> None:
    """Generate a default redaction config file."""
    out_path = Path(output)

    if out_path.suffix in (".yaml", ".yml"):
        try:
            import yaml  # type: ignore[import-untyped]
            config_dict: dict[str, Any] = {
                "default_strategy": "mask",
                "rules": [
                    {"pii_type": "email", "strategy": "mask"},
                    {"pii_type": "phone", "strategy": "mask"},
                    {
                        "pii_type": "ssn",
                        "strategy": "replace",
                        "replacement": "[SSN REDACTED]",
                    },
                    {
                        "pii_type": "credit_card",
                        "strategy": "replace",
                        "replacement": "[CARD REDACTED]",
                    },
                    {"pii_type": "ip_address", "strategy": "hash"},
                ],
                "custom_patterns": {},
 
_luhn_valid function · python · L100-L111 (12 LOC)
src/aumai_pii_redactor/detector.py
def _luhn_valid(number_str: str) -> bool:
    digits = [int(c) for c in number_str if c.isdigit()]
    if len(digits) < 13:
        return False
    total = 0
    for i, digit in enumerate(reversed(digits)):
        if i % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0
PIIDetector.__init__ method · python · L121-L126 (6 LOC)
src/aumai_pii_redactor/detector.py
    def __init__(self, config: RedactionConfig) -> None:
        """Store *config* and pre-compile its custom patterns.

        Each value of ``config.custom_patterns`` is compiled as a regular
        expression and registered as a ``PIIType.custom`` pattern with a base
        confidence of 0.80.  The mapping's keys (labels) are not used here.
        """
        self._config = config
        self._custom_patterns: list[tuple[PIIType, re.Pattern[str], float]] = [
            (PIIType.custom, re.compile(expression), 0.80)
            for expression in config.custom_patterns.values()
        ]
Repobility · open methodology · https://repobility.com/research/
PIIDetector.detect method · python · L128-L163 (36 LOC)
src/aumai_pii_redactor/detector.py
    def detect(self, text: str) -> list[PIIMatch]:
        """Return all PII matches found in *text*, deduplicated by span."""
        matches: list[PIIMatch] = []
        seen_spans: set[tuple[int, int]] = set()

        for pii_type, pattern, base_confidence in (
            _BUILTIN_PATTERNS + self._custom_patterns
        ):
            for match in pattern.finditer(text):
                span = (match.start(), match.end())
                if span in seen_spans:
                    continue

                original = match.group()
                confidence = base_confidence

                # Boost credit card confidence with Luhn check
                if pii_type == PIIType.credit_card:
                    if _luhn_valid(original):
                        confidence = min(1.0, confidence + 0.08)
                    else:
                        confidence = max(0.0, confidence - 0.30)

                seen_spans.add(span)
                matches.append(
                    PIIMa
PIIDetector.detect_in_dict method · python · L165-L178 (14 LOC)
src/aumai_pii_redactor/detector.py
    def detect_in_dict(self, data: dict[str, object]) -> dict[str, list[PIIMatch]]:
        """Detect PII in every string leaf of the nested structure *data*.

        Returns:
            A dict keyed by the flattened path of each string value (as
            produced by ``_flatten_dict``) whose detection yielded at least
            one match; the value is that list of matches.  Non-string leaves
            and strings without matches are omitted.
        """
        found_by_path: dict[str, list[PIIMatch]] = {}
        for path, leaf in _flatten_dict(data):
            # Only string leaves can be scanned.
            if not isinstance(leaf, str):
                continue
            hits = self.detect(leaf)
            if hits:
                found_by_path[path] = hits
        return found_by_path
_flatten_dict function · python · L185-L199 (15 LOC)
src/aumai_pii_redactor/detector.py
def _flatten_dict(
    data: object,
    prefix: str = "",
) -> Iterator[tuple[str, object]]:
    """Yield (dotted_key, value) pairs for all leaf values in a nested dict."""
    if isinstance(data, dict):
        for key, val in data.items():
            full_key = f"{prefix}.{key}" if prefix else str(key)
            yield from _flatten_dict(val, full_key)
    elif isinstance(data, list):
        for idx, item in enumerate(data):
            full_key = f"{prefix}[{idx}]"
            yield from _flatten_dict(item, full_key)
    else:
        yield prefix, data
PIIRedactingSpanProcessor.on_start method · python · L40-L45 (6 LOC)
src/aumai_pii_redactor/otel_processor.py
    def on_start(
        self,
        span: Span,
        parent_context: Context | None = None,
    ) -> None:
        """Do nothing when a span starts.

        Redaction happens when the span ends, so both arguments are ignored.
        """
        return None
PIIRedactingSpanProcessor.on_end method · python · L47-L74 (28 LOC)
src/aumai_pii_redactor/otel_processor.py
    def on_end(self, span: ReadableSpan) -> None:
        """Redact PII from all string attributes on the finished span.

        The span's internal attributes dict is mutated in-place.  This is
        intentional — we want the redacted values to flow to any downstream
        exporters that were added after this processor.
        """
        if span.attributes is None:
            return

        # span.attributes is a BoundedAttributes mapping; we can mutate it
        # via the underlying dict by accessing _attributes (SDK internal).
        # We build a replacement dict to avoid mutation during iteration.
        redacted_attrs: dict[str, object] = {}
        for key, value in span.attributes.items():
            if isinstance(value, str):
                redacted_attrs[key] = self._redactor.redact(value).redacted_text
            else:
                redacted_attrs[key] = value

        # Overwrite values in the internal dict if accessible, otherwise no-op.
        try:
      
_apply_strategy function · python · L41-L53 (13 LOC)
src/aumai_pii_redactor/redactor.py
def _apply_strategy(
    original: str,
    strategy: RedactionStrategy,
    replacement: str | None,
) -> str:
    """Return the substitution for *original* under the given *strategy*.

    ``mask``, ``hash`` and ``remove`` each delegate to their dedicated helper
    and ignore *replacement*; any other strategy is treated as ``replace`` and
    passes *replacement* through to ``_apply_replace``.
    """
    single_argument_handlers = (
        (RedactionStrategy.mask, _apply_mask),
        (RedactionStrategy.hash, _apply_hash),
        (RedactionStrategy.remove, _apply_remove),
    )
    for candidate, handler in single_argument_handlers:
        if strategy == candidate:
            return handler(original)
    # Fallthrough: the replace strategy.
    return _apply_replace(original, replacement)
PIIRedactor.__init__ method · python · L63-L69 (7 LOC)
src/aumai_pii_redactor/redactor.py
    def __init__(self, config: RedactionConfig) -> None:
        """Store *config*, create the detector, and index the rules by type.

        When several rules share a ``pii_type``, the last one in
        ``config.rules`` is the one kept in the lookup.
        """
        self._config = config
        self._detector = PIIDetector(config)
        # PIIType -> RedactionRule lookup used when resolving strategies.
        rules_by_type: dict[PIIType, RedactionRule] = {}
        for rule in config.rules:
            rules_by_type[rule.pii_type] = rule
        self._rule_map = rules_by_type
PIIRedactor.redact method · python · L75-L104 (30 LOC)
src/aumai_pii_redactor/redactor.py
    def redact(self, text: str) -> RedactionResult:
        """Detect and redact all PII in *text*.

        Overlapping spans are handled by processing matches in reverse order so
        character positions remain valid throughout the replacement loop.
        """
        matches = self._detector.detect(text)
        if not matches:
            return RedactionResult(
                original_length=len(text),
                redacted_text=text,
                matches_found=[],
                redactions_applied=0,
            )

        # Process in reverse to preserve forward indices
        redacted = text
        applied = 0
        for match in reversed(matches):
            strategy, replacement = self._resolve_strategy(match.pii_type)
            substitution = _apply_strategy(match.original_text, strategy, replacement)
            redacted = redacted[: match.start] + substitution + redacted[match.end :]
            applied += 1

        return RedactionResult(
            or
Powered by Repobility — scan your code at https://repobility.com
PIIRedactor.redact_dict method · python · L106-L112 (7 LOC)
src/aumai_pii_redactor/redactor.py
    def redact_dict(self, data: dict[str, object]) -> dict[str, object]:
        """Return a copy of *data* with every nested string value redacted.

        The work is delegated to ``_redact_value``, which rebuilds dicts and
        lists rather than modifying them and returns other values as-is.
        """
        redacted_copy = self._redact_value(data)
        return redacted_copy  # type: ignore[return-value]
PIIRedactor._resolve_strategy method · python · L118-L124 (7 LOC)
src/aumai_pii_redactor/redactor.py
    def _resolve_strategy(
        self, pii_type: PIIType
    ) -> tuple[RedactionStrategy, str | None]:
        rule = self._rule_map.get(pii_type)
        if rule:
            return rule.strategy, rule.replacement
        return self._config.default_strategy, None
PIIRedactor._redact_value method · python · L126-L133 (8 LOC)
src/aumai_pii_redactor/redactor.py
    def _redact_value(self, value: object) -> object:
        if isinstance(value, str):
            return self.redact(value).redacted_text
        if isinstance(value, dict):
            return {k: self._redact_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self._redact_value(item) for item in value]
        return value