Function bodies 108 total
find_checkpoints_dir function · python · L35-L45 (11 LOC)hooks/checkpoint_guard.py
def find_checkpoints_dir() -> Path | None:
"""Find .claude/checkpoints/ walking up from CWD."""
# WHY: check for existing checkpoints dir first, then fall back to .claude dir
result = find_file_upward(str(Path(".claude") / "checkpoints"))
if result is not None:
return result
# If .claude exists but no checkpoints dir, return the expected path
claude_dir = find_file_upward(".claude")
if claude_dir is not None and claude_dir.is_dir():
return claude_dir / "checkpoints"
return Nonelatest_checkpoint_age function · python · L48-L58 (11 LOC)hooks/checkpoint_guard.py
def latest_checkpoint_age(checkpoints_dir: Path) -> float | None:
"""Return age in minutes of the newest checkpoint, or None if no checkpoints."""
if not checkpoints_dir.exists():
return None
md_files = list(checkpoints_dir.glob("*.md"))
if not md_files:
return None
newest = max(f.stat().st_mtime for f in md_files)
return (time.time() - newest) / 60main function · python · L61-L89 (29 LOC)hooks/checkpoint_guard.py
def main():
data = parse_stdin()
if not data:
return
tool_input = get_tool_input(data)
command = tool_input.get("command", "")
# Check if command matches risky patterns
is_risky = any(pattern in command for pattern in RISKY_PATTERNS)
if not is_risky:
return
checkpoints_dir = find_checkpoints_dir()
if checkpoints_dir is None:
return
age = latest_checkpoint_age(checkpoints_dir)
if age is None or age > 60:
freshness = "no checkpoints found" if age is None else f"latest is {age:.0f} min old"
emit_hook_result(
"PostToolUse",
f"[checkpoint-guard] Risky operation detected: {command[:80]}... "
f"Checkpoint status: {freshness}. "
"SUGGESTION: Create a checkpoint before proceeding — "
"save state to .claude/checkpoints/<date>_<description>.md "
"(branch, key files, current task, rollback steps).",
)extract_not_now_keywords function · python · L19-L54 (36 LOC)hooks/drift_guard.py
def extract_not_now_keywords(not_now: str) -> list[str]:
"""Extract meaningful keywords from NOT NOW field.
Splits by commas and common delimiters, lowercases, strips filler words.
"""
if not not_now:
return []
# WHY: NOT NOW often contains phrases like "don't optimize config,
# don't analyze competitors" — we extract the action nouns
filler = {
"не",
"don't",
"do",
"not",
"не делать",
"avoid",
"skip",
"no",
"нет",
"без",
"the",
"a",
"an",
"this",
"that",
}
raw_parts = not_now.replace(";", ",").replace("•", ",").replace("—", ",").split(",")
keywords = []
for part in raw_parts:
words = part.lower().strip().split()
meaningful = [w for w in words if w not in filler and len(w) > 2]
keywords.extend(meaningful)
return keywordscheck_drift function · python · L57-L96 (40 LOC)hooks/drift_guard.py
def check_drift(tool_name: str, tool_input: dict, not_now_keywords: list[str]) -> str | None:
"""Check if the current tool call drifts into NOT NOW territory.
Returns warning message or None.
"""
if not not_now_keywords:
return None
# Build a search string from tool metadata
search_parts = [tool_name.lower()]
# Extract skill/agent name from tool_input
for key in ("skill", "name", "description", "prompt", "subagent_type"):
val = tool_input.get(key, "")
if val:
search_parts.append(str(val).lower())
search_text = " ".join(search_parts)
# Check each NOT NOW keyword against the search text
# WHY: "deployment" should match "deploy", so we check both directions
# and also stem-like prefix matching (shared prefix >= 4 chars)
matched = []
search_words = search_text.split()
for kw in not_now_keywords:
if kw in search_text:
matched.append(kw)
elif any(kw.startswith(w[:4main function · python · L99-L128 (30 LOC)hooks/drift_guard.py
def main() -> None:
data = parse_stdin()
if not data:
return
tool_name = data.get("tool_name", "")
tool_input = get_tool_input(data)
# Find and parse Scope Fence
ctx_path = find_scope_fence()
if ctx_path is None:
return
try:
content = ctx_path.read_text(encoding="utf-8")
except OSError:
return
fence = parse_scope_fence(content)
not_now = fence.get("not_now", "")
# Skip if no Scope Fence or NOT NOW is empty/placeholder
if not not_now or not_now.startswith("{{"):
return
keywords = extract_not_now_keywords(not_now)
warning = check_drift(tool_name, tool_input, keywords)
if warning:
emit_hook_result("PostToolUse", warning)collect_strings function · python · L57-L71 (15 LOC)hooks/input_guard.py
def collect_strings(value: Any) -> list[str]:
"""Рекурсивно собирает все строковые значения из произвольной структуры данных."""
if isinstance(value, str):
return [value]
if isinstance(value, dict):
results: list[str] = []
for v in value.values():
results.extend(collect_strings(v))
return results
if isinstance(value, list):
results = []
for item in value:
results.extend(collect_strings(item))
return results
return []Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
sanitize function · python · L74-L82 (9 LOC)hooks/input_guard.py
def sanitize(value: Any) -> Any:
"""Рекурсивно удаляет null-байты и zero-width символы из строк."""
if isinstance(value, str):
return SANITIZE_PATTERN.sub("", value)
if isinstance(value, dict):
return {k: sanitize(v) for k, v in value.items()}
if isinstance(value, list):
return [sanitize(item) for item in value]
return valuescan function · python · L85-L93 (9 LOC)hooks/input_guard.py
def scan(strings: list[str]) -> dict[str, int]:
"""Возвращает словарь {категория: количество_совпадений} для всех строк."""
hits: dict[str, int] = {}
for text in strings:
for category, pattern in PATTERNS.items():
count = len(pattern.findall(text))
if count:
hits[category] = hits.get(category, 0) + count
return hitsmain function · python · L96-L138 (43 LOC)hooks/input_guard.py
def main() -> None:
# WHY: intentionally NOT using parse_stdin() from utils — different semantics.
# parse_stdin() returns {} on failure (fail-silent), but this security hook
# must sys.exit(0) on parse failure (fail-open: allow the call to proceed).
try:
data = json.load(sys.stdin)
except (json.JSONDecodeError, EOFError):
sys.exit(0)
tool_name: str = data.get("tool_name", "")
# ПОЧЕМУ: проверяем только MCP-инструменты — они принимают внешние данные,
# встроенные инструменты Claude (Read, Bash и т.д.) доверенные по определению.
if not tool_name.startswith("mcp__"):
sys.exit(0)
tool_input: Any = data.get("tool_input", {})
strings = collect_strings(tool_input)
hits = scan(strings)
if not hits:
# NONE — разрешить, вернуть sanitized input
clean_input = sanitize(tool_input)
print(json.dumps({"tool_input": clean_input}))
sys.exit(0)
categories = list(hits.keys())
total_matmain function · python · L39-L73 (35 LOC)hooks/mcp_circuit_breaker_post.py
def main() -> None:
event = parse_stdin_raw()
if not event:
return
tool_name: str = event.get("tool_name", "")
server = get_mcp_server_name(tool_name)
if server is None:
return
tool_result: str = str(event.get("tool_result", ""))
state = load_json_state(STATE_FILE)
entry = state.get(server, {})
if is_error(tool_result):
# Инкремент failures, при пороге — фиксируем opened_at
failures = entry.get("failures", 0) + 1
entry["failures"] = failures
if failures >= FAILURE_THRESHOLD and "opened_at" not in entry:
entry["opened_at"] = time.time()
print(
f"[circuit-breaker] {server}: OPEN after {failures} failures",
file=sys.stderr,
)
else:
# Success — полный сброс (восстановление из HALF_OPEN)
if entry.get("failures", 0) > 0:
print(
f"[circuit-breaker] {server}: recovered, resetting",
get_circuit_status function · python · L39-L52 (14 LOC)hooks/mcp_circuit_breaker.py
def get_circuit_status(entry: dict) -> str:
"""Определяет текущее состояние цепи для конкретного сервера."""
failures = entry.get("failures", 0)
opened_at = entry.get("opened_at")
if failures < FAILURE_THRESHOLD:
return "CLOSED"
if opened_at and (time.time() - opened_at) >= RECOVERY_TIMEOUT:
# ПОЧЕМУ: HALF_OPEN позволяет одному запросу пройти для проверки
# восстановления сервера без полного сброса счётчика
return "HALF_OPEN"
return "OPEN"record_open function · python · L55-L62 (8 LOC)hooks/mcp_circuit_breaker.py
def record_open(state: dict, server: str) -> dict:
"""Переводит цепь сервера в состояние OPEN, фиксируя время блокировки."""
entry = state.get(server, {})
entry["failures"] = entry.get("failures", 0) + 1
if entry["failures"] >= FAILURE_THRESHOLD and "opened_at" not in entry:
entry["opened_at"] = time.time()
state[server] = entry
return statemain function · python · L68-L106 (39 LOC)hooks/mcp_circuit_breaker.py
def main() -> None:
"""Обрабатывает PreToolUse-событие от Claude Code."""
event = parse_stdin_raw()
if not event:
# Не можем разобрать ввод — пропускаем без блокировки
print("{}")
return
tool_name: str = event.get("tool_name", "")
server = get_mcp_server_name(tool_name)
if server is None:
# Не MCP-инструмент — circuit breaker не применяется
print("{}")
return
state = load_json_state(STATE_FILE)
entry = state.get(server, {})
status = get_circuit_status(entry)
if status == "OPEN":
fallback = FALLBACKS.get(server, DEFAULT_FALLBACK)
result = {
"decision": "block",
"reason": f"Circuit OPEN for '{server}' ({entry.get('failures', 0)} failures). "
f"Fallback: {fallback}",
}
print(json.dumps(result))
return
if status == "HALF_OPEN":
# ПОЧЕМУ: сбрасываем opened_at чтобы дать один шанс — если снова
# упадёт, Prmain function · python · L23-L46 (24 LOC)hooks/mcp_locality_guard.py
def main():
data = parse_stdin()
if not data:
sys.exit(0)
tool_name = data.get("tool_name", "")
if not tool_name.startswith("mcp__"):
sys.exit(0)
# Check if this MCP is exempt
mcp_prefix = "__".join(tool_name.split("__")[:2])
if mcp_prefix in EXEMPT_MCPS:
sys.exit(0)
print(
f"[mcp-locality] Before using {tool_name}: "
f"did you try local Read/Grep/Glob first? "
f"Local search costs 0 tokens and 0 latency. "
f"Use MCP only if local search didn't find the answer.",
file=sys.stderr,
)
sys.exit(0)About: code-quality intelligence by Repobility · https://repobility.com
main function · python · L27-L62 (36 LOC)hooks/memory_guard.py
def main():
data = parse_stdin()
if not data:
return
tool_input = get_tool_input(data)
command = tool_input.get("command", "")
if "git commit" not in command:
return
response_text = extract_tool_response(data)
if is_failed_commit(response_text):
return
# Find project memory
active_ctx = find_project_memory()
if active_ctx is None:
emit_hook_result(
"PostToolUse",
"[memory-guard] git commit detected but no .claude/memory/activeContext.md found. "
"Consider creating one to track project state.",
)
return
# Check staleness: if activeContext was modified more than 5 min ago, it's stale
mtime = active_ctx.stat().st_mtime
age_minutes = (time.time() - mtime) / 60
if age_minutes > 5:
emit_hook_result(
"PostToolUse",
f"[memory-guard] git commit detected. activeContext.md is {age_minutes:.0f} min old. "
"UPDAextract_fix_subject function · python · L43-L55 (13 LOC)hooks/pattern_extractor.py
def extract_fix_subject(commit_msg: str) -> str | None:
"""Извлекает краткое описание из fix:-коммита.
Поддерживает форматы:
- "fix: something broken"
- "fix(scope): something broken"
Returns None если коммит не fix:.
"""
# ПОЧЕМУ: re.match c IGNORECASE — коммиты пишут по-разному (Fix:, FIX:, fix:)
m = re.match(r"^fix(?:\([^)]+\))?:\s*(.+)", commit_msg, re.IGNORECASE)
if m:
return m.group(1).strip()
return Noneload_patterns_text function · python · L58-L66 (9 LOC)hooks/pattern_extractor.py
def load_patterns_text() -> str:
"""Читает содержимое patterns.md. Возвращает пустую строку если файл не найден.
WHY: try/except вместо exists()+read — избегаем TOCTOU race condition.
"""
try:
return GLOBAL_PATTERNS_PATH.read_text(encoding="utf-8")
except (FileNotFoundError, OSError):
return ""find_matching_patterns function · python · L69-L118 (50 LOC)hooks/pattern_extractor.py
def find_matching_patterns(subject: str, patterns_text: str) -> list[tuple[str, int]]:
"""Ищет существующие паттерны в секции 'Отладка и фиксы', чьи заголовки
содержат ключевые слова из subject коммита.
Returns список (заголовок_паттерна, текущий_счётчик).
ПОЧЕМУ: простой keyword-overlap без NLP — достаточно для 80% случаев.
Сложные семантические матчи избыточны для hook-уровня.
"""
if not patterns_text:
return []
# Выделяем только секцию "Отладка и фиксы" чтобы не ловить ложные совпадения
# из других секций (например, архитектурных паттернов)
section_start = patterns_text.find(TARGET_SECTION)
if section_start == -1:
return []
# Ищем конец секции (следующий ## заголовок на уровне 2)
offset = section_start + len(TARGET_SECTION)
next_section = re.search(r"\n## ", patterns_text[offset:])
if next_section:
section_end = offset + next_section.start()
section_text = patterns_text[section_start:secti_extract_counter function · python · L121-L141 (21 LOC)hooks/pattern_extractor.py
def _extract_counter(header_line: str, section_text: str, header_pos: int) -> int:
"""Извлекает числовой счётчик из заголовка паттерна или его первых строк.
ПОЧЕМУ: счётчик может быть в заголовке ### [2026-01-01] Название [×3]
или в отдельной строке ниже. Проверяем оба места.
"""
# Сначала ищем в самой строке заголовка
m = re.search(r"\[×(\d+)\]", header_line)
if m:
return int(m.group(1))
# Ищем в блоке паттерна (от заголовка до следующего ###)
tail = section_text[header_pos:]
# WHY: skip 4 chars ("### ") to avoid matching current header's "###" prefix
block_end = re.search(r"\n###", tail[4:])
block = tail[: block_end.start() + 4] if block_end else tail
m = re.search(r"\[×(\d+)\]", block)
if m:
return int(m.group(1))
return 1 # первое вхождение, ещё не размеченоsanitize_commit_msg function · python · L144-L150 (7 LOC)hooks/pattern_extractor.py
def sanitize_commit_msg(msg: str) -> str:
"""Strip newlines and limit length to prevent prompt injection.
WHY: commit messages are attacker-controlled input that flows into
additionalContext (seen by LLM). Newlines could break JSON or inject prompts.
"""
return sanitize_text(msg, MAX_COMMIT_MSG_LEN)build_reminder_message function · python · L153-L196 (44 LOC)hooks/pattern_extractor.py
def build_reminder_message(
commit_hash: str,
commit_msg: str,
subject: str,
matching: list[tuple[str, int]],
) -> str:
"""Формирует текст напоминания для Claude в additionalContext."""
safe_msg = sanitize_commit_msg(commit_msg)
today = date.today().isoformat()
lines: list[str] = [
f"[pattern-extractor] fix:-коммит обнаружен: `{commit_hash}` — «{safe_msg}»",
"",
"Пожалуйста, извлеки паттерн и добавь его в ~/.claude/memory/patterns.md",
f"под секцию «{TARGET_SECTION}».",
"",
]
if matching:
lines.append("ВНИМАНИЕ: найдены похожие существующие паттерны:")
for header, counter in matching:
lines.append(f" • {header} [×{counter}]")
lines.append(
f" → Если это тот же баг — увеличь счётчик: [×{counter}] → [×{counter + 1}]"
)
lines.append(" вместо создания нового блока.")
lines.append("")
lines.append("Если баг main function · python · L199-L231 (33 LOC)hooks/pattern_extractor.py
def main() -> None:
data = parse_stdin()
if not data:
return
tool_input = get_tool_input(data)
command = tool_input.get("command", "")
if "git commit" not in command:
return
response_text = extract_tool_response(data)
if is_failed_commit(response_text):
return
commit_hash = run_git(["log", "-1", "--format=%h"])
commit_msg = run_git(["log", "-1", "--format=%s"])
if not commit_hash:
return
# Активируемся только на fix:-коммиты
subject = extract_fix_subject(commit_msg)
if subject is None:
return
# Ищем совпадения с существующими паттернами
patterns_text = load_patterns_text()
matching = find_matching_patterns(subject, patterns_text)
reminder = build_reminder_message(commit_hash, commit_msg, subject, matching)
emit_hook_result("PostToolUse", reminder)Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_tracker_path function · python · L22-L27 (6 LOC)hooks/plan_mode_guard.py
def get_tracker_path(session_id: str) -> Path:
"""Get path for session-specific file tracker."""
# ПОЧЕМУ: используем session_id, а не PID — PID может переиспользоваться,
# session_id уникален для каждой сессии Claude Code
safe_id = session_id.replace("/", "_").replace("\\", "_")[:32]
return Path(tempfile.gettempdir()) / f"claude_plan_guard_{safe_id}.txt"has_active_plan function · python · L30-L47 (18 LOC)hooks/plan_mode_guard.py
def has_active_plan() -> bool:
"""Check if any plan file exists in .claude/plans/ directory.
ПОЧЕМУ: если план уже существует (одобрен или в процессе), предупреждение
о 3+ файлах без плана — ложноположительное. Проверяем наличие .md файлов
в plans/ как маркер того что план был создан.
"""
plans_dir = Path.home() / ".claude" / "plans"
if not plans_dir.exists():
return False
# Любой .md файл, модифицированный за последние 24 часа = активный план
import time
now = time.time()
for f in plans_dir.glob("*.md"):
if now - f.stat().st_mtime < 86400: # 24 hours
return True
return Falsemain function · python · L50-L115 (66 LOC)hooks/plan_mode_guard.py
def main() -> None:
data = parse_stdin()
if not data:
return
# ПОЧЕМУ: поддерживаем оба формата — вложенный tool_input и плоский (legacy).
# post_format.py читает file_path с root, но документация говорит о вложенности.
tool_input = get_tool_input(data)
file_path = tool_input.get("file_path", "")
if not file_path:
return
# Session tracking
session_id = data.get("session_id", "unknown")
tracker = get_tracker_path(session_id)
# Read existing paths
existing_paths: set[str] = set()
if tracker.exists():
existing_paths = set(tracker.read_text(encoding="utf-8").strip().split("\n"))
existing_paths.discard("")
# Add new path (normalize)
normalized = str(Path(file_path).resolve())
existing_paths.add(normalized)
# Write back
tracker.write_text("\n".join(existing_paths) + "\n", encoding="utf-8")
count = len(existing_paths)
# ПОЧЕМУ: если план существует — не предупреждать, агент extract_decision function · python · L39-L60 (22 LOC)hooks/post_commit_memory.py
def extract_decision(commit_msg: str) -> tuple[str, str] | None:
"""Extract decision type and description from commit message.
Returns (type, description) if commit message starts with a decision prefix.
"""
msg_lower = commit_msg.lower()
for prefix in DECISION_PREFIXES:
if msg_lower.startswith(prefix):
description = commit_msg[len(prefix) :].strip()
# Strip conventional commit prefix if present (e.g., "feat: arch: ...")
decision_type = prefix.rstrip(":")
return decision_type, description
# Also check after conventional commit prefix: "feat: arch: ..."
for conv in ("feat:", "fix:", "refactor:", "chore:", "docs:"):
combined = f"{conv} {prefix}"
if msg_lower.startswith(combined):
description = commit_msg[len(combined) :].strip()
decision_type = prefix.rstrip(":")
return decision_type, description
return Nonelog_decision function · python · L63-L83 (21 LOC)hooks/post_commit_memory.py
def log_decision(commit_hash: str, commit_msg: str) -> str | None:
"""Auto-record decision to decisions.md if commit message has decision prefix."""
result = extract_decision(commit_msg)
if result is None:
return None
decision_type, description = result
decisions_file = find_decisions_file()
if decisions_file is None:
return f"Decision detected but no decisions.md found: [{decision_type}] {description}"
now = datetime.now().strftime("%Y-%m-%d")
# Format: ### [date] Description. Type: X. Commit: hash
entry = f"\n### [{now}] {description}\n- Тип: {decision_type}\n- Коммит: `{commit_hash}`\n"
content = decisions_file.read_text(encoding="utf-8")
# Append at the end
content = content.rstrip() + "\n" + entry
decisions_file.write_text(content, encoding="utf-8")
return f"Auto-recorded [{decision_type}] decision to decisions.md"main function · python · L86-L156 (71 LOC)hooks/post_commit_memory.py
def main() -> None:
data = parse_stdin()
if not data:
return
tool_input = get_tool_input(data)
command = tool_input.get("command", "")
if "git commit" not in command:
return
response_text = extract_tool_response(data)
if is_failed_commit(response_text):
return
# Получаем данные последнего коммита
commit_hash = run_git(["log", "-1", "--format=%h"])
commit_msg = run_git(["log", "-1", "--format=%s"])
if not commit_hash:
return
# Находим activeContext.md
active_ctx = find_project_memory()
if active_ctx is None:
emit_hook_result(
"PostToolUse",
"[post-commit-memory] Commit logged but no activeContext.md found. "
"Consider creating .claude/memory/activeContext.md for project state tracking.",
)
return
# ПОЧЕМУ: дописываем в конец файла, не перезаписываем.
# Секция "Auto-commit log" — структурированный лог, легко парсить.
now = main function · python · L17-L41 (25 LOC)hooks/post_format.py
def main():
data = parse_stdin()
if not data:
return
# ПОЧЕМУ: поддерживаем оба формата — вложенный tool_input и плоский
tool_input = get_tool_input(data)
path = tool_input.get("file_path", "")
if not path or not os.path.exists(path):
return
ext = os.path.splitext(path)[1].lower()
if ext == ".py":
# ПОЧЕМУ: ruff format — 10-100x быстрее black, drop-in replacement
subprocess.run(
["ruff", "format", "--line-length", "100", "--quiet", path],
capture_output=True,
)
elif ext in (".js", ".ts", ".jsx", ".tsx"):
subprocess.run(
["prettier", "--write", "--log-level", "silent", path],
capture_output=True,
)main function · python · L19-L114 (96 LOC)hooks/pre_commit_guard.py
def main() -> None:
data = parse_stdin()
if not data:
return
tool_input = get_tool_input(data)
command = tool_input.get("command", "")
# --- Check 0: Block direct push to public repo ---
# WHY: Public repo (Claude-cod-top-2026) is read-only distribution.
# Changes go to origin (private) first, then PR to public.
# WHY: check only the actual command, not heredoc/string content inside it
# Allow pushing feature branches to public (needed for PRs), block only main/master
first_line = command.split("\n")[0].strip()
if (
first_line.startswith("git push")
and "public" in first_line
and ("main" in first_line or "master" in first_line)
):
print(
"[pre-commit-guard] BLOCKED: Direct push to 'public' remote is not allowed. "
"Push to 'origin' first, then create a PR to the public repo.",
file=sys.stderr,
)
sys.exit(2)
# WHY: fast exit if not git commitRepobility (the analyzer behind this table) · https://repobility.com
main function · python · L15-L37 (23 LOC)hooks/pre_compact.py
def main():
# Update project activeContext.md
active = find_project_memory()
if active is not None:
content = active.read_text(encoding="utf-8")
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
# Update the "Updated:" line
lines = content.split("\n")
for i, line in enumerate(lines):
if line.startswith("## Updated:"):
lines[i] = f"## Updated: {timestamp} (pre-compact)"
break
active.write_text("\n".join(lines), encoding="utf-8")
print(f"[PreCompact] Updated {active} timestamp to {timestamp}")
else:
print("[PreCompact] No project activeContext.md found.")
# Log compaction event
log_dir = os.path.expanduser("~/.claude/logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "sessions.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"{datetime.now().isoformat()} | COMPACT | cwd={os.getcwd()}\n")main function · python · L16-L41 (26 LOC)hooks/read_before_edit.py
def main():
data = parse_stdin()
if not data:
sys.exit(0)
tool_name = data.get("tool_name", "")
tool_input = data.get("tool_input", {})
if tool_name not in ("Edit", "Write"):
sys.exit(0)
file_path = tool_input.get("file_path", "")
if not file_path:
sys.exit(0)
# Write to new files is always OK (no existing content to read)
# For Edit, always remind
if tool_name == "Edit":
print(
f"[read-before-edit] Editing {file_path}. "
f"Confirm: did you Read this file first? "
f"If not, Read it before editing — [MEMORY] does not replace [VERIFIED].",
file=sys.stderr,
)
sys.exit(0)get_last_commit_time function · python · L17-L27 (11 LOC)hooks/session_save.py
def get_last_commit_time() -> float | None:
"""Get timestamp of the last git commit."""
try:
result = subprocess.run(
["git", "log", "-1", "--format=%ct"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return float(result.stdout.strip())
except Exception:
pass
return Nonemain function · python · L30-L80 (51 LOC)hooks/session_save.py
def main():
try:
# 1. Update global activeContext timestamp
global_path = os.path.expanduser("~/.claude/memory/activeContext.md")
if os.path.exists(global_path):
with open(global_path, encoding="utf-8") as f:
content = f.read()
lines = content.split("\n")
for i, line in enumerate(lines):
if "## Последнее обновление" in line and i + 1 < len(lines):
lines[i + 1] = datetime.now().strftime("%Y-%m-%d %H:%M")
break
with open(global_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
# 2. Log session
log_dir = os.path.expanduser("~/.claude/logs")
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "sessions.log")
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"{datetime.now().isoformat()} | SESSION_END\n")
# 3. Check project memory stauto_update_config_repo function · python · L20-L51 (32 LOC)hooks/session_start.py
def auto_update_config_repo():
"""If config was installed with --link, git pull the source repo.
Detection: ~/.claude/.claude-code-config-repo contains repo path.
"""
marker = Path.home() / ".claude" / CONFIG_REPO_MARKER
if not marker.exists():
return
repo_path = marker.read_text(encoding="utf-8").strip()
if not repo_path or not Path(repo_path).is_dir():
return
try:
result = subprocess.run(
["git", "-C", repo_path, "pull", "--ff-only"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
output = result.stdout.strip()
if output and "Already up to date" not in output:
print(f"[SessionStart] Config updated: {output}")
else:
# Log to stderr (not visible to Claude, but useful for debugging)
print(
f"[SessionStart] Config auto-update skipped: {result.stderr.stripprint_scope_fence function · python · L54-L81 (28 LOC)hooks/session_start.py
def print_scope_fence():
"""Print Scope Fence status at session start."""
fence_source = find_scope_fence()
if fence_source is None:
print(
"\n[SessionStart] No Scope Fence found. "
"Create .scope-fence.md in project root or add ## Scope Fence to activeContext.md."
)
return
# ПОЧЕМУ: parse_scope_fence из utils инкапсулирует парсинг — не дублируем логику здесь.
fence = parse_scope_fence(fence_source.read_text(encoding="utf-8"))
fence_goal = fence.get("goal", "")
fence_not_now = fence.get("not_now", "")
if fence_goal and not fence_goal.startswith("{{"):
source_label = fence_source.name
print(f"\n[SessionStart] Scope Fence active (from {source_label}).")
print(f" Goal: {fence_goal}")
if fence_not_now and not fence_not_now.startswith("{{"):
print(f" NOT NOW: {fence_not_now}")
else:
print(
"\n[SessionStart] No Scope Fence found. "
main function · python · L84-L120 (37 LOC)hooks/session_start.py
def main():
# Auto-update config repo if installed with --link
auto_update_config_repo()
mem_dir = find_project_claude_dir()
# Output project memory if available
if mem_dir is not None:
active = mem_dir / "activeContext.md"
if active.exists():
print(f"=== PROJECT ACTIVE CONTEXT ({active}) ===")
print(active.read_text(encoding="utf-8"))
print("=== END ACTIVE CONTEXT ===\n")
decisions = mem_dir / "decisions.md"
if decisions.exists():
content = decisions.read_text(encoding="utf-8")
if len(content) > 2000:
content = "...(truncated)...\n" + content[-2000:]
print(f"=== PROJECT DECISIONS ({decisions}) ===")
print(content)
print("=== END DECISIONS ===\n")
print(f"[SessionStart] Loaded project memory from {mem_dir.parent}")
else:
print("[SessionStart] No project .claude/memory/ found in path hierarchy.")
parse_stdin function · python · L14-L25 (12 LOC)hooks/utils.py
def parse_stdin() -> dict:
"""Parse JSON from stdin (Claude Code hook protocol).
Returns empty dict on parse failure — hooks should exit gracefully.
WHY: Every hook does this identically. Centralizing prevents
inconsistent error handling (some used EOFError, some didn't).
"""
try:
result = json.load(sys.stdin)
return result if isinstance(result, dict) else {}
except (json.JSONDecodeError, EOFError, ValueError):
return {}Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
parse_stdin_raw function · python · L28-L39 (12 LOC)hooks/utils.py
def parse_stdin_raw() -> dict:
"""Parse JSON from stdin using read() instead of load().
WHY: mcp_circuit_breaker uses sys.stdin.read() explicitly.
Some hooks need this variant for compatibility.
"""
try:
raw = sys.stdin.read()
result = json.loads(raw)
return result if isinstance(result, dict) else {}
except (json.JSONDecodeError, ValueError):
return {}get_tool_input function · python · L42-L49 (8 LOC)hooks/utils.py
def get_tool_input(data: dict) -> dict:
"""Extract tool_input from hook data, supporting both nested and flat formats.
WHY: Claude Code sends tool_input as a nested dict, but some older
hook protocols use flat format. This handles both consistently.
"""
tool_input = data.get("tool_input", data)
return tool_input if isinstance(tool_input, dict) else datarun_git function · python · L52-L67 (16 LOC)hooks/utils.py
def run_git(args: list[str], timeout: int = 10) -> str:
"""Run git command and return stdout.
WHY: Duplicated identically in pre_commit_guard, post_commit_memory,
pattern_extractor (3 copies, 36 lines total).
"""
try:
result = subprocess.run(
["git", *args],
capture_output=True,
text=True,
timeout=timeout,
)
return result.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError):
return ""find_project_memory function · python · L70-L81 (12 LOC)hooks/utils.py
def find_project_memory() -> Path | None:
"""Find activeContext.md walking up from CWD.
WHY: Duplicated in memory_guard, checkpoint_guard, post_commit_memory,
session_save, pre_compact (5 copies with slight variations).
"""
cwd = Path.cwd()
for parent in [cwd, *cwd.parents]:
candidate = parent / ".claude" / "memory" / "activeContext.md"
if candidate.exists():
return candidate
return Nonefind_project_claude_dir function · python · L84-L99 (16 LOC)hooks/utils.py
def find_project_claude_dir() -> Path | None:
"""Find .claude/memory/ directory walking up from CWD.
WHY: session_start.py variant — returns directory, not file.
Also checks for CLAUDE.md as project root marker.
"""
cwd = Path.cwd()
for parent in [cwd, *cwd.parents]:
candidate = parent / ".claude" / "memory" / "activeContext.md"
if candidate.exists():
return parent / ".claude" / "memory"
if (parent / "CLAUDE.md").exists():
claude_mem = parent / ".claude" / "memory"
if claude_mem.exists():
return claude_mem
return Nonefind_scope_fence function · python · L102-L123 (22 LOC)hooks/utils.py
def find_scope_fence() -> Path | None:
"""Find Scope Fence file, searching multiple tool-agnostic locations.
Search order (first found wins):
1. .scope-fence.md at project root (universal)
2. .claude/memory/activeContext.md (Claude Code)
3. .cursor/memory_bank/activeContext.md (Cursor)
WHY: Duplicated in drift_guard.py and session_start.py (identical logic).
"""
cwd = Path.cwd()
candidates = [
".scope-fence.md",
str(Path(".claude") / "memory" / "activeContext.md"),
str(Path(".cursor") / "memory_bank" / "activeContext.md"),
]
for parent in [cwd, *cwd.parents]:
for rel in candidates:
full = parent / rel
if full.exists():
return full
return Noneparse_scope_fence function · python · L126-L154 (29 LOC)hooks/utils.py
def parse_scope_fence(content: str) -> dict[str, str]:
"""Extract Scope Fence fields from file content.
Returns dict with keys: goal, boundary, done_when, not_now.
WHY: Used by both drift_guard and session_start.
"""
fence: dict[str, str] = {}
in_fence = False
for line in content.splitlines():
stripped = line.strip()
if stripped == "## Scope Fence":
in_fence = True
continue
if in_fence and stripped.startswith("## "):
break
if not in_fence:
continue
if stripped.startswith("Goal:"):
fence["goal"] = stripped[5:].strip()
elif stripped.startswith("Boundary:"):
fence["boundary"] = stripped[9:].strip()
elif stripped.startswith("Done when:"):
fence["done_when"] = stripped[10:].strip()
elif stripped.startswith("NOT NOW:"):
fence["not_now"] = stripped[8:].strip()
return fencefind_file_upward function · python · L157-L168 (12 LOC)hooks/utils.py
def find_file_upward(relative_path: str) -> Path | None:
"""Find a file by walking up the directory tree from CWD.
WHY: Generic version of find_project_memory/find_checkpoints_dir.
Reduces the need for one-off search functions.
"""
cwd = Path.cwd()
for parent in [cwd, *cwd.parents]:
full = parent / relative_path
if full.exists():
return full
return NoneAbout: code-quality intelligence by Repobility · https://repobility.com
get_mcp_server_name function · python · L171-L179 (9 LOC)hooks/utils.py
def get_mcp_server_name(tool_name: str) -> str | None:
"""Extract MCP server name from tool name (mcp__<server>__<method>).
WHY: Duplicated in mcp_circuit_breaker.py and mcp_circuit_breaker_post.py.
"""
parts = tool_name.split("__")
if len(parts) >= 3 and parts[0] == "mcp":
return parts[1]
return Noneload_json_state function · python · L182-L194 (13 LOC)hooks/utils.py
def load_json_state(path: Path) -> dict:
"""Load JSON state file, returning empty dict on any error.
WHY: Duplicated in mcp_circuit_breaker and mcp_circuit_breaker_post
(identical load_state functions).
"""
if not path.exists():
return {}
try:
result = json.loads(path.read_text(encoding="utf-8"))
return result if isinstance(result, dict) else {}
except (json.JSONDecodeError, OSError):
return {}save_json_state function · python · L197-L203 (7 LOC)hooks/utils.py
def save_json_state(path: Path, state: dict) -> None:
"""Save dict as JSON state file, creating parent dirs.
WHY: Duplicated in mcp_circuit_breaker and mcp_circuit_breaker_post.
"""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(state, indent=2), encoding="utf-8")page 1 / 3next ›