← back to dpaq7__scrawl

Function bodies 108 total

All specs Real LLM only Function bodies
deny function · python · L27-L29 (3 LOC)
.claude/hooks/block_dangerous_git.py
def deny(reason: str) -> None:
    """Report a blocked command and terminate the hook.

    Writes ``block_dangerous_git: BLOCKED — <reason>`` to stderr and then
    exits the process with status 2, so control never returns to the caller.

    Args:
        reason: Human-readable explanation appended to the BLOCKED line.

    Raises:
        SystemExit: Always, with exit code 2.
    """
    message = f"block_dangerous_git: BLOCKED — {reason}"
    print(message, file=sys.stderr)
    raise SystemExit(2)
check_git_push function · python · L32-L63 (32 LOC)
.claude/hooks/block_dangerous_git.py
def check_git_push(tokens: list[str]) -> None:
    """Reject force-push and direct-to-main pushes."""
    force_flags = {
        "-f",
        "--force",
        "--force-with-lease",
        "--force-if-includes",
    }
    if any(t in force_flags or t.startswith("--force-with-lease=") or t.startswith("--force-if-includes=") for t in tokens):
        deny("force push (any variant) is forbidden")

    if "--no-verify" in tokens:
        deny("--no-verify on push is forbidden (don't skip hooks)")

    # Walk the positional args (after `git push` and any flags).
    # Detect direct push to main/master and force refspecs.
    for t in tokens[2:]:
        if t.startswith("-"):
            continue
        # Force refspec — anything starting with '+' pushes force semantics.
        # `git push origin +branch` or `+refs/heads/foo:refs/heads/bar`.
        if t.startswith("+") and t not in {"+", "++"}:
            deny(f"force refspec '{t}' (leading '+' means force push)")
        # Direct-to
check_git_reset function · python · L66-L68 (3 LOC)
.claude/hooks/block_dangerous_git.py
def check_git_reset(tokens: list[str]) -> None:
    """Deny a reset that carries ``--hard``, ``--merge`` or ``--keep``.

    Args:
        tokens: The tokenized command line.

    Any of the three flags appearing anywhere in ``tokens`` triggers
    ``deny``; every other invocation falls through and returns ``None``.
    """
    blocked_modes = ("--hard", "--merge", "--keep")
    if any(mode in tokens for mode in blocked_modes):
        deny("git reset --hard/--merge/--keep is forbidden (use ./scripts/reset-work.sh)")
check_git_clean function · python · L71-L77 (7 LOC)
.claude/hooks/block_dangerous_git.py
def check_git_clean(tokens: list[str]) -> None:
    """Deny ``git clean`` invocations that force deletion or touch ignored files.

    Every argument after the first two tokens is inspected:

    * Short flag clusters (single dash) are denied when they contain ``f``
      (either case) or a lowercase ``x``, e.g. ``-f``, ``-fd``, ``-xn``.
    * Long options (double dash) are matched by *name*: ``--force`` and any
      abbreviation of it (``--forc``, ``--fo``, ``--f``) are denied. Other
      long options such as ``--dry-run`` or ``--exclude=<pattern>`` are
      allowed; previously ``--exclude`` was rejected only because its
      spelling happens to contain the letters ``x``/``f``.

    ``-n`` (dry run) stays allowed.

    Args:
        tokens: The tokenized command line; the scan starts at index 2.
    """
    for t in tokens[2:]:
        if not t.startswith("-"):
            continue
        if t.startswith("--"):
            # Compare the option name only, never an attached "=value".
            option = t.partition("=")[0].lower()
            # A bare "--" is the end-of-options marker, not an abbreviation.
            if len(option) > 2 and "--force".startswith(option):
                deny("git clean -f is forbidden (use ./scripts/reset-work.sh)")
            continue
        if "f" in t.lower():
            deny("git clean -f is forbidden (use ./scripts/reset-work.sh)")
        if "x" in t:
            deny("git clean -x is forbidden (would delete ignored files)")
check_git_commit function · python · L80-L84 (5 LOC)
.claude/hooks/block_dangerous_git.py
def check_git_commit(tokens: list[str]) -> None:
    """Deny commits that amend history or skip hooks.

    Args:
        tokens: The tokenized command line.

    ``--amend`` is checked before ``--no-verify``; the first one present in
    ``tokens`` is reported through ``deny``.
    """
    forbidden_flags = (
        ("--amend", "git commit --amend is forbidden (create a new commit instead)"),
        ("--no-verify", "git commit --no-verify is forbidden (don't skip hooks)"),
    )
    for flag, reason in forbidden_flags:
        if flag in tokens:
            deny(reason)
check_git_rebase function · python · L87-L89 (3 LOC)
.claude/hooks/block_dangerous_git.py
def check_git_rebase(tokens: list[str]) -> None:
    """Deny an interactive rebase.

    Args:
        tokens: The tokenized command line; denied when it contains ``-i``
            or ``--interactive`` as a standalone token.
    """
    interactive_flags = {"-i", "--interactive"}
    if interactive_flags.intersection(tokens):
        deny("interactive rebase is forbidden")
check_rm function · python · L92-L99 (8 LOC)
.claude/hooks/block_dangerous_git.py
def check_rm(tokens: list[str]) -> None:
    """Deny recursive ``rm``; allow single-file deletes including ``rm -f``.

    Every argument after the command name is inspected:

    * Short flag clusters containing ``r``/``R`` are denied. The recursive
      flag is rejected on its own, so splitting the flags (``rm -r -f dir``)
      or dropping ``-f`` (``rm -r dir``) no longer slips through. ``-r`` and
      ``-R`` are synonyms and are treated alike.
    * Long options are matched by name: ``--recursive`` and its
      abbreviations are denied. ``--force`` is allowed, matching the
      long-standing allowance for ``rm -f <file>``; previously it was
      rejected only because its spelling contains both ``r`` and ``f``.

    Args:
        tokens: The tokenized command line; the scan starts at index 1.
    """
    for t in tokens[1:]:
        if not t.startswith("-"):
            continue
        if t.startswith("--"):
            # Compare the option name only, never an attached "=value".
            option = t.partition("=")[0]
            # A bare "--" is the end-of-options marker, not an abbreviation.
            if len(option) > 2 and "--recursive".startswith(option):
                deny("rm -R is forbidden (delete files individually)")
            continue
        lowered = t.lower()
        if "r" in lowered and "f" in lowered:
            deny("rm -rf is forbidden (delete files individually)")
        if "r" in lowered:
            deny("rm -R is forbidden (delete files individually)")
If a scraper extracted this row, it came from Repobility (https://repobility.com)
check_curl_pipe function · python · L102-L108 (7 LOC)
.claude/hooks/block_dangerous_git.py
def check_curl_pipe(cmd: str) -> None:
    """Deny piping a ``curl``/``wget`` command line into a shell.

    The raw command string is searched (case-insensitively) rather than
    tokens. A command is denied when it mentions ``curl`` or ``wget`` and
    contains a pipe whose target is ``sh``, ``bash`` or ``zsh`` as a whole
    word, optionally written with an absolute path (``/bin/sh``,
    ``/usr/bin/bash``) and with any amount of whitespace after the ``|``.

    Matching on a whole word means tools whose names merely start with a
    shell name (``shasum``, ``shuf``, ``shred`` ...) are no longer mistaken
    for a shell, so ``curl ... | shasum -a 256`` is allowed.

    Args:
        cmd: The raw, untokenized command string.
    """
    import re

    lowered = cmd.lower()
    if "curl" not in lowered and "wget" not in lowered:
        return
    # "|", optional whitespace, optional absolute directory, shell name as a word.
    if re.search(r"\|\s*(?:/\S*/)?(?:sh|bash|zsh)\b", lowered):
        deny("piping curl/wget to a shell is forbidden (download, inspect, then run)")
check_publish function · python · L111-L121 (11 LOC)
.claude/hooks/block_dangerous_git.py
def check_publish(tokens: list[str]) -> None:
    """Deny package-publishing commands.

    Args:
        tokens: The tokenized command line. Only the first two tokens are
            compared, against a fixed set of ``(tool, subcommand)`` pairs.
    """
    human_only = {
        ("uv", "publish"),
        ("npm", "publish"),
        ("twine", "upload"),
        ("yarn", "publish"),
        ("cargo", "publish"),
    }
    head = tuple(tokens[:2])
    if head in human_only:
        tool, action = head
        deny(f"{tool} {action} is a human-only action")
check_sudo function · python · L124-L128 (5 LOC)
.claude/hooks/block_dangerous_git.py
def check_sudo(tokens: list[str]) -> None:
    """Deny commands whose first token is ``sudo`` or ``doas``.

    Args:
        tokens: The tokenized command line; an empty list is accepted.
    """
    if not tokens:
        return
    program = tokens[0]
    if program in ("sudo", "doas"):
        deny(f"{program} is forbidden")
check_pip function · python · L131-L139 (9 LOC)
.claude/hooks/block_dangerous_git.py
def check_pip(tokens: list[str]) -> None:
    """Deny direct ``pip install`` invocations.

    `pip install` bypasses uv's lockfile and can silently corrupt the venv;
    everything in this repo goes through `uv add` / `uv sync` / `uv pip`.

    Only these exact leading spellings are matched: ``pip install``,
    ``python -m pip install`` and ``python3 -m pip install``.

    Args:
        tokens: The tokenized command line.
    """
    if tokens[:2] == ["pip", "install"]:
        deny("pip install is forbidden (use 'uv add' or 'uv sync --extra <name>')")
    for interpreter in ("python", "python3"):
        if tokens[:4] == [interpreter, "-m", "pip", "install"]:
            deny(f"{interpreter} -m pip install is forbidden (use 'uv add' or 'uv sync')")
check_gh function · python · L142-L165 (24 LOC)
.claude/hooks/block_dangerous_git.py
def check_gh(tokens: list[str]) -> None:
    # `gh` (GitHub CLI) can reach around branch protection and delete
    # published state. Allow reads and PR/issue creation; block anything
    # destructive or anything that bypasses PR review.
    if len(tokens) < 2 or tokens[0] != "gh":
        return

    # Admin-merge bypasses branch protection. Hard no.
    if tokens[:3] == ["gh", "pr", "merge"] and "--admin" in tokens:
        deny("gh pr merge --admin bypasses branch protection (forbidden)")

    # Release deletion is irreversible in practice.
    if tokens[:3] == ["gh", "release", "delete"]:
        deny("gh release delete is forbidden (releases are immutable by convention)")

    # Auth logout would nuke the token the hook itself uses.
    if tokens[:3] == ["gh", "auth", "logout"]:
        deny("gh auth logout is forbidden (locks out CI + other scripts)")

    # Repo-level destruction.
    if tokens[:3] == ["gh", "repo", "delete"]:
        deny("gh repo delete is forbidden (always a
check_dd function · python · L168-L176 (9 LOC)
.claude/hooks/block_dangerous_git.py
def check_dd(tokens: list[str]) -> None:
    """Deny ``dd`` when an operand writes under ``/dev/``.

    `dd` has legitimate uses (generating random files, image conversion),
    so only operands starting with ``of=/dev/`` are rejected; reading from a
    device such as ``if=/dev/urandom`` stays allowed.

    Args:
        tokens: The tokenized command line; ignored unless the first token
            is ``dd``.
    """
    if tokens[:1] != ["dd"]:
        return
    device_outputs = [arg for arg in tokens[1:] if arg.startswith("of=/dev/")]
    for operand in device_outputs:
        deny(f"dd {operand} writes to a raw device (would wipe disk)")
check_destructive_system function · python · L179-L189 (11 LOC)
.claude/hooks/block_dangerous_git.py
def check_destructive_system(tokens: list[str]) -> None:
    """Deny filesystem creation and power-state commands.

    Looks only at the command name (first token): anything starting with
    ``mkfs``, the commands ``shutdown``/``reboot``/``halt``/``poweroff``,
    and ``init`` when its first argument is ``0`` or ``6``.

    Args:
        tokens: The tokenized command line; an empty list is accepted.
    """
    if not tokens:
        return
    program, *arguments = tokens
    if program.startswith("mkfs"):
        deny(f"{program} creates a filesystem (forbidden)")
    if program in {"shutdown", "reboot", "halt", "poweroff"}:
        deny(f"{program} is forbidden")
    if program == "init" and arguments[:1] in (["0"], ["6"]):
        deny("init 0/6 is forbidden (shutdown/reboot)")
check_eval_source function · python · L192-L206 (15 LOC)
.claude/hooks/block_dangerous_git.py
def check_eval_source(tokens: list[str], cmd: str) -> None:
    """Deny ``eval``, sourcing from stdin, and the classic fork-bomb text.

    `eval` runs arbitrary shell, and `source /dev/stdin` / `. /dev/stdin`
    pipe arbitrary content into the current shell; both defeat every other
    defense in the hook.

    Args:
        tokens: The tokenized command line. When empty, nothing is checked,
            including the fork-bomb scan.
        cmd: The raw command string, searched for ``:(){`` / ``:() {``.
    """
    if not tokens:
        return

    head = tokens[0]
    if head == "eval":
        deny("eval is forbidden (arbitrary code execution)")

    # `source /dev/stdin` or `. /dev/stdin`
    if head in {"source", "."} and len(tokens) >= 2:
        target = tokens[1]
        if target in {"/dev/stdin", "/dev/fd/0"}:
            deny(f"{head} {target} is forbidden (arbitrary code from stdin)")

    # Fork bomb pattern, matched literally in the raw string.
    if any(marker in cmd for marker in (":(){", ":() {")):
        deny("fork bomb pattern detected")
Want this analysis on your repo? https://repobility.com/scan/
main function · python · L209-L256 (48 LOC)
.claude/hooks/block_dangerous_git.py
def main() -> None:
    try:
        payload = json.loads(sys.stdin.read() or "{}")
    except json.JSONDecodeError:
        # Can't parse the hook payload → don't block; let Claude Code handle it.
        sys.exit(0)

    tool_input = payload.get("tool_input") or {}
    cmd = tool_input.get("command")
    if not isinstance(cmd, str) or not cmd.strip():
        sys.exit(0)

    # Always check the raw string for curl-pipe patterns; pipes break shlex tokenization.
    check_curl_pipe(cmd)

    # Tokenize. On parse failure, default-deny per the contract.
    try:
        tokens = shlex.split(cmd, comments=True, posix=True)
    except ValueError as exc:
        deny(f"unparseable command ({exc}); default-deny")

    if not tokens:
        sys.exit(0)

    check_sudo(tokens)
    check_publish(tokens)
    check_pip(tokens)
    check_gh(tokens)
    check_dd(tokens)
    check_destructive_system(tokens)
    check_eval_source(tokens, cmd)

    if tokens[0] == "rm":
        check_rm(tokens)

    
main function · python · L21-L57 (37 LOC)
.claude/hooks/honor_loop_guard.py
def main() -> None:
    try:
        payload = json.loads(sys.stdin.read() or "{}")
    except json.JSONDecodeError:
        sys.exit(0)

    if payload.get("tool_name") != "Bash":
        sys.exit(0)

    tool_response = payload.get("tool_response") or {}
    # The exit code field name depends on Claude Code version; check both.
    exit_code = tool_response.get("exit_code")
    if exit_code is None:
        exit_code = tool_response.get("code")
    if exit_code is None:
        # Some versions surface stderr but no exit code; try parsing stdout.
        stdout = tool_response.get("stdout") or ""
        if "exit 42" in stdout or "infinite-loop guard" in stdout:
            exit_code = 42

    if exit_code == 42:
        cmd = (payload.get("tool_input") or {}).get("command") or "<unknown>"
        msg = (
            "\n"
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
            "LOOP GUARD TRIPPED (exit 42)\n"
            f"  command: {cmd}\n"
            "  meanin
should_format function · python · L21-L24 (4 LOC)
.claude/hooks/ruff_format_on_save.py
def should_format(path_str: str) -> bool:
    """Return whether ``path_str`` is a Python file under a format root.

    Args:
        path_str: Path to test. It qualifies when it ends in ``.py`` and,
            for some entry of ``FORMAT_ROOTS``, either starts with that
            entry or contains it directly after a ``/``.
    """
    if not path_str.endswith(".py"):
        return False
    for root in FORMAT_ROOTS:
        if path_str.startswith(root) or f"/{root}" in path_str:
            return True
    return False
main function · python · L27-L72 (46 LOC)
.claude/hooks/ruff_format_on_save.py
def main() -> None:
    try:
        payload = json.loads(sys.stdin.read() or "{}")
    except json.JSONDecodeError:
        sys.exit(0)

    tool_name = payload.get("tool_name") or ""
    if tool_name not in {"Edit", "Write", "NotebookEdit"}:
        sys.exit(0)

    tool_input = payload.get("tool_input") or {}
    file_path = tool_input.get("file_path") or tool_input.get("path")
    if not isinstance(file_path, str):
        sys.exit(0)

    # Normalize to a repo-relative path for the should_format check.
    try:
        repo_root = Path(__file__).resolve().parents[2]
        rel = Path(file_path).resolve().relative_to(repo_root)
    except (ValueError, OSError):
        sys.exit(0)

    rel_str = str(rel)
    if not should_format(rel_str):
        sys.exit(0)

    # Best-effort format + safe autofix. Swallow errors — we never block.
    try:
        subprocess.run(
            ["uv", "run", "ruff", "format", rel_str],
            cwd=repo_root,
            check=False,
            
cluster_name_variants function · python · L11-L59 (49 LOC)
src/scrawl/anonymize/clustering.py
def cluster_name_variants(
    names: list[str],
    threshold: float = 0.85,
) -> list[list[str]]:
    """Cluster name strings that likely refer to the same person.

    Uses Jaro-Winkler similarity with additional heuristics for
    medical name patterns (Dr., M.D., NP, PT, etc.)
    """
    # Deduplicate while preserving order
    seen = set()
    unique_names = []
    for name in names:
        normalized = name.strip()
        if normalized.lower() not in seen:
            seen.add(normalized.lower())
            unique_names.append(normalized)

    if not unique_names:
        return []

    # Sort longest first (most informative matches first)
    unique_names.sort(key=len, reverse=True)

    clusters: list[list[str]] = []
    cluster_canonical: list[str] = []  # Normalized form for matching

    for name in unique_names:
        norm_name = _normalize_name(name)
        best_cluster_idx = -1
        best_score = 0.0

        for idx, canonical in enumerate(cluster_canonical):
 
_normalize_name function · python · L62-L96 (35 LOC)
src/scrawl/anonymize/clustering.py
def _normalize_name(name: str) -> str:
    """Strip titles, suffixes, and punctuation for comparison."""
    prefixes = [
        r"\bDr\.?\b",
        r"\bMr\.?\b",
        r"\bMs\.?\b",
        r"\bMrs\.?\b",
        r"\bNurse\s+Practitioner\b",
        r"\bPhysical\s+Therapist\b",
    ]
    suffixes = [
        r",?\s*M\.?D\.?",
        r",?\s*D\.?O\.?",
        r",?\s*Ph\.?D\.?",
        r",?\s*Psy\.?D\.?",
        r",?\s*D\.?C\.?",
        r",?\s*APN\b",
        r",?\s*NP\b",
        r",?\s*PA\b",
        r",?\s*PT\b",
        r",?\s*PTA\b",
        r",?\s*OT\b",
        r",?\s*Jr\.?\b",
        r",?\s*Sr\.?\b",
        r",?\s*III?\b",
    ]

    result = name
    for pattern in prefixes:
        result = re.sub(pattern, "", result, flags=re.IGNORECASE)
    for pattern in suffixes:
        result = re.sub(pattern, "", result, flags=re.IGNORECASE)

    result = re.sub(r"\s+", " ", result).strip().strip(",").strip()
    return result.lower()
_name_component_similarity function · python · L99-L127 (29 LOC)
src/scrawl/anonymize/clustering.py
def _name_component_similarity(name1: str, name2: str) -> float:
    """Compare names by their component parts."""
    parts1 = name1.split()
    parts2 = name2.split()

    if not parts1 or not parts2:
        return 0.0

    last1 = parts1[-1] if parts1 else ""
    last2 = parts2[-1] if parts2 else ""

    last_sim = fuzz.ratio(last1, last2) / 100.0
    if last_sim < 0.8:
        return last_sim * 0.5

    if len(parts1) > 1 and len(parts2) > 1:
        first1 = parts1[0]
        first2 = parts2[0]

        if len(first1) <= 2 or len(first2) <= 2:
            short = first1 if len(first1) <= len(first2) else first2
            long = first2 if len(first1) <= len(first2) else first1
            if long.startswith(short.rstrip(".")):
                return 0.95

        first_sim = fuzz.ratio(first1, first2) / 100.0
        return (last_sim * 0.6) + (first_sim * 0.4)

    return last_sim * 0.8
get_ssa_recognizers function · python · L10-L17 (8 LOC)
src/scrawl/anonymize/custom_recognizers.py
def get_ssa_recognizers() -> list[PatternRecognizer]:
    """Build the custom recognizers used for SSA documents.

    Returns:
        Freshly constructed recognizers, in this order: SSN, medical record
        number, SSA case number, transcript reference.
    """
    factories = (
        _ssn_recognizer,
        _medical_record_number_recognizer,
        _ssa_case_number_recognizer,
        _transcript_reference_recognizer,
    )
    return [build() for build in factories]
About: code-quality intelligence by Repobility · https://repobility.com
_ssn_recognizer function · python · L20-L47 (28 LOC)
src/scrawl/anonymize/custom_recognizers.py
def _ssn_recognizer() -> PatternRecognizer:
    """Build the ``US_SSN`` recognizer, including masked/partial SSNs.

    Safe Harbor requires removing ALL SSN digits, including last 4.
    Matches: 123-45-6789, 123 45 6789, XXX-XX-6789, ***-**-6789

    Three patterns are registered: a bare nine-digit SSN shape (0.85), a
    masked SSN exposing only the last four digits (0.7), and an SSN that
    follows an "SSN" / "Social Security ..." label (0.95).
    """
    full_ssn = Pattern(
        name="full_ssn",
        regex=r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
        score=0.85,
    )
    masked_last4 = Pattern(
        name="partial_ssn_last4",
        regex=r"(?:[Xx*]{3}[-\s]?[Xx*]{2}[-\s]?)(\d{4})\b",
        score=0.7,
    )
    labeled_ssn = Pattern(
        name="ssn_label_nearby",
        regex=r"(?:SSN|Social\s+Security(?:\s+(?:Number|No\.?|#))?)\s*:?\s*(\d{3}[-\s]?\d{2}[-\s]?\d{4})",
        score=0.95,
    )
    return PatternRecognizer(
        supported_entity="US_SSN",
        name="enhanced_ssn",
        patterns=[full_ssn, masked_last4, labeled_ssn],
        supported_language="en",
    )
_medical_record_number_recognizer function · python · L50-L68 (19 LOC)
src/scrawl/anonymize/custom_recognizers.py
def _medical_record_number_recognizer() -> PatternRecognizer:
    """Build the ``MEDICAL_RECORD`` recognizer.

    Both patterns require a label followed by 5-12 digits: an
    "MRN" / "MR#" / "Medical Record ..." label (0.9) or a
    "Patient ID/No/#/Number" label (0.85).
    """
    labeled_mrn = Pattern(
        name="mrn_labeled",
        regex=r"(?:MRN|MR#|Medical\s+Record(?:\s+(?:Number|No\.?|#))?)[\s:]*(\d{5,12})",
        score=0.9,
    )
    labeled_patient_id = Pattern(
        name="patient_id",
        regex=r"(?:Patient\s+(?:ID|No\.?|#|Number))[\s:]*(\d{5,12})",
        score=0.85,
    )
    return PatternRecognizer(
        supported_entity="MEDICAL_RECORD",
        name="mrn_recognizer",
        patterns=[labeled_mrn, labeled_patient_id],
        supported_language="en",
    )
_ssa_case_number_recognizer function · python · L71-L89 (19 LOC)
src/scrawl/anonymize/custom_recognizers.py
def _ssa_case_number_recognizer() -> PatternRecognizer:
    """Build the ``LEGAL_CASE_NUMBER`` recognizer (NOT anonymized — legal citations).

    Registers a federal civil docket shape such as ``1:23-cv-01234-ABC``
    (0.95) and a looser ``NN-NNNN`` appeal-number shape (0.5).
    """
    federal_docket = Pattern(
        name="federal_case",
        regex=r"\b\d{1,2}:\d{2}-cv-\d{3,5}(?:-[A-Z]{2,4})?\b",
        score=0.95,
    )
    appeal_docket = Pattern(
        name="appeal_number",
        regex=r"\b\d{2}-\d{4,6}\b",
        score=0.5,
    )
    return PatternRecognizer(
        supported_entity="LEGAL_CASE_NUMBER",
        name="ssa_case_number",
        patterns=[federal_docket, appeal_docket],
        supported_language="en",
    )
_transcript_reference_recognizer function · python · L92-L105 (14 LOC)
src/scrawl/anonymize/custom_recognizers.py
def _transcript_reference_recognizer() -> PatternRecognizer:
    """Build the ``TRANSCRIPT_REF`` recognizer.

    Matches parenthesised transcript cites like (Tr. 120) or (Tr. 69-102);
    the page range may use a hyphen or an en dash.
    """
    transcript_cite = Pattern(
        name="tr_ref",
        regex=r"\(Tr\.\s*\d+(?:\s*[-–]\s*\d+)?\)",
        score=0.95,
    )
    return PatternRecognizer(
        supported_entity="TRANSCRIPT_REF",
        name="transcript_ref",
        patterns=[transcript_cite],
        supported_language="en",
    )
_overlaps_pattern function · python · L158-L168 (11 LOC)
src/scrawl/anonymize/engine.py
def _overlaps_pattern(pattern, text, start, end):
    """Check if entity span [start, end) overlaps any match of pattern."""
    search_start = max(0, start - 15)
    search_end = min(len(text), end + 15)
    window = text[search_start:search_end]
    for match in pattern.finditer(window):
        match_abs_start = search_start + match.start()
        match_abs_end = search_start + match.end()
        if match_abs_start < end and match_abs_end > start:
            return True
    return False
filter_legal_false_positives function · python · L171-L230 (60 LOC)
src/scrawl/anonymize/engine.py
def filter_legal_false_positives(results, text):
    """Remove Presidio results that match known legal text patterns.

    Runs after analyzer.analyze() and before should_anonymize() to strip
    false positives from statute citations, time durations, court names,
    transcript references, and professional degrees.
    Uses span overlap to avoid filtering real entities near legal patterns.
    """
    filtered = []
    for result in results:
        entity_text = text[result.start : result.end]

        if result.entity_type == "DATE_TIME":
            if _overlaps_pattern(_STATUTE_RE, text, result.start, result.end):
                continue
            if _overlaps_pattern(_SECTION_RE, text, result.start, result.end):
                continue
            if _overlaps_pattern(_REPORTER_RE, text, result.start, result.end):
                continue
            if _overlaps_pattern(_TRANSCRIPT_RE, text, result.start, result.end):
                continue
            if _overlaps_pattern
AnonymizationResult class · python · L234-L240 (7 LOC)
src/scrawl/anonymize/engine.py
class AnonymizationResult:
    # Outcome record for anonymizing one document's text.
    # NOTE(review): the code that fills these fields is not visible here;
    # the per-field notes below are inferred from the field names and from
    # the counters kept by AnonymizationEngine.anonymize — confirm there.

    # Input text as passed to the engine (presumably unmodified).
    original_text: str
    # Text after replacement of the entities selected for anonymization.
    anonymized_text: str
    # Presumably the number of analyzer results left after legal
    # false-positive filtering.
    entities_found: int
    # Presumably the number of results selected for anonymization.
    entities_anonymized: int
    # Presumably the number of results left untouched (allowlisted).
    entities_preserved: int
    # String-to-string mapping; presumably original text -> pseudonym — TODO confirm.
    mapping: dict[str, str]
AnonymizationEngine class · python · L243-L367 (125 LOC)
src/scrawl/anonymize/engine.py
class AnonymizationEngine:
    """Orchestrates HIPAA Safe Harbor de-identification for a case."""

    def __init__(self, config):
        self.config = config
        self.analyzer: AnalyzerEngine | None = None
        self.anonymizer: AnonymizerEngine | None = None
        self.allowlist: list[AllowlistEntry] = []
        self.entity_map: AnonymizationMap = AnonymizationMap()

    def initialize(self):
        """Load NLP models and configure Presidio."""
        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": self.config.anonymize.spacy_model}],
        }
        nlp_engine = NlpEngineProvider(nlp_configuration=nlp_config).create_engine()

        registry = RecognizerRegistry()
        registry.load_predefined_recognizers(nlp_engine=nlp_engine)
        for recognizer in get_ssa_recognizers():
            registry.add_recognizer(recognizer)

        self.analyzer = AnalyzerEngine(
            nlp_engine=nlp_engine,
    
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
__init__ method · python · L246-L251 (6 LOC)
src/scrawl/anonymize/engine.py
    def __init__(self, config):
        """Store the configuration and start with no models loaded.

        The analyzer and anonymizer stay ``None`` until ``initialize()`` is
        called; the allowlist starts empty and the entity map starts as a
        fresh ``AnonymizationMap``.

        Args:
            config: Pipeline configuration object; other methods read its
                ``anonymize`` section.
        """
        self.config = config
        self.analyzer: AnalyzerEngine | None = None
        self.anonymizer: AnonymizerEngine | None = None
        self.allowlist: list[AllowlistEntry] = []
        self.entity_map: AnonymizationMap = AnonymizationMap()
initialize method · python · L253-L270 (18 LOC)
src/scrawl/anonymize/engine.py
    def initialize(self):
        """Load NLP models and configure Presidio.

        Creates a spaCy-backed NLP engine for English using the model named
        in ``config.anonymize.spacy_model``, fills a recognizer registry
        with the predefined recognizers plus the SSA-specific ones, and
        stores the resulting analyzer and a new anonymizer on the instance.
        """
        engine = NlpEngineProvider(
            nlp_configuration={
                "nlp_engine_name": "spacy",
                "models": [
                    {"lang_code": "en", "model_name": self.config.anonymize.spacy_model},
                ],
            }
        ).create_engine()

        recognizers = RecognizerRegistry()
        recognizers.load_predefined_recognizers(nlp_engine=engine)
        for custom in get_ssa_recognizers():
            recognizers.add_recognizer(custom)

        self.analyzer = AnalyzerEngine(nlp_engine=engine, registry=recognizers)
        self.anonymizer = AnonymizerEngine()
shutdown method · python · L272-L283 (12 LOC)
src/scrawl/anonymize/engine.py
    def shutdown(self):
        """Release NLP models from memory.

        Drops the analyzer and anonymizer references, forces a garbage
        collection pass, and, when ``torch`` is importable and reports MPS
        as available, empties the MPS cache. A missing ``torch`` is ignored.
        """
        self.analyzer = self.anonymizer = None
        gc.collect()
        try:
            import torch

            mps_ready = torch.backends.mps.is_available()
            if mps_ready:
                torch.mps.empty_cache()
        except ImportError:
            # torch is optional; nothing more to release without it.
            pass
build_case_allowlist method · python · L285-L287 (3 LOC)
src/scrawl/anonymize/engine.py
    def build_case_allowlist(self, all_documents_text: str):
        """Replace the allowlist with entries extracted from the case text.

        Args:
            all_documents_text: Text of all documents in the case, passed
                as one string to ``build_dynamic_allowlist``.
        """
        extracted_entries = build_dynamic_allowlist(all_documents_text)
        self.allowlist = extracted_entries
build_entity_map method · python · L289-L317 (29 LOC)
src/scrawl/anonymize/engine.py
    def build_entity_map(self, all_documents_text: str):
        """Pre-scan all documents to build consistent entity mapping."""
        if self.analyzer is None:
            raise RuntimeError("Call initialize() first")

        results = self.analyzer.analyze(
            text=all_documents_text,
            entities=self.config.anonymize.presidio.entities,
            language="en",
            score_threshold=self.config.anonymize.presidio.score_threshold,
        )
        results = filter_legal_false_positives(results, all_documents_text)

        person_entities = []
        for result in results:
            entity_text = all_documents_text[result.start : result.end]
            if result.entity_type == "PERSON":
                if should_anonymize(entity_text, result.entity_type, self.allowlist):
                    person_entities.append(entity_text)

        clusters = cluster_name_variants(
            person_entities,
            threshold=self.config.anonymize.fuzzy_matc
anonymize method · python · L319-L367 (49 LOC)
src/scrawl/anonymize/engine.py
    def anonymize(self, text: str) -> AnonymizationResult:
        """Anonymize a single document's text."""
        if self.analyzer is None:
            raise RuntimeError("Call initialize() first")

        results = self.analyzer.analyze(
            text=text,
            entities=self.config.anonymize.presidio.entities,
            language="en",
            score_threshold=self.config.anonymize.presidio.score_threshold,
        )
        results = filter_legal_false_positives(results, text)

        entities_found = len(results)
        entities_preserved = 0
        entities_anonymized = 0

        filtered_results = []
        for result in results:
            entity_text = text[result.start : result.end]
            if should_anonymize(entity_text, result.entity_type, self.allowlist):
                filtered_results.append(result)
                entities_anonymized += 1
            else:
                entities_preserved += 1

        # Build operator configs using consis
AnonymizationMap class · python · L11-L152 (142 LOC)
src/scrawl/anonymize/mapping.py
class AnonymizationMap:
    """Manages consistent pseudonym mappings for a case."""

    def __init__(self):
        self._person_map: dict[str, str] = {}
        self._entity_counters: dict[str, int] = {}

    def build_from_clusters(self, clusters: list[list[str]], format_config):
        """Assign pseudonyms to name clusters."""
        person_id = 1
        provider_id = 1

        for cluster in clusters:
            is_provider = any(
                indicator in name.lower()
                for name in cluster
                for indicator in [
                    "dr.",
                    "dr ",
                    "m.d.",
                    "d.o.",
                    "ph.d.",
                    "psy.d.",
                    "apn",
                    " np",
                    " pa",
                    " pt",
                    " pta",
                    " ot",
                    "d.c.",
                    "nurse",
                    "therapist",
                ]
  
__init__ method · python · L14-L16 (3 LOC)
src/scrawl/anonymize/mapping.py
    def __init__(self):
        """Start with an empty person map and empty entity counters."""
        # Lower-cased entity text -> pseudonym (see get_pseudonym's key handling).
        self._person_map: dict[str, str] = {}
        # Integer counters keyed by string; presumably per entity type — TODO confirm.
        self._entity_counters: dict[str, int] = {}
If a scraper extracted this row, it came from Repobility (https://repobility.com)
build_from_clusters method · python · L18-L54 (37 LOC)
src/scrawl/anonymize/mapping.py
    def build_from_clusters(self, clusters: list[list[str]], format_config):
        """Assign pseudonyms to name clusters."""
        person_id = 1
        provider_id = 1

        for cluster in clusters:
            is_provider = any(
                indicator in name.lower()
                for name in cluster
                for indicator in [
                    "dr.",
                    "dr ",
                    "m.d.",
                    "d.o.",
                    "ph.d.",
                    "psy.d.",
                    "apn",
                    " np",
                    " pa",
                    " pt",
                    " pta",
                    " ot",
                    "d.c.",
                    "nurse",
                    "therapist",
                ]
            )

            if is_provider:
                pseudonym = format_config.provider.format(provider_id)
                provider_id += 1
            else:
                pseudonym = format_config.pe
get_pseudonym method · python · L56-L95 (40 LOC)
src/scrawl/anonymize/mapping.py
    def get_pseudonym(self, entity_text: str, entity_type: str) -> str:
        """Look up or create a pseudonym for an entity."""
        key = entity_text.lower().strip()

        if entity_type == "PERSON":
            if key in self._person_map:
                return self._person_map[key]
            from rapidfuzz import fuzz

            best_match = None
            best_score = 0
            for stored_key, pseudonym in self._person_map.items():
                score = fuzz.partial_ratio(key, stored_key) / 100.0
                if score > best_score and score >= 0.80:
                    best_score = score
                    best_match = pseudonym
            if best_match:
                self._person_map[key] = best_match
                return best_match
            next_id = len(set(v for v in self._person_map.values() if v.startswith("[PERSON"))) + 1
            pseudonym = f"[PERSON-{next_id:03d}]"
            self._person_map[key] = pseudonym
            return pseudon
get_mapping_dict method · python · L97-L99 (3 LOC)
src/scrawl/anonymize/mapping.py
    def get_mapping_dict(self) -> dict[str, str]:
        """Return full mapping dictionary."""
        return dict(self._person_map)
save_encrypted method · python · L101-L131 (31 LOC)
src/scrawl/anonymize/mapping.py
    def save_encrypted(self, db_path: Path, case_id: str):
        """Save mapping to encrypted SQLite database."""
        key = os.environ.get("SCRAWL_MAP_KEY")
        if not key:
            raise EnvironmentError(
                "SCRAWL_MAP_KEY environment variable not set. "
                "Generate with: python -c "
                "'from cryptography.fernet import Fernet; "
                "print(Fernet.generate_key().decode())'"
            )

        f = Fernet(key.encode())
        mapping_json = json.dumps(self._person_map)
        encrypted = f.encrypt(mapping_json.encode())

        conn = sqlite3.connect(str(db_path))
        conn.execute("""
            CREATE TABLE IF NOT EXISTS mappings (
                case_id TEXT PRIMARY KEY,
                encrypted_data BLOB,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                entity_count INTEGER
            )
        """)
        conn.execute(
            "INSERT OR REPLACE INTO mappings "
      
load_encrypted method · python · L133-L152 (20 LOC)
src/scrawl/anonymize/mapping.py
    def load_encrypted(self, db_path: Path, case_id: str):
        """Load mapping from encrypted SQLite database."""
        key = os.environ.get("SCRAWL_MAP_KEY")
        if not key:
            raise EnvironmentError("SCRAWL_MAP_KEY environment variable not set.")

        f = Fernet(key.encode())

        conn = sqlite3.connect(str(db_path))
        row = conn.execute(
            "SELECT encrypted_data FROM mappings WHERE case_id = ?",
            (case_id,),
        ).fetchone()
        conn.close()

        if row is None:
            raise KeyError(f"No mapping found for case_id: {case_id}")

        decrypted = f.decrypt(row[0])
        self._person_map = json.loads(decrypted)
AllowlistEntry class · python · L8-L11 (4 LOC)
src/scrawl/anonymize/selective.py
class AllowlistEntry:
    """A name that should be preserved rather than anonymized."""

    # The name text; compared case-insensitively by should_anonymize.
    name: str
    category: str  # "judge" | "case_law" | "commissioner" | "legal_citation"
    source: str  # "static" | "extracted"
build_dynamic_allowlist function · python · L55-L88 (34 LOC)
src/scrawl/anonymize/selective.py
def build_dynamic_allowlist(full_text: str) -> list[AllowlistEntry]:
    """Extract names from document text that should be preserved."""
    entries = []

    for match in CASE_LAW_PATTERN.finditer(full_text):
        for group_idx in [1, 2]:
            name = match.group(group_idx).strip().rstrip(".")
            if len(name) > 2:
                entries.append(AllowlistEntry(name=name, category="case_law", source="extracted"))

    for match in JUDGE_PATTERN.finditer(full_text):
        name = match.group(1).strip().rstrip(",.")
        entries.append(AllowlistEntry(name=name, category="judge", source="extracted"))

    for match in COMMISSIONER_PATTERN.finditer(full_text):
        name = match.group(1).strip().rstrip(",.")
        entries.append(AllowlistEntry(name=name, category="commissioner", source="extracted"))

    for match in CIRCUIT_JUDGES_PATTERN.finditer(full_text):
        names_str = match.group(1)
        names = re.split(r"[,\s]+(?:and\s+)?", names_str)
        for 
should_anonymize function · python · L91-L112 (22 LOC)
src/scrawl/anonymize/selective.py
def should_anonymize(
    entity_text: str,
    entity_type: str,
    allowlist: list[AllowlistEntry],
    fuzzy_threshold: float = 0.85,
) -> bool:
    """Determine whether a detected entity should be anonymized.

    The entity is preserved (``False``) when, ignoring case and surrounding
    whitespace, it contains or is contained in any allowlist name, or when
    its ``rapidfuzz`` partial ratio against any allowlist name reaches
    ``fuzzy_threshold``. Otherwise it is anonymized (``True``).

    Allowlist entries whose name is empty or whitespace-only are ignored.
    An empty string is a substring of every entity, so a single blank entry
    would otherwise switch anonymization off for the whole document.

    Args:
        entity_text: Text of the detected entity.
        entity_type: Detected entity type; currently not consulted.
        allowlist: Names to preserve.
        fuzzy_threshold: Minimum partial-ratio similarity (0-1) that counts
            as an allowlist match.

    Returns:
        ``True`` to anonymize the entity, ``False`` to preserve it.
    """
    entity_clean = entity_text.strip().lower()

    allowed_names = [
        cleaned for entry in allowlist if (cleaned := entry.name.strip().lower())
    ]
    if not allowed_names:
        return True

    # Cheap exact containment first; this needs no fuzzy matching.
    if any(entity_clean in allowed or allowed in entity_clean for allowed in allowed_names):
        return False

    # Imported lazily so the paths above work without loading rapidfuzz.
    from rapidfuzz import fuzz

    return not any(
        fuzz.partial_ratio(entity_clean, allowed) / 100.0 >= fuzzy_threshold
        for allowed in allowed_names
    )
Want this analysis on your repo? https://repobility.com/scan/
DocumentInfo class · python · L18-L25 (8 LOC)
src/scrawl/assemble/markdown.py
class DocumentInfo:
    """Metadata for a single source document in the case."""

    # File name; listed under "documents" in the YAML frontmatter.
    filename: str
    # Document type key; used to group documents and to look up a heading
    # label in DOC_TYPE_LABELS.
    doc_type: str
    # Filing date, or None when unknown (rendered as "unknown").
    filing_date: date | None
    # Number of pages; presumably of the source document — TODO confirm.
    page_count: int
    # Path to the source file, or None; how it is used is not visible here.
    source_path: Path | None
assemble_case_markdown function · python · L41-L92 (52 LOC)
src/scrawl/assemble/markdown.py
def assemble_case_markdown(
    case_id: str,
    documents: list[DocumentInfo],
    page_results: dict[str, list],
    config,
) -> str:
    """Assemble all processed pages into a single Markdown file."""
    sections = []

    # 1. YAML Frontmatter
    if config.assemble.include_frontmatter:
        sections.append(_build_frontmatter(case_id, documents))

    # 2. Table of Contents
    if config.assemble.include_toc:
        sections.append(_build_toc(documents, config))

    # 3. Group documents by type, then chronological within each type
    ordered_docs = _order_documents(documents, config)

    # 4. Render each document
    current_type = None
    for doc in ordered_docs:
        if doc.doc_type != current_type:
            current_type = doc.doc_type
            label = DOC_TYPE_LABELS.get(current_type, current_type)
            sections.append(f"\n# {label}\n")

        date_str = doc.filing_date.isoformat() if doc.filing_date else "unknown"
        sections.append(f"\n## {doc
_build_frontmatter function · python · L95-L105 (11 LOC)
src/scrawl/assemble/markdown.py
def _build_frontmatter(case_id: str, documents: list[DocumentInfo]) -> str:
    """Build YAML frontmatter block."""
    doc_list = "\n".join(f'  - "{d.filename}"' for d in documents)
    return f"""---
case_id: "{case_id}"
processed_date: "{date.today().isoformat()}"
pipeline_version: "1.0"
document_count: {len(documents)}
documents:
{doc_list}
---"""
page 1 / 3next ›