Function bodies 108 total
deny function · python · L27-L29 (3 LOC).claude/hooks/block_dangerous_git.py
def deny(reason: str) -> None:
    """Write a BLOCKED message for *reason* to stderr and exit with status 2.

    Args:
        reason: Explanation appended to the ``block_dangerous_git`` prefix.

    Raises:
        SystemExit: Always, with exit code 2.
    """
    message = f"block_dangerous_git: BLOCKED — {reason}"
    print(message, file=sys.stderr)
    raise SystemExit(2)
# check_git_push function · python · L32-L63 (32 LOC).claude/hooks/block_dangerous_git.py
def check_git_push(tokens: list[str]) -> None:
"""Reject force-push and direct-to-main pushes."""
force_flags = {
"-f",
"--force",
"--force-with-lease",
"--force-if-includes",
}
if any(t in force_flags or t.startswith("--force-with-lease=") or t.startswith("--force-if-includes=") for t in tokens):
deny("force push (any variant) is forbidden")
if "--no-verify" in tokens:
deny("--no-verify on push is forbidden (don't skip hooks)")
# Walk the positional args (after `git push` and any flags).
# Detect direct push to main/master and force refspecs.
for t in tokens[2:]:
if t.startswith("-"):
continue
# Force refspec — anything starting with '+' pushes force semantics.
# `git push origin +branch` or `+refs/heads/foo:refs/heads/bar`.
if t.startswith("+") and t not in {"+", "++"}:
deny(f"force refspec '{t}' (leading '+' means force push)")
# Direct-tocheck_git_reset function · python · L66-L68 (3 LOC).claude/hooks/block_dangerous_git.py
def check_git_reset(tokens: list[str]) -> None:
    """Deny when the token list carries ``--hard``, ``--merge`` or ``--keep``.

    Args:
        tokens: Tokenized command line.
    """
    forbidden_modes = ("--hard", "--merge", "--keep")
    if any(mode in tokens for mode in forbidden_modes):
        deny("git reset --hard/--merge/--keep is forbidden (use ./scripts/reset-work.sh)")
# check_git_clean function · python · L71-L77 (7 LOC).claude/hooks/block_dangerous_git.py
def check_git_clean(tokens: list[str]) -> None:
    """Deny ``git clean`` invocations that force deletion or touch ignored files.

    Dry runs (``-n`` / ``--dry-run``) stay allowed. Short flag clusters and
    long options are inspected separately, so a long option whose *name*
    merely contains an "f" or "x" (``--exclude=...``) no longer trips the
    guard.

    Args:
        tokens: Tokenized command line; options are read from index 2 onward
            (after ``git clean``).
    """
    for t in tokens[2:]:
        if not t.startswith("-"):
            continue
        if t.startswith("--"):
            # git accepts unambiguous abbreviations of long options, so
            # `--f`, `--fo`, ... all mean `--force`.
            option_name = t[2:].partition("=")[0]
            if option_name and "force".startswith(option_name):
                deny("git clean -f is forbidden (use ./scripts/reset-work.sh)")
            continue
        # Short cluster such as `-fd` or `-nx`. A value glued to `-e` is
        # scanned as well, which errs on the side of denying.
        flags = t[1:].lower()
        if "f" in flags:
            deny("git clean -f is forbidden (use ./scripts/reset-work.sh)")
        # Both -x and -X remove ignored files.
        if "x" in flags:
            deny("git clean -x is forbidden (would delete ignored files)")
# check_git_commit function · python · L80-L84 (5 LOC).claude/hooks/block_dangerous_git.py
def check_git_commit(tokens: list[str]) -> None:
    """Deny commits that rewrite history (``--amend``) or skip hooks (``--no-verify``).

    Args:
        tokens: Tokenized command line.
    """
    rules = (
        ("--amend", "git commit --amend is forbidden (create a new commit instead)"),
        ("--no-verify", "git commit --no-verify is forbidden (don't skip hooks)"),
    )
    for flag, reason in rules:
        if flag in tokens:
            deny(reason)
# check_git_rebase function · python · L87-L89 (3 LOC).claude/hooks/block_dangerous_git.py
def check_git_rebase(tokens: list[str]) -> None:
    """Deny interactive rebases (``-i`` / ``--interactive``).

    Args:
        tokens: Tokenized command line.
    """
    interactive_flags = ("-i", "--interactive")
    if any(flag in tokens for flag in interactive_flags):
        deny("interactive rebase is forbidden")
# check_rm function · python · L92-L99 (8 LOC).claude/hooks/block_dangerous_git.py
def check_rm(tokens: list[str]) -> None:
    """Deny recursive ``rm`` while allowing single-file deletes.

    ``rm <file>``, ``rm -f <file>`` and ``rm --force <file>`` are allowed.
    Any recursive form is denied: a short cluster containing ``r``/``R``
    (with or without ``f``) or the long ``--recursive`` option, including
    the unambiguous abbreviations GNU rm accepts (``--r``, ``--rec``, ...).

    Every token after the command name is scanned, with no special handling
    of ``--``: the token list may span several chained commands, and stopping
    early would let a later ``-rf`` slip through.

    Args:
        tokens: Tokenized command line with ``rm`` at index 0.
    """
    for t in tokens[1:]:
        if not t.startswith("-"):
            continue
        if t.startswith("--"):
            option_name = t[2:]
            if option_name and "recursive".startswith(option_name):
                deny("rm -R is forbidden (delete files individually)")
            continue
        flags = t[1:].lower()
        if "r" not in flags:
            continue
        if "f" in flags:
            deny("rm -rf is forbidden (delete files individually)")
        deny("rm -R is forbidden (delete files individually)")
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
check_curl_pipe function · python · L102-L108 (7 LOC).claude/hooks/block_dangerous_git.py
def check_curl_pipe(cmd: str) -> None:
    """Deny commands that pipe a curl/wget download straight into a shell.

    The raw command string is searched, case-insensitively. A pipe target
    counts as a shell only when the command name is exactly ``sh``, ``bash``
    or ``zsh`` (optionally path-qualified and optionally behind ``sudo``,
    ``env`` or ``exec``), so ``| sha256sum`` or ``| shuf`` are not mistaken
    for ``| sh``.

    Args:
        cmd: The full, untokenized command line.
    """
    import re

    lowered = cmd.lower()
    if "curl" not in lowered and "wget" not in lowered:
        return
    pipe_to_shell = re.compile(
        r"\|\s*"                                  # the pipe, any spacing
        r"(?:(?:\S*/)?(?:sudo|env|exec)\s+)*"     # optional wrapper commands
        r"(?:\S*/)?(?:sh|bash|zsh)"               # shell, optionally with a path
        r"(?![\w.-])"                             # ...and nothing glued onto it
    )
    if pipe_to_shell.search(lowered):
        deny("piping curl/wget to a shell is forbidden (download, inspect, then run)")
# check_publish function · python · L111-L121 (11 LOC).claude/hooks/block_dangerous_git.py
def check_publish(tokens: list[str]) -> None:
    """Deny package-publishing commands; those are reserved for humans.

    Args:
        tokens: Tokenized command line; only the first two tokens are compared.
    """
    publish_commands = (
        ("uv", "publish"),
        ("npm", "publish"),
        ("twine", "upload"),
        ("yarn", "publish"),
        ("cargo", "publish"),
    )
    head = tokens[:2]
    for tool, action in publish_commands:
        if head == [tool, action]:
            deny(f"{tool} {action} is a human-only action")
# check_sudo function · python · L124-L128 (5 LOC).claude/hooks/block_dangerous_git.py
def check_sudo(tokens: list[str]) -> None:
    """Deny commands whose first token is ``sudo`` or ``doas``.

    Args:
        tokens: Tokenized command line; may be empty.
    """
    if not tokens:
        return
    head = tokens[0]
    for escalator in ("sudo", "doas"):
        if head == escalator:
            deny(f"{escalator} is forbidden")
# check_pip function · python · L131-L139 (9 LOC).claude/hooks/block_dangerous_git.py
def check_pip(tokens: list[str]) -> None:
    """Deny direct ``pip install`` in favour of the uv workflow.

    `pip install` bypasses uv's lockfile and can silently corrupt the venv;
    everything in this repo goes through `uv add` / `uv sync` / `uv pip`.

    Args:
        tokens: Tokenized command line.
    """
    if tokens[:2] == ["pip", "install"]:
        deny("pip install is forbidden (use 'uv add' or 'uv sync --extra <name>')")
    for interpreter in ("python", "python3"):
        if tokens[:4] == [interpreter, "-m", "pip", "install"]:
            deny(f"{interpreter} -m pip install is forbidden (use 'uv add' or 'uv sync')")
# check_gh function · python · L142-L165 (24 LOC).claude/hooks/block_dangerous_git.py
def check_gh(tokens: list[str]) -> None:
# `gh` (GitHub CLI) can reach around branch protection and delete
# published state. Allow reads and PR/issue creation; block anything
# destructive or anything that bypasses PR review.
if len(tokens) < 2 or tokens[0] != "gh":
return
# Admin-merge bypasses branch protection. Hard no.
if tokens[:3] == ["gh", "pr", "merge"] and "--admin" in tokens:
deny("gh pr merge --admin bypasses branch protection (forbidden)")
# Release deletion is irreversible in practice.
if tokens[:3] == ["gh", "release", "delete"]:
deny("gh release delete is forbidden (releases are immutable by convention)")
# Auth logout would nuke the token the hook itself uses.
if tokens[:3] == ["gh", "auth", "logout"]:
deny("gh auth logout is forbidden (locks out CI + other scripts)")
# Repo-level destruction.
if tokens[:3] == ["gh", "repo", "delete"]:
deny("gh repo delete is forbidden (always acheck_dd function · python · L168-L176 (9 LOC).claude/hooks/block_dangerous_git.py
def check_dd(tokens: list[str]) -> None:
    """Deny ``dd`` runs that write to a device node.

    `dd` has legitimate uses (generating random files, image conversion).
    The dangerous form is writing to a raw device (``of=/dev/...``), which
    can wipe disks. Reading from devices (``if=/dev/urandom``) is fine, and
    so is discarding output into ``/dev/null``, which is not a disk.

    Args:
        tokens: Tokenized command line; ignored unless the first token is ``dd``.
    """
    if not tokens or tokens[0] != "dd":
        return
    # Exact match only, so `of=/dev/null/../sda`-style tricks still get denied.
    harmless_sinks = {"of=/dev/null"}
    for t in tokens[1:]:
        if t.startswith("of=/dev/") and t not in harmless_sinks:
            deny(f"dd {t} writes to a raw device (would wipe disk)")
# check_destructive_system function · python · L179-L189 (11 LOC).claude/hooks/block_dangerous_git.py
def check_destructive_system(tokens: list[str]) -> None:
    """Deny filesystem creation and shutdown/reboot style commands.

    Covers ``mkfs*``, ``shutdown``, ``reboot``, ``halt``, ``poweroff`` and
    ``init 0`` / ``init 6``.

    Args:
        tokens: Tokenized command line; may be empty.
    """
    if not tokens:
        return
    command = tokens[0]
    if command.startswith("mkfs"):
        deny(f"{command} creates a filesystem (forbidden)")
    if command in ("shutdown", "reboot", "halt", "poweroff"):
        deny(f"{command} is forbidden")
    if command == "init" and tokens[1:2] in (["0"], ["6"]):
        deny("init 0/6 is forbidden (shutdown/reboot)")
# check_eval_source function · python · L192-L206 (15 LOC).claude/hooks/block_dangerous_git.py
def check_eval_source(tokens: list[str], cmd: str) -> None:
    """Deny ``eval``, sourcing from stdin, and the classic fork-bomb literal.

    `eval` runs arbitrary shell; `source /dev/stdin` / `. /dev/stdin` feed
    arbitrary content into the current shell. Nothing is checked when
    *tokens* is empty, including the fork-bomb pattern.

    Args:
        tokens: Tokenized command line.
        cmd: The raw command string, searched for the fork-bomb pattern.
    """
    if not tokens:
        return
    head = tokens[0]
    if head == "eval":
        deny("eval is forbidden (arbitrary code execution)")
    sources_stdin = (
        head in ("source", ".")
        and len(tokens) >= 2
        and tokens[1] in ("/dev/stdin", "/dev/fd/0")
    )
    if sources_stdin:
        deny(f"{head} {tokens[1]} is forbidden (arbitrary code from stdin)")
    # Match the literal `:(){` opener, with or without a space.
    if any(marker in cmd for marker in (":(){", ":() {")):
        deny("fork bomb pattern detected")
# Want this analysis on your repo? https://repobility.com/scan/
main function · python · L209-L256 (48 LOC).claude/hooks/block_dangerous_git.py
def main() -> None:
try:
payload = json.loads(sys.stdin.read() or "{}")
except json.JSONDecodeError:
# Can't parse the hook payload → don't block; let Claude Code handle it.
sys.exit(0)
tool_input = payload.get("tool_input") or {}
cmd = tool_input.get("command")
if not isinstance(cmd, str) or not cmd.strip():
sys.exit(0)
# Always check the raw string for curl-pipe patterns; pipes break shlex tokenization.
check_curl_pipe(cmd)
# Tokenize. On parse failure, default-deny per the contract.
try:
tokens = shlex.split(cmd, comments=True, posix=True)
except ValueError as exc:
deny(f"unparseable command ({exc}); default-deny")
if not tokens:
sys.exit(0)
check_sudo(tokens)
check_publish(tokens)
check_pip(tokens)
check_gh(tokens)
check_dd(tokens)
check_destructive_system(tokens)
check_eval_source(tokens, cmd)
if tokens[0] == "rm":
check_rm(tokens)
main function · python · L21-L57 (37 LOC).claude/hooks/honor_loop_guard.py
def main() -> None:
try:
payload = json.loads(sys.stdin.read() or "{}")
except json.JSONDecodeError:
sys.exit(0)
if payload.get("tool_name") != "Bash":
sys.exit(0)
tool_response = payload.get("tool_response") or {}
# The exit code field name depends on Claude Code version; check both.
exit_code = tool_response.get("exit_code")
if exit_code is None:
exit_code = tool_response.get("code")
if exit_code is None:
# Some versions surface stderr but no exit code; try parsing stdout.
stdout = tool_response.get("stdout") or ""
if "exit 42" in stdout or "infinite-loop guard" in stdout:
exit_code = 42
if exit_code == 42:
cmd = (payload.get("tool_input") or {}).get("command") or "<unknown>"
msg = (
"\n"
"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
"LOOP GUARD TRIPPED (exit 42)\n"
f" command: {cmd}\n"
" meaninshould_format function · python · L21-L24 (4 LOC).claude/hooks/ruff_format_on_save.py
def should_format(path_str: str) -> bool:
    """Return True for ``.py`` paths that sit under one of ``FORMAT_ROOTS``.

    A root matches when the path starts with it or contains it right after
    a ``/``.

    Args:
        path_str: Path to test, as a string.
    """
    if not path_str.endswith(".py"):
        return False
    for root in FORMAT_ROOTS:
        if path_str.startswith(root) or f"/{root}" in path_str:
            return True
    return False
# main function · python · L27-L72 (46 LOC).claude/hooks/ruff_format_on_save.py
def main() -> None:
try:
payload = json.loads(sys.stdin.read() or "{}")
except json.JSONDecodeError:
sys.exit(0)
tool_name = payload.get("tool_name") or ""
if tool_name not in {"Edit", "Write", "NotebookEdit"}:
sys.exit(0)
tool_input = payload.get("tool_input") or {}
file_path = tool_input.get("file_path") or tool_input.get("path")
if not isinstance(file_path, str):
sys.exit(0)
# Normalize to a repo-relative path for the should_format check.
try:
repo_root = Path(__file__).resolve().parents[2]
rel = Path(file_path).resolve().relative_to(repo_root)
except (ValueError, OSError):
sys.exit(0)
rel_str = str(rel)
if not should_format(rel_str):
sys.exit(0)
# Best-effort format + safe autofix. Swallow errors — we never block.
try:
subprocess.run(
["uv", "run", "ruff", "format", rel_str],
cwd=repo_root,
check=False,
cluster_name_variants function · python · L11-L59 (49 LOC)src/scrawl/anonymize/clustering.py
def cluster_name_variants(
names: list[str],
threshold: float = 0.85,
) -> list[list[str]]:
"""Cluster name strings that likely refer to the same person.
Uses Jaro-Winkler similarity with additional heuristics for
medical name patterns (Dr., M.D., NP, PT, etc.)
"""
# Deduplicate while preserving order
seen = set()
unique_names = []
for name in names:
normalized = name.strip()
if normalized.lower() not in seen:
seen.add(normalized.lower())
unique_names.append(normalized)
if not unique_names:
return []
# Sort longest first (most informative matches first)
unique_names.sort(key=len, reverse=True)
clusters: list[list[str]] = []
cluster_canonical: list[str] = [] # Normalized form for matching
for name in unique_names:
norm_name = _normalize_name(name)
best_cluster_idx = -1
best_score = 0.0
for idx, canonical in enumerate(cluster_canonical):
_normalize_name function · python · L62-L96 (35 LOC)src/scrawl/anonymize/clustering.py
# Honorifics and role titles. The optional period sits AFTER the word
# boundary so that "Dr." is consumed whole; with `\.?\b` the period was
# left behind whenever whitespace followed it.
_NAME_TITLE_RE = re.compile(
    r"\b(?:Dr|Mrs|Mr|Ms)\b\.?"
    r"|\bNurse\s+Practitioner\b"
    r"|\bPhysical\s+Therapist\b",
    re.IGNORECASE,
)

# Credentials and generational suffixes. They must follow a comma or
# whitespace and end on a word boundary, so letters inside a name
# ("Doe", "Mendoza", "Elliot") are never treated as a credential.
_NAME_SUFFIX_RE = re.compile(
    r"(?:,\s*|\s+)"
    r"(?:M\.?D|D\.?O|Ph\.?D|Psy\.?D|D\.?C|APN|NP|PTA|PA|PT|OT|Jr|Sr|III?)"
    r"\b\.?",
    re.IGNORECASE,
)


def _normalize_name(name: str) -> str:
    """Strip titles, suffixes, and punctuation for comparison.

    Removes leading honorifics (Dr., Mr., Ms., Mrs., "Nurse Practitioner",
    "Physical Therapist") and trailing credentials or generational suffixes
    (M.D., D.O., Ph.D., Psy.D., D.C., APN, NP, PA, PT, PTA, OT, Jr., Sr.,
    II, III), then collapses whitespace and lowercases.

    Args:
        name: Raw name string as it appeared in the document.

    Returns:
        The lowercased name with titles and credentials removed.
    """
    result = _NAME_TITLE_RE.sub("", name)
    result = _NAME_SUFFIX_RE.sub("", result)
    result = re.sub(r"\s+", " ", result).strip().strip(",").strip()
    return result.lower()
# _name_component_similarity function · python · L99-L127 (29 LOC)src/scrawl/anonymize/clustering.py
def _name_component_similarity(name1: str, name2: str) -> float:
    """Score two names by comparing their last and first components.

    The last whitespace-separated token of each name is compared first. A
    weak last-name match (below 0.8) returns half that score. Otherwise,
    when both names have more than one token, a first token of at most two
    characters that prefixes the other first token (an initial) scores
    0.95; failing that, the result blends last (60%) and first (40%)
    similarity. When either name is a single token, the result is 80% of
    the last-name similarity.

    Returns:
        A similarity score; 0.0 when either name has no tokens.
    """
    tokens_a = name1.split()
    tokens_b = name2.split()
    if not (tokens_a and tokens_b):
        return 0.0

    surname_score = fuzz.ratio(tokens_a[-1], tokens_b[-1]) / 100.0
    if surname_score < 0.8:
        return surname_score * 0.5

    if len(tokens_a) <= 1 or len(tokens_b) <= 1:
        return surname_score * 0.8

    given_a, given_b = tokens_a[0], tokens_b[0]
    if min(len(given_a), len(given_b)) <= 2:
        # Stable sort: on equal lengths the first name stays the "short" one.
        shorter, longer = sorted((given_a, given_b), key=len)
        if longer.startswith(shorter.rstrip(".")):
            return 0.95
    given_score = fuzz.ratio(given_a, given_b) / 100.0
    return (surname_score * 0.6) + (given_score * 0.4)
# get_ssa_recognizers function · python · L10-L17 (8 LOC)src/scrawl/anonymize/custom_recognizers.py
def get_ssa_recognizers() -> list[PatternRecognizer]:
    """Build the custom recognizers used for SSA documents.

    Returns:
        One freshly built recognizer each for SSNs, medical record numbers,
        SSA case numbers and transcript references, in that order.
    """
    factories = (
        _ssn_recognizer,
        _medical_record_number_recognizer,
        _ssa_case_number_recognizer,
        _transcript_reference_recognizer,
    )
    return [build() for build in factories]
# About: code-quality intelligence by Repobility · https://repobility.com
_ssn_recognizer function · python · L20-L47 (28 LOC)src/scrawl/anonymize/custom_recognizers.py
def _ssn_recognizer() -> PatternRecognizer:
    """Build the SSN recognizer, covering masked (partial) SSNs as well.

    Safe Harbor requires removing ALL SSN digits, including the last 4.
    Matches: 123-45-6789, 123 45 6789, XXX-XX-6789, ***-**-6789
    """
    full_ssn = Pattern(
        name="full_ssn",
        regex=r"\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
        score=0.85,
    )
    # Masked prefix followed by the real last four digits.
    masked_last4 = Pattern(
        name="partial_ssn_last4",
        regex=r"(?:[Xx*]{3}[-\s]?[Xx*]{2}[-\s]?)(\d{4})\b",
        score=0.7,
    )
    # An explicit "SSN" / "Social Security ..." label raises the score.
    labeled_ssn = Pattern(
        name="ssn_label_nearby",
        regex=r"(?:SSN|Social\s+Security(?:\s+(?:Number|No\.?|#))?)\s*:?\s*(\d{3}[-\s]?\d{2}[-\s]?\d{4})",
        score=0.95,
    )
    return PatternRecognizer(
        supported_entity="US_SSN",
        name="enhanced_ssn",
        patterns=[full_ssn, masked_last4, labeled_ssn],
        supported_language="en",
    )
# _medical_record_number_recognizer function · python · L50-L68 (19 LOC)src/scrawl/anonymize/custom_recognizers.py
def _medical_record_number_recognizer() -> PatternRecognizer:
    """Build the recognizer for labeled medical record and patient numbers.

    Both patterns require a label (MRN / Medical Record ... / Patient ID ...)
    followed by 5 to 12 digits.
    """
    labeled_mrn = Pattern(
        name="mrn_labeled",
        regex=r"(?:MRN|MR#|Medical\s+Record(?:\s+(?:Number|No\.?|#))?)[\s:]*(\d{5,12})",
        score=0.9,
    )
    patient_id = Pattern(
        name="patient_id",
        regex=r"(?:Patient\s+(?:ID|No\.?|#|Number))[\s:]*(\d{5,12})",
        score=0.85,
    )
    return PatternRecognizer(
        supported_entity="MEDICAL_RECORD",
        name="mrn_recognizer",
        patterns=[labeled_mrn, patient_id],
        supported_language="en",
    )
# _ssa_case_number_recognizer function · python · L71-L89 (19 LOC)src/scrawl/anonymize/custom_recognizers.py
def _ssa_case_number_recognizer() -> PatternRecognizer:
    """Build the recognizer for SSA case numbers (NOT anonymized — legal citations).

    Recognizes federal civil docket numbers (e.g. ``1:23-cv-01234-AB``) with
    a high score and bare ``NN-NNNN`` appeal numbers with a low one.
    """
    federal_case = Pattern(
        name="federal_case",
        regex=r"\b\d{1,2}:\d{2}-cv-\d{3,5}(?:-[A-Z]{2,4})?\b",
        score=0.95,
    )
    appeal_number = Pattern(
        name="appeal_number",
        regex=r"\b\d{2}-\d{4,6}\b",
        score=0.5,
    )
    return PatternRecognizer(
        supported_entity="LEGAL_CASE_NUMBER",
        name="ssa_case_number",
        patterns=[federal_case, appeal_number],
        supported_language="en",
    )
# _transcript_reference_recognizer function · python · L92-L105 (14 LOC)src/scrawl/anonymize/custom_recognizers.py
def _transcript_reference_recognizer() -> PatternRecognizer:
    """Build the recognizer for transcript references like (Tr. 120) or (Tr. 69-102)."""
    transcript_ref = Pattern(
        name="tr_ref",
        regex=r"\(Tr\.\s*\d+(?:\s*[-–]\s*\d+)?\)",
        score=0.95,
    )
    return PatternRecognizer(
        supported_entity="TRANSCRIPT_REF",
        name="transcript_ref",
        patterns=[transcript_ref],
        supported_language="en",
    )
# _overlaps_pattern function · python · L158-L168 (11 LOC)src/scrawl/anonymize/engine.py
def _overlaps_pattern(pattern, text, start, end):
    """Tell whether the span [start, end) overlaps any match of *pattern*.

    Only a window of 15 characters on either side of the span is searched;
    match offsets are translated back to positions in *text* before the
    overlap test.

    Args:
        pattern: Compiled regular expression.
        text: Full document text.
        start: Inclusive start offset of the entity span.
        end: Exclusive end offset of the entity span.

    Returns:
        True if at least one match intersects the span, otherwise False.
    """
    margin = 15
    window_lo = max(0, start - margin)
    window_hi = min(len(text), end + margin)
    return any(
        window_lo + hit.start() < end and window_lo + hit.end() > start
        for hit in pattern.finditer(text[window_lo:window_hi])
    )
# filter_legal_false_positives function · python · L171-L230 (60 LOC)src/scrawl/anonymize/engine.py
def filter_legal_false_positives(results, text):
"""Remove Presidio results that match known legal text patterns.
Runs after analyzer.analyze() and before should_anonymize() to strip
false positives from statute citations, time durations, court names,
transcript references, and professional degrees.
Uses span overlap to avoid filtering real entities near legal patterns.
"""
filtered = []
for result in results:
entity_text = text[result.start : result.end]
if result.entity_type == "DATE_TIME":
if _overlaps_pattern(_STATUTE_RE, text, result.start, result.end):
continue
if _overlaps_pattern(_SECTION_RE, text, result.start, result.end):
continue
if _overlaps_pattern(_REPORTER_RE, text, result.start, result.end):
continue
if _overlaps_pattern(_TRANSCRIPT_RE, text, result.start, result.end):
continue
if _overlaps_patternAnonymizationResult class · python · L234-L240 (7 LOC)src/scrawl/anonymize/engine.py
class AnonymizationResult:
    """Outcome of anonymizing a single document's text."""

    # Input text as passed to the engine.
    original_text: str
    # Text after anonymization.
    anonymized_text: str
    # Number of analyzer results left after legal false-positive filtering.
    entities_found: int
    # Results selected for anonymization (should_anonymize() returned True).
    entities_anonymized: int
    # Results left untouched (should_anonymize() returned False).
    entities_preserved: int
    # NOTE(review): presumably original text -> pseudonym; the code that
    # fills this field is not visible here — confirm.
    mapping: dict[str, str]
# AnonymizationEngine class · python · L243-L367 (125 LOC)src/scrawl/anonymize/engine.py
class AnonymizationEngine:
"""Orchestrates HIPAA Safe Harbor de-identification for a case."""
def __init__(self, config):
self.config = config
self.analyzer: AnalyzerEngine | None = None
self.anonymizer: AnonymizerEngine | None = None
self.allowlist: list[AllowlistEntry] = []
self.entity_map: AnonymizationMap = AnonymizationMap()
def initialize(self):
"""Load NLP models and configure Presidio."""
nlp_config = {
"nlp_engine_name": "spacy",
"models": [{"lang_code": "en", "model_name": self.config.anonymize.spacy_model}],
}
nlp_engine = NlpEngineProvider(nlp_configuration=nlp_config).create_engine()
registry = RecognizerRegistry()
registry.load_predefined_recognizers(nlp_engine=nlp_engine)
for recognizer in get_ssa_recognizers():
registry.add_recognizer(recognizer)
self.analyzer = AnalyzerEngine(
nlp_engine=nlp_engine,
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
__init__ method · python · L246-L251 (6 LOC)src/scrawl/anonymize/engine.py
def __init__(self, config):
    """Store the configuration and set up empty engine state.

    No models are loaded here; call initialize() before analyzing text.

    Args:
        config: Pipeline configuration; the ``anonymize`` section is read
            by the other methods of this class.
    """
    self.config = config
    # Presidio engines: assigned by initialize(), reset to None by shutdown().
    self.analyzer: AnalyzerEngine | None = None
    self.anonymizer: AnonymizerEngine | None = None
    # Replaced wholesale by build_case_allowlist().
    self.allowlist: list[AllowlistEntry] = []
    self.entity_map: AnonymizationMap = AnonymizationMap()
# initialize method · python · L253-L270 (18 LOC)src/scrawl/anonymize/engine.py
def initialize(self):
    """Load NLP models and configure Presidio.

    Creates a spaCy-backed NLP engine from the configured model, registers
    the predefined recognizers plus the custom SSA ones, and stores the
    resulting analyzer and a fresh anonymizer on the instance.
    """
    spacy_model = self.config.anonymize.spacy_model
    provider = NlpEngineProvider(
        nlp_configuration={
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": spacy_model}],
        }
    )
    nlp_engine = provider.create_engine()

    registry = RecognizerRegistry()
    registry.load_predefined_recognizers(nlp_engine=nlp_engine)
    for custom_recognizer in get_ssa_recognizers():
        registry.add_recognizer(custom_recognizer)

    self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry)
    self.anonymizer = AnonymizerEngine()
# shutdown method · python · L272-L283 (12 LOC)src/scrawl/anonymize/engine.py
def shutdown(self):
"""Release NLP models from memory."""
self.analyzer = None
self.anonymizer = None
gc.collect()
try:
import torch
if torch.backends.mps.is_available():
torch.mps.empty_cache()
except ImportError:
passbuild_case_allowlist method · python · L285-L287 (3 LOC)src/scrawl/anonymize/engine.py
def build_case_allowlist(self, all_documents_text: str):
    """Replace the engine's allowlist with one extracted from the case text.

    Args:
        all_documents_text: Text of all documents in the case.
    """
    extracted_entries = build_dynamic_allowlist(all_documents_text)
    self.allowlist = extracted_entries
# build_entity_map method · python · L289-L317 (29 LOC)src/scrawl/anonymize/engine.py
def build_entity_map(self, all_documents_text: str):
"""Pre-scan all documents to build consistent entity mapping."""
if self.analyzer is None:
raise RuntimeError("Call initialize() first")
results = self.analyzer.analyze(
text=all_documents_text,
entities=self.config.anonymize.presidio.entities,
language="en",
score_threshold=self.config.anonymize.presidio.score_threshold,
)
results = filter_legal_false_positives(results, all_documents_text)
person_entities = []
for result in results:
entity_text = all_documents_text[result.start : result.end]
if result.entity_type == "PERSON":
if should_anonymize(entity_text, result.entity_type, self.allowlist):
person_entities.append(entity_text)
clusters = cluster_name_variants(
person_entities,
threshold=self.config.anonymize.fuzzy_matcanonymize method · python · L319-L367 (49 LOC)src/scrawl/anonymize/engine.py
def anonymize(self, text: str) -> AnonymizationResult:
"""Anonymize a single document's text."""
if self.analyzer is None:
raise RuntimeError("Call initialize() first")
results = self.analyzer.analyze(
text=text,
entities=self.config.anonymize.presidio.entities,
language="en",
score_threshold=self.config.anonymize.presidio.score_threshold,
)
results = filter_legal_false_positives(results, text)
entities_found = len(results)
entities_preserved = 0
entities_anonymized = 0
filtered_results = []
for result in results:
entity_text = text[result.start : result.end]
if should_anonymize(entity_text, result.entity_type, self.allowlist):
filtered_results.append(result)
entities_anonymized += 1
else:
entities_preserved += 1
# Build operator configs using consisAnonymizationMap class · python · L11-L152 (142 LOC)src/scrawl/anonymize/mapping.py
class AnonymizationMap:
"""Manages consistent pseudonym mappings for a case."""
def __init__(self):
self._person_map: dict[str, str] = {}
self._entity_counters: dict[str, int] = {}
def build_from_clusters(self, clusters: list[list[str]], format_config):
"""Assign pseudonyms to name clusters."""
person_id = 1
provider_id = 1
for cluster in clusters:
is_provider = any(
indicator in name.lower()
for name in cluster
for indicator in [
"dr.",
"dr ",
"m.d.",
"d.o.",
"ph.d.",
"psy.d.",
"apn",
" np",
" pa",
" pt",
" pta",
" ot",
"d.c.",
"nurse",
"therapist",
]
__init__ method · python · L14-L16 (3 LOC)src/scrawl/anonymize/mapping.py
def __init__(self):
    """Start with empty pseudonym mappings."""
    # Lowercased, stripped entity text -> pseudonym (see get_pseudonym()).
    self._person_map: dict[str, str] = {}
    # NOTE(review): not used by the code visible here; presumably
    # per-entity-type counters — confirm.
    self._entity_counters: dict[str, int] = {}
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
build_from_clusters method · python · L18-L54 (37 LOC)src/scrawl/anonymize/mapping.py
def build_from_clusters(self, clusters: list[list[str]], format_config):
"""Assign pseudonyms to name clusters."""
person_id = 1
provider_id = 1
for cluster in clusters:
is_provider = any(
indicator in name.lower()
for name in cluster
for indicator in [
"dr.",
"dr ",
"m.d.",
"d.o.",
"ph.d.",
"psy.d.",
"apn",
" np",
" pa",
" pt",
" pta",
" ot",
"d.c.",
"nurse",
"therapist",
]
)
if is_provider:
pseudonym = format_config.provider.format(provider_id)
provider_id += 1
else:
pseudonym = format_config.peget_pseudonym method · python · L56-L95 (40 LOC)src/scrawl/anonymize/mapping.py
def get_pseudonym(self, entity_text: str, entity_type: str) -> str:
"""Look up or create a pseudonym for an entity."""
key = entity_text.lower().strip()
if entity_type == "PERSON":
if key in self._person_map:
return self._person_map[key]
from rapidfuzz import fuzz
best_match = None
best_score = 0
for stored_key, pseudonym in self._person_map.items():
score = fuzz.partial_ratio(key, stored_key) / 100.0
if score > best_score and score >= 0.80:
best_score = score
best_match = pseudonym
if best_match:
self._person_map[key] = best_match
return best_match
next_id = len(set(v for v in self._person_map.values() if v.startswith("[PERSON"))) + 1
pseudonym = f"[PERSON-{next_id:03d}]"
self._person_map[key] = pseudonym
return pseudonget_mapping_dict method · python · L97-L99 (3 LOC)src/scrawl/anonymize/mapping.py
def get_mapping_dict(self) -> dict[str, str]:
"""Return full mapping dictionary."""
return dict(self._person_map)save_encrypted method · python · L101-L131 (31 LOC)src/scrawl/anonymize/mapping.py
def save_encrypted(self, db_path: Path, case_id: str):
"""Save mapping to encrypted SQLite database."""
key = os.environ.get("SCRAWL_MAP_KEY")
if not key:
raise EnvironmentError(
"SCRAWL_MAP_KEY environment variable not set. "
"Generate with: python -c "
"'from cryptography.fernet import Fernet; "
"print(Fernet.generate_key().decode())'"
)
f = Fernet(key.encode())
mapping_json = json.dumps(self._person_map)
encrypted = f.encrypt(mapping_json.encode())
conn = sqlite3.connect(str(db_path))
conn.execute("""
CREATE TABLE IF NOT EXISTS mappings (
case_id TEXT PRIMARY KEY,
encrypted_data BLOB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
entity_count INTEGER
)
""")
conn.execute(
"INSERT OR REPLACE INTO mappings "
load_encrypted method · python · L133-L152 (20 LOC)src/scrawl/anonymize/mapping.py
def load_encrypted(self, db_path: Path, case_id: str):
"""Load mapping from encrypted SQLite database."""
key = os.environ.get("SCRAWL_MAP_KEY")
if not key:
raise EnvironmentError("SCRAWL_MAP_KEY environment variable not set.")
f = Fernet(key.encode())
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT encrypted_data FROM mappings WHERE case_id = ?",
(case_id,),
).fetchone()
conn.close()
if row is None:
raise KeyError(f"No mapping found for case_id: {case_id}")
decrypted = f.decrypt(row[0])
self._person_map = json.loads(decrypted)AllowlistEntry class · python · L8-L11 (4 LOC)src/scrawl/anonymize/selective.py
class AllowlistEntry:
    """A name that should be preserved rather than anonymized.

    NOTE(review): instances are built with keyword arguments elsewhere, so
    a dataclass-style decorator presumably sits above this class — confirm.
    """

    # The name text; compared case-insensitively by should_anonymize().
    name: str
    category: str  # "judge" | "case_law" | "commissioner" | "legal_citation"
    source: str  # "static" | "extracted"
# build_dynamic_allowlist function · python · L55-L88 (34 LOC)src/scrawl/anonymize/selective.py
def build_dynamic_allowlist(full_text: str) -> list[AllowlistEntry]:
"""Extract names from document text that should be preserved."""
entries = []
for match in CASE_LAW_PATTERN.finditer(full_text):
for group_idx in [1, 2]:
name = match.group(group_idx).strip().rstrip(".")
if len(name) > 2:
entries.append(AllowlistEntry(name=name, category="case_law", source="extracted"))
for match in JUDGE_PATTERN.finditer(full_text):
name = match.group(1).strip().rstrip(",.")
entries.append(AllowlistEntry(name=name, category="judge", source="extracted"))
for match in COMMISSIONER_PATTERN.finditer(full_text):
name = match.group(1).strip().rstrip(",.")
entries.append(AllowlistEntry(name=name, category="commissioner", source="extracted"))
for match in CIRCUIT_JUDGES_PATTERN.finditer(full_text):
names_str = match.group(1)
names = re.split(r"[,\s]+(?:and\s+)?", names_str)
for should_anonymize function · python · L91-L112 (22 LOC)src/scrawl/anonymize/selective.py
def should_anonymize(
    entity_text: str,
    entity_type: str,
    allowlist: list[AllowlistEntry],
    fuzzy_threshold: float = 0.85,
) -> bool:
    """Determine whether a detected entity should be anonymized.

    An entity is preserved (returns False) when it matches an allowlist
    name, either by case-insensitive substring containment in either
    direction or by a fuzzy partial-ratio score of at least
    *fuzzy_threshold*.

    Allowlist entries whose name is blank are ignored: an empty string is a
    substring of every entity, so one blank entry would otherwise preserve
    everything and disable anonymization entirely.

    Args:
        entity_text: Text of the detected entity.
        entity_type: Entity type label (currently not used in the decision).
        allowlist: Names that must be preserved.
        fuzzy_threshold: Minimum partial-ratio score (0..1) for a fuzzy match.

    Returns:
        True if the entity should be anonymized, False if it is allowlisted.
    """
    entity_clean = entity_text.strip().lower()
    cleaned = (entry.name.strip().lower() for entry in allowlist)
    allowed_names = [candidate for candidate in cleaned if candidate]

    if any(entity_clean in allowed or allowed in entity_clean for allowed in allowed_names):
        return False
    if not allowed_names:
        return True

    # Imported lazily: only needed once the cheap substring checks fail.
    from rapidfuzz import fuzz

    return not any(
        fuzz.partial_ratio(entity_clean, allowed) / 100.0 >= fuzzy_threshold
        for allowed in allowed_names
    )
# Want this analysis on your repo? https://repobility.com/scan/
DocumentInfo class · python · L18-L25 (8 LOC)src/scrawl/assemble/markdown.py
class DocumentInfo:
    """Metadata for a single source document in the case."""

    # Listed under `documents:` in the generated YAML frontmatter.
    filename: str
    # Looked up in DOC_TYPE_LABELS for the section heading; documents are
    # grouped by this value when the case Markdown is assembled.
    doc_type: str
    # None when no filing date is known (rendered as "unknown").
    filing_date: date | None
    page_count: int
    source_path: Path | None
# assemble_case_markdown function · python · L41-L92 (52 LOC)src/scrawl/assemble/markdown.py
def assemble_case_markdown(
case_id: str,
documents: list[DocumentInfo],
page_results: dict[str, list],
config,
) -> str:
"""Assemble all processed pages into a single Markdown file."""
sections = []
# 1. YAML Frontmatter
if config.assemble.include_frontmatter:
sections.append(_build_frontmatter(case_id, documents))
# 2. Table of Contents
if config.assemble.include_toc:
sections.append(_build_toc(documents, config))
# 3. Group documents by type, then chronological within each type
ordered_docs = _order_documents(documents, config)
# 4. Render each document
current_type = None
for doc in ordered_docs:
if doc.doc_type != current_type:
current_type = doc.doc_type
label = DOC_TYPE_LABELS.get(current_type, current_type)
sections.append(f"\n# {label}\n")
date_str = doc.filing_date.isoformat() if doc.filing_date else "unknown"
sections.append(f"\n## {doc_build_frontmatter function · python · L95-L105 (11 LOC)src/scrawl/assemble/markdown.py
def _build_frontmatter(case_id: str, documents: list[DocumentInfo]) -> str:
    """Build YAML frontmatter block.

    The case id and every filename are emitted as double-quoted scalars
    with ``"`` and ``\\`` escaped, so unusual filenames cannot break the
    YAML. Values without special characters render exactly as before.

    Args:
        case_id: Identifier of the case.
        documents: Source documents; only ``filename`` is read here.

    Returns:
        The frontmatter text, delimited by ``---`` lines.
    """
    import json

    def quoted(value: str) -> str:
        # A JSON string literal is also a valid YAML double-quoted scalar.
        return json.dumps(value, ensure_ascii=False)

    doc_list = "\n".join(f" - {quoted(d.filename)}" for d in documents)
    return f"""---
case_id: {quoted(case_id)}
processed_date: "{date.today().isoformat()}"
pipeline_version: "1.0"
document_count: {len(documents)}
documents:
{doc_list}
---"""
# page 1 / 3next ›