Function bodies 18 total
_load_config function · python · L23-L39 (17 LOC)src/aumai_pii_redactor/cli.py
def _load_config(config_path: str) -> RedactionConfig:
    """Load a :class:`RedactionConfig` from a YAML or JSON file.

    Files whose suffix is ``.yaml`` or ``.yml`` (compared case-insensitively)
    are parsed with PyYAML's ``safe_load``; every other file is parsed as
    JSON.

    Args:
        config_path: Path of the configuration file; it is read as UTF-8.

    Returns:
        The configuration built by passing the file's top-level mapping as
        keyword arguments to ``RedactionConfig``. A document that parses to
        ``None`` (for example an empty YAML file) is treated as an empty
        mapping.

    The process exits with status 1, after printing a message to stderr,
    when PyYAML is needed but not installed or when the document's top level
    is not a mapping.
    """
    file_path = Path(config_path)
    raw = file_path.read_text(encoding="utf-8")
    if file_path.suffix.lower() in (".yaml", ".yml"):
        # Keep the try body to the import alone so that an ImportError raised
        # anywhere else is never reported as "PyYAML missing".
        try:
            import yaml  # type: ignore[import-untyped]
        except ImportError:
            click.echo(
                "PyYAML required for YAML config. Install: pip install pyyaml",
                err=True,
            )
            sys.exit(1)
        data: Any = yaml.safe_load(raw)
    else:
        data = json.loads(raw)
    if data is None:
        # An empty YAML document (or a JSON ``null``) parses to None, which
        # cannot be unpacked with ``**`` below.
        data = {}
    if not isinstance(data, dict):
        click.echo(
            "Config file must contain a mapping at the top level: "
            f"{config_path}",
            err=True,
        )
        sys.exit(1)
    return RedactionConfig(**data)


# _default_config function · python · L42-L47 (6 LOC)src/aumai_pii_redactor/cli.py
def _default_config() -> RedactionConfig:
    """Return the configuration used when no config file is supplied.

    The result carries no rules, no custom patterns, and uses the ``mask``
    strategy by default.
    """
    fallback = RedactionConfig(
        default_strategy=RedactionStrategy.mask,
        rules=[],
        custom_patterns={},
    )
    return fallback


# scan_command function · python · L72-L97 (26 LOC)src/aumai_pii_redactor/cli.py
def scan_command(input_path: str, config_path: str | None, json_output: bool) -> None:
    """Detect PII in a text file and print every match.

    The configuration is loaded from *config_path* when one is given and
    falls back to the built-in default otherwise. With *json_output* the
    matches are printed as an indented JSON array; without it, a count line
    is printed followed by one summary line per match.
    """
    if config_path:
        config = _load_config(config_path)
    else:
        config = _default_config()
    scanner = PIIDetector(config)
    source_text = Path(input_path).read_text(encoding="utf-8")
    found = scanner.detect(source_text)

    if json_output:
        serialised = json.dumps(
            [hit.model_dump(mode="json") for hit in found],
            indent=2,
        )
        click.echo(serialised)
        return

    if not found:
        click.echo("No PII detected.")
        return

    click.echo(f"Found {len(found)} PII match(es):\n")
    for hit in found:
        # Only the first 40 characters of the matched text are shown.
        preview = hit.original_text[:40]
        span = f"{hit.start}-{hit.end}"
        click.echo(
            f" [{hit.pii_type.value}] pos={span} "
            f'confidence={hit.confidence:.2f} "{preview}"'
        )


# redact_command function · python · L120-L141 (22 LOC)src/aumai_pii_redactor/cli.py
def redact_command(
    input_path: str,
    output_path: str,
    config_path: str | None,
    strategy: str,
) -> None:
    """Write a PII-redacted copy of a text file.

    When *config_path* is given the configuration is loaded from it and
    *strategy* is not consulted. Otherwise a configuration whose default
    strategy is *strategy* is built. The input is read as UTF-8, redacted,
    and written to *output_path* as UTF-8; a two-line summary is printed.
    """
    config = (
        _load_config(config_path)
        if config_path
        else RedactionConfig(default_strategy=RedactionStrategy(strategy))
    )
    engine = PIIRedactor(config)
    source_text = Path(input_path).read_text(encoding="utf-8")
    outcome = engine.redact(source_text)
    destination = Path(output_path)
    destination.write_text(outcome.redacted_text, encoding="utf-8")
    click.echo(f"Redacted {outcome.redactions_applied} PII instance(s).")
    click.echo(f"Output written to: {output_path}")


# configure_command function · python · L152-L208 (57 LOC)src/aumai_pii_redactor/cli.py
def configure_command(output: str) -> None:
"""Generate a default redaction config file."""
out_path = Path(output)
if out_path.suffix in (".yaml", ".yml"):
try:
import yaml # type: ignore[import-untyped]
config_dict: dict[str, Any] = {
"default_strategy": "mask",
"rules": [
{"pii_type": "email", "strategy": "mask"},
{"pii_type": "phone", "strategy": "mask"},
{
"pii_type": "ssn",
"strategy": "replace",
"replacement": "[SSN REDACTED]",
},
{
"pii_type": "credit_card",
"strategy": "replace",
"replacement": "[CARD REDACTED]",
},
{"pii_type": "ip_address", "strategy": "hash"},
],
"custom_patterns": {},
_luhn_valid function · python · L100-L111 (12 LOC)src/aumai_pii_redactor/detector.py
def _luhn_valid(number_str: str) -> bool:
digits = [int(c) for c in number_str if c.isdigit()]
if len(digits) < 13:
return False
total = 0
for i, digit in enumerate(reversed(digits)):
if i % 2 == 1:
digit *= 2
if digit > 9:
digit -= 9
total += digit
return total % 10 == 0PIIDetector.__init__ method · python · L121-L126 (6 LOC)src/aumai_pii_redactor/detector.py
def __init__(self, config: RedactionConfig) -> None:
    """Store *config* and pre-compile its custom patterns.

    Each value of ``config.custom_patterns`` is compiled as a regular
    expression and recorded as a ``PIIType.custom`` pattern with a base
    confidence of 0.80. The mapping's keys (labels) are not used here.
    """
    self._config = config
    self._custom_patterns: list[tuple[PIIType, re.Pattern[str], float]] = [
        (PIIType.custom, re.compile(expression), 0.80)
        for expression in config.custom_patterns.values()
    ]


# Repobility · open methodology · https://repobility.com/research/
PIIDetector.detect method · python · L128-L163 (36 LOC)src/aumai_pii_redactor/detector.py
def detect(self, text: str) -> list[PIIMatch]:
"""Return all PII matches found in *text*, deduplicated by span."""
matches: list[PIIMatch] = []
seen_spans: set[tuple[int, int]] = set()
for pii_type, pattern, base_confidence in (
_BUILTIN_PATTERNS + self._custom_patterns
):
for match in pattern.finditer(text):
span = (match.start(), match.end())
if span in seen_spans:
continue
original = match.group()
confidence = base_confidence
# Boost credit card confidence with Luhn check
if pii_type == PIIType.credit_card:
if _luhn_valid(original):
confidence = min(1.0, confidence + 0.08)
else:
confidence = max(0.0, confidence - 0.30)
seen_spans.add(span)
matches.append(
PIIMaPIIDetector.detect_in_dict method · python · L165-L178 (14 LOC)src/aumai_pii_redactor/detector.py
def detect_in_dict(self, data: dict[str, object]) -> dict[str, list[PIIMatch]]:
    """Run detection over every string leaf of a nested structure.

    *data* is flattened with ``_flatten_dict``; each string leaf is passed
    to :meth:`detect`, and non-string leaves are skipped.

    Returns:
        A dict keyed by the flattened key path of each string leaf that
        produced at least one match, mapping to that leaf's matches.
    """
    findings: dict[str, list[PIIMatch]] = {}
    for key_path, leaf in _flatten_dict(data):
        if not isinstance(leaf, str):
            continue
        hits = self.detect(leaf)
        if hits:
            findings[key_path] = hits
    return findings


# _flatten_dict function · python · L185-L199 (15 LOC)src/aumai_pii_redactor/detector.py
def _flatten_dict(
data: object,
prefix: str = "",
) -> Iterator[tuple[str, object]]:
"""Yield (dotted_key, value) pairs for all leaf values in a nested dict."""
if isinstance(data, dict):
for key, val in data.items():
full_key = f"{prefix}.{key}" if prefix else str(key)
yield from _flatten_dict(val, full_key)
elif isinstance(data, list):
for idx, item in enumerate(data):
full_key = f"{prefix}[{idx}]"
yield from _flatten_dict(item, full_key)
else:
yield prefix, dataPIIRedactingSpanProcessor.on_start method · python · L40-L45 (6 LOC)src/aumai_pii_redactor/otel_processor.py
def on_start(self, span: Span, parent_context: Context | None = None) -> None:
    """Do nothing; redaction is deferred until the span ends."""
    return None


# PIIRedactingSpanProcessor.on_end method · python · L47-L74 (28 LOC)src/aumai_pii_redactor/otel_processor.py
def on_end(self, span: ReadableSpan) -> None:
"""Redact PII from all string attributes on the finished span.
The span's internal attributes dict is mutated in-place. This is
intentional — we want the redacted values to flow to any downstream
exporters that were added after this processor.
"""
if span.attributes is None:
return
# span.attributes is a BoundedAttributes mapping; we can mutate it
# via the underlying dict by accessing _attributes (SDK internal).
# We build a replacement dict to avoid mutation during iteration.
redacted_attrs: dict[str, object] = {}
for key, value in span.attributes.items():
if isinstance(value, str):
redacted_attrs[key] = self._redactor.redact(value).redacted_text
else:
redacted_attrs[key] = value
# Overwrite values in the internal dict if accessible, otherwise no-op.
try:
_apply_strategy function · python · L41-L53 (13 LOC)src/aumai_pii_redactor/redactor.py
def _apply_strategy(
    original: str,
    strategy: RedactionStrategy,
    replacement: str | None,
) -> str:
    """Return the redacted form of *original* under *strategy*.

    ``mask``, ``hash`` and ``remove`` are dispatched to their helpers, which
    receive only *original*. Any other strategy is treated as ``replace``
    and is also given *replacement*.
    """
    single_argument_handlers = (
        (RedactionStrategy.mask, _apply_mask),
        (RedactionStrategy.hash, _apply_hash),
        (RedactionStrategy.remove, _apply_remove),
    )
    for candidate, handler in single_argument_handlers:
        if strategy == candidate:
            return handler(original)
    # Fallthrough: the replace strategy.
    return _apply_replace(original, replacement)


# PIIRedactor.__init__ method · python · L63-L69 (7 LOC)src/aumai_pii_redactor/redactor.py
def __init__(self, config: RedactionConfig) -> None:
    """Keep *config*, build a detector from it, and index its rules.

    Rules are indexed by their ``pii_type``; when several rules share a
    type, the one appearing last in ``config.rules`` is kept.
    """
    self._config = config
    self._detector = PIIDetector(config)
    rules_by_type: dict[PIIType, RedactionRule] = {}
    for rule in config.rules:
        rules_by_type[rule.pii_type] = rule
    self._rule_map = rules_by_type


# PIIRedactor.redact method · python · L75-L104 (30 LOC)src/aumai_pii_redactor/redactor.py
def redact(self, text: str) -> RedactionResult:
"""Detect and redact all PII in *text*.
Overlapping spans are handled by processing matches in reverse order so
character positions remain valid throughout the replacement loop.
"""
matches = self._detector.detect(text)
if not matches:
return RedactionResult(
original_length=len(text),
redacted_text=text,
matches_found=[],
redactions_applied=0,
)
# Process in reverse to preserve forward indices
redacted = text
applied = 0
for match in reversed(matches):
strategy, replacement = self._resolve_strategy(match.pii_type)
substitution = _apply_strategy(match.original_text, strategy, replacement)
redacted = redacted[: match.start] + substitution + redacted[match.end :]
applied += 1
return RedactionResult(
orPowered by Repobility — scan your code at https://repobility.com
PIIRedactor.redact_dict method · python · L106-L112 (7 LOC)src/aumai_pii_redactor/redactor.py
def redact_dict(self, data: dict[str, object]) -> dict[str, object]:
    """Return the result of redacting *data* via ``_redact_value``.

    This is a thin wrapper: the whole of *data* is handed to
    ``self._redact_value`` and its result is returned as-is.
    """
    sanitized = self._redact_value(data)
    return sanitized  # type: ignore[return-value]


# PIIRedactor._resolve_strategy method · python · L118-L124 (7 LOC)src/aumai_pii_redactor/redactor.py
def _resolve_strategy(
    self, pii_type: PIIType
) -> tuple[RedactionStrategy, str | None]:
    """Pick the strategy and replacement text to use for *pii_type*.

    A rule registered for the type supplies both values; without one, the
    config's default strategy is returned together with ``None``.
    """
    matching_rule = self._rule_map.get(pii_type)
    if not matching_rule:
        return self._config.default_strategy, None
    return matching_rule.strategy, matching_rule.replacement


# PIIRedactor._redact_value method · python · L126-L133 (8 LOC)src/aumai_pii_redactor/redactor.py
def _redact_value(self, value: object) -> object:
if isinstance(value, str):
return self.redact(value).redacted_text
if isinstance(value, dict):
return {k: self._redact_value(v) for k, v in value.items()}
if isinstance(value, list):
return [self._redact_value(item) for item in value]
return value