← back to sergeeey__Claude-cod-top-2026

Function bodies 108 total

All specs Real LLM only Function bodies
find_checkpoints_dir function · python · L35-L45 (11 LOC)
hooks/checkpoint_guard.py
def find_checkpoints_dir() -> Path | None:
    """Find .claude/checkpoints/ walking up from CWD."""
    # WHY: check for existing checkpoints dir first, then fall back to .claude dir
    result = find_file_upward(str(Path(".claude") / "checkpoints"))
    if result is not None:
        return result
    # If .claude exists but no checkpoints dir, return the expected path
    claude_dir = find_file_upward(".claude")
    if claude_dir is not None and claude_dir.is_dir():
        return claude_dir / "checkpoints"
    return None
latest_checkpoint_age function · python · L48-L58 (11 LOC)
hooks/checkpoint_guard.py
def latest_checkpoint_age(checkpoints_dir: Path) -> float | None:
    """Return age in minutes of the newest checkpoint, or None if no checkpoints."""
    if not checkpoints_dir.exists():
        return None

    md_files = list(checkpoints_dir.glob("*.md"))
    if not md_files:
        return None

    newest = max(f.stat().st_mtime for f in md_files)
    return (time.time() - newest) / 60
main function · python · L61-L89 (29 LOC)
hooks/checkpoint_guard.py
def main():
    data = parse_stdin()
    if not data:
        return

    tool_input = get_tool_input(data)
    command = tool_input.get("command", "")

    # Check if command matches risky patterns
    is_risky = any(pattern in command for pattern in RISKY_PATTERNS)
    if not is_risky:
        return

    checkpoints_dir = find_checkpoints_dir()
    if checkpoints_dir is None:
        return

    age = latest_checkpoint_age(checkpoints_dir)

    if age is None or age > 60:
        freshness = "no checkpoints found" if age is None else f"latest is {age:.0f} min old"
        emit_hook_result(
            "PostToolUse",
            f"[checkpoint-guard] Risky operation detected: {command[:80]}... "
            f"Checkpoint status: {freshness}. "
            "SUGGESTION: Create a checkpoint before proceeding — "
            "save state to .claude/checkpoints/<date>_<description>.md "
            "(branch, key files, current task, rollback steps).",
        )
extract_not_now_keywords function · python · L19-L54 (36 LOC)
hooks/drift_guard.py
def extract_not_now_keywords(not_now: str) -> list[str]:
    """Extract meaningful keywords from NOT NOW field.

    Splits by commas and common delimiters, lowercases, strips filler words.
    """
    if not not_now:
        return []

    # WHY: NOT NOW often contains phrases like "don't optimize config,
    # don't analyze competitors" — we extract the action nouns
    filler = {
        "не",
        "don't",
        "do",
        "not",
        "не делать",
        "avoid",
        "skip",
        "no",
        "нет",
        "без",
        "the",
        "a",
        "an",
        "this",
        "that",
    }

    raw_parts = not_now.replace(";", ",").replace("•", ",").replace("—", ",").split(",")
    keywords = []
    for part in raw_parts:
        words = part.lower().strip().split()
        meaningful = [w for w in words if w not in filler and len(w) > 2]
        keywords.extend(meaningful)

    return keywords
check_drift function · python · L57-L96 (40 LOC)
hooks/drift_guard.py
def check_drift(tool_name: str, tool_input: dict, not_now_keywords: list[str]) -> str | None:
    """Check if the current tool call drifts into NOT NOW territory.

    Returns warning message or None.
    """
    if not not_now_keywords:
        return None

    # Build a search string from tool metadata
    search_parts = [tool_name.lower()]

    # Extract skill/agent name from tool_input
    for key in ("skill", "name", "description", "prompt", "subagent_type"):
        val = tool_input.get(key, "")
        if val:
            search_parts.append(str(val).lower())

    search_text = " ".join(search_parts)

    # Check each NOT NOW keyword against the search text
    # WHY: "deployment" should match "deploy", so we check both directions
    # and also stem-like prefix matching (shared prefix >= 4 chars)
    matched = []
    search_words = search_text.split()
    for kw in not_now_keywords:
        if kw in search_text:
            matched.append(kw)
        elif any(kw.startswith(w[:4
main function · python · L99-L128 (30 LOC)
hooks/drift_guard.py
def main() -> None:
    data = parse_stdin()
    if not data:
        return

    tool_name = data.get("tool_name", "")
    tool_input = get_tool_input(data)

    # Find and parse Scope Fence
    ctx_path = find_scope_fence()
    if ctx_path is None:
        return

    try:
        content = ctx_path.read_text(encoding="utf-8")
    except OSError:
        return

    fence = parse_scope_fence(content)
    not_now = fence.get("not_now", "")

    # Skip if no Scope Fence or NOT NOW is empty/placeholder
    if not not_now or not_now.startswith("{{"):
        return

    keywords = extract_not_now_keywords(not_now)
    warning = check_drift(tool_name, tool_input, keywords)

    if warning:
        emit_hook_result("PostToolUse", warning)
collect_strings function · python · L57-L71 (15 LOC)
hooks/input_guard.py
def collect_strings(value: Any) -> list[str]:
    """Рекурсивно собирает все строковые значения из произвольной структуры данных."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, dict):
        results: list[str] = []
        for v in value.values():
            results.extend(collect_strings(v))
        return results
    if isinstance(value, list):
        results = []
        for item in value:
            results.extend(collect_strings(item))
        return results
    return []
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
sanitize function · python · L74-L82 (9 LOC)
hooks/input_guard.py
def sanitize(value: Any) -> Any:
    """Рекурсивно удаляет null-байты и zero-width символы из строк."""
    if isinstance(value, str):
        return SANITIZE_PATTERN.sub("", value)
    if isinstance(value, dict):
        return {k: sanitize(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize(item) for item in value]
    return value
scan function · python · L85-L93 (9 LOC)
hooks/input_guard.py
def scan(strings: list[str]) -> dict[str, int]:
    """Возвращает словарь {категория: количество_совпадений} для всех строк."""
    hits: dict[str, int] = {}
    for text in strings:
        for category, pattern in PATTERNS.items():
            count = len(pattern.findall(text))
            if count:
                hits[category] = hits.get(category, 0) + count
    return hits
main function · python · L96-L138 (43 LOC)
hooks/input_guard.py
def main() -> None:
    # WHY: intentionally NOT using parse_stdin() from utils — different semantics.
    # parse_stdin() returns {} on failure (fail-silent), but this security hook
    # must sys.exit(0) on parse failure (fail-open: allow the call to proceed).
    try:
        data = json.load(sys.stdin)
    except (json.JSONDecodeError, EOFError):
        sys.exit(0)

    tool_name: str = data.get("tool_name", "")

    # ПОЧЕМУ: проверяем только MCP-инструменты — они принимают внешние данные,
    # встроенные инструменты Claude (Read, Bash и т.д.) доверенные по определению.
    if not tool_name.startswith("mcp__"):
        sys.exit(0)

    tool_input: Any = data.get("tool_input", {})
    strings = collect_strings(tool_input)
    hits = scan(strings)

    if not hits:
        # NONE — разрешить, вернуть sanitized input
        clean_input = sanitize(tool_input)
        print(json.dumps({"tool_input": clean_input}))
        sys.exit(0)

    categories = list(hits.keys())
    total_mat
main function · python · L39-L73 (35 LOC)
hooks/mcp_circuit_breaker_post.py
def main() -> None:
    event = parse_stdin_raw()
    if not event:
        return

    tool_name: str = event.get("tool_name", "")
    server = get_mcp_server_name(tool_name)
    if server is None:
        return

    tool_result: str = str(event.get("tool_result", ""))
    state = load_json_state(STATE_FILE)
    entry = state.get(server, {})

    if is_error(tool_result):
        # Инкремент failures, при пороге — фиксируем opened_at
        failures = entry.get("failures", 0) + 1
        entry["failures"] = failures
        if failures >= FAILURE_THRESHOLD and "opened_at" not in entry:
            entry["opened_at"] = time.time()
            print(
                f"[circuit-breaker] {server}: OPEN after {failures} failures",
                file=sys.stderr,
            )
    else:
        # Success — полный сброс (восстановление из HALF_OPEN)
        if entry.get("failures", 0) > 0:
            print(
                f"[circuit-breaker] {server}: recovered, resetting",
            
get_circuit_status function · python · L39-L52 (14 LOC)
hooks/mcp_circuit_breaker.py
def get_circuit_status(entry: dict) -> str:
    """Определяет текущее состояние цепи для конкретного сервера."""
    failures = entry.get("failures", 0)
    opened_at = entry.get("opened_at")

    if failures < FAILURE_THRESHOLD:
        return "CLOSED"

    if opened_at and (time.time() - opened_at) >= RECOVERY_TIMEOUT:
        # ПОЧЕМУ: HALF_OPEN позволяет одному запросу пройти для проверки
        # восстановления сервера без полного сброса счётчика
        return "HALF_OPEN"

    return "OPEN"
record_open function · python · L55-L62 (8 LOC)
hooks/mcp_circuit_breaker.py
def record_open(state: dict, server: str) -> dict:
    """Переводит цепь сервера в состояние OPEN, фиксируя время блокировки."""
    entry = state.get(server, {})
    entry["failures"] = entry.get("failures", 0) + 1
    if entry["failures"] >= FAILURE_THRESHOLD and "opened_at" not in entry:
        entry["opened_at"] = time.time()
    state[server] = entry
    return state
main function · python · L68-L106 (39 LOC)
hooks/mcp_circuit_breaker.py
def main() -> None:
    """Обрабатывает PreToolUse-событие от Claude Code."""
    event = parse_stdin_raw()
    if not event:
        # Не можем разобрать ввод — пропускаем без блокировки
        print("{}")
        return

    tool_name: str = event.get("tool_name", "")
    server = get_mcp_server_name(tool_name)

    if server is None:
        # Не MCP-инструмент — circuit breaker не применяется
        print("{}")
        return

    state = load_json_state(STATE_FILE)
    entry = state.get(server, {})
    status = get_circuit_status(entry)

    if status == "OPEN":
        fallback = FALLBACKS.get(server, DEFAULT_FALLBACK)
        result = {
            "decision": "block",
            "reason": f"Circuit OPEN for '{server}' ({entry.get('failures', 0)} failures). "
            f"Fallback: {fallback}",
        }
        print(json.dumps(result))
        return

    if status == "HALF_OPEN":
        # ПОЧЕМУ: сбрасываем opened_at чтобы дать один шанс — если снова
        # упадёт, Pr
main function · python · L23-L46 (24 LOC)
hooks/mcp_locality_guard.py
def main():
    data = parse_stdin()
    if not data:
        sys.exit(0)

    tool_name = data.get("tool_name", "")

    if not tool_name.startswith("mcp__"):
        sys.exit(0)

    # Check if this MCP is exempt
    mcp_prefix = "__".join(tool_name.split("__")[:2])
    if mcp_prefix in EXEMPT_MCPS:
        sys.exit(0)

    print(
        f"[mcp-locality] Before using {tool_name}: "
        f"did you try local Read/Grep/Glob first? "
        f"Local search costs 0 tokens and 0 latency. "
        f"Use MCP only if local search didn't find the answer.",
        file=sys.stderr,
    )

    sys.exit(0)
About: code-quality intelligence by Repobility · https://repobility.com
main function · python · L27-L62 (36 LOC)
hooks/memory_guard.py
def main():
    data = parse_stdin()
    if not data:
        return

    tool_input = get_tool_input(data)
    command = tool_input.get("command", "")

    if "git commit" not in command:
        return

    response_text = extract_tool_response(data)
    if is_failed_commit(response_text):
        return

    # Find project memory
    active_ctx = find_project_memory()
    if active_ctx is None:
        emit_hook_result(
            "PostToolUse",
            "[memory-guard] git commit detected but no .claude/memory/activeContext.md found. "
            "Consider creating one to track project state.",
        )
        return

    # Check staleness: if activeContext was modified more than 5 min ago, it's stale
    mtime = active_ctx.stat().st_mtime
    age_minutes = (time.time() - mtime) / 60

    if age_minutes > 5:
        emit_hook_result(
            "PostToolUse",
            f"[memory-guard] git commit detected. activeContext.md is {age_minutes:.0f} min old. "
            "UPDA
extract_fix_subject function · python · L43-L55 (13 LOC)
hooks/pattern_extractor.py
def extract_fix_subject(commit_msg: str) -> str | None:
    """Извлекает краткое описание из fix:-коммита.

    Поддерживает форматы:
    - "fix: something broken"
    - "fix(scope): something broken"
    Returns None если коммит не fix:.
    """
    # ПОЧЕМУ: re.match c IGNORECASE — коммиты пишут по-разному (Fix:, FIX:, fix:)
    m = re.match(r"^fix(?:\([^)]+\))?:\s*(.+)", commit_msg, re.IGNORECASE)
    if m:
        return m.group(1).strip()
    return None
load_patterns_text function · python · L58-L66 (9 LOC)
hooks/pattern_extractor.py
def load_patterns_text() -> str:
    """Читает содержимое patterns.md. Возвращает пустую строку если файл не найден.

    WHY: try/except вместо exists()+read — избегаем TOCTOU race condition.
    """
    try:
        return GLOBAL_PATTERNS_PATH.read_text(encoding="utf-8")
    except (FileNotFoundError, OSError):
        return ""
find_matching_patterns function · python · L69-L118 (50 LOC)
hooks/pattern_extractor.py
def find_matching_patterns(subject: str, patterns_text: str) -> list[tuple[str, int]]:
    """Ищет существующие паттерны в секции 'Отладка и фиксы', чьи заголовки
    содержат ключевые слова из subject коммита.

    Returns список (заголовок_паттерна, текущий_счётчик).
    ПОЧЕМУ: простой keyword-overlap без NLP — достаточно для 80% случаев.
    Сложные семантические матчи избыточны для hook-уровня.
    """
    if not patterns_text:
        return []

    # Выделяем только секцию "Отладка и фиксы" чтобы не ловить ложные совпадения
    # из других секций (например, архитектурных паттернов)
    section_start = patterns_text.find(TARGET_SECTION)
    if section_start == -1:
        return []

    # Ищем конец секции (следующий ## заголовок на уровне 2)
    offset = section_start + len(TARGET_SECTION)
    next_section = re.search(r"\n## ", patterns_text[offset:])
    if next_section:
        section_end = offset + next_section.start()
        section_text = patterns_text[section_start:secti
_extract_counter function · python · L121-L141 (21 LOC)
hooks/pattern_extractor.py
def _extract_counter(header_line: str, section_text: str, header_pos: int) -> int:
    """Извлекает числовой счётчик из заголовка паттерна или его первых строк.

    ПОЧЕМУ: счётчик может быть в заголовке ### [2026-01-01] Название [×3]
    или в отдельной строке ниже. Проверяем оба места.
    """
    # Сначала ищем в самой строке заголовка
    m = re.search(r"\[×(\d+)\]", header_line)
    if m:
        return int(m.group(1))

    # Ищем в блоке паттерна (от заголовка до следующего ###)
    tail = section_text[header_pos:]
    # WHY: skip 4 chars ("### ") to avoid matching current header's "###" prefix
    block_end = re.search(r"\n###", tail[4:])
    block = tail[: block_end.start() + 4] if block_end else tail
    m = re.search(r"\[×(\d+)\]", block)
    if m:
        return int(m.group(1))

    return 1  # первое вхождение, ещё не размечено
sanitize_commit_msg function · python · L144-L150 (7 LOC)
hooks/pattern_extractor.py
def sanitize_commit_msg(msg: str) -> str:
    """Strip newlines and limit length to prevent prompt injection.

    WHY: commit messages are attacker-controlled input that flows into
    additionalContext (seen by LLM). Newlines could break JSON or inject prompts.
    """
    return sanitize_text(msg, MAX_COMMIT_MSG_LEN)
build_reminder_message function · python · L153-L196 (44 LOC)
hooks/pattern_extractor.py
def build_reminder_message(
    commit_hash: str,
    commit_msg: str,
    subject: str,
    matching: list[tuple[str, int]],
) -> str:
    """Формирует текст напоминания для Claude в additionalContext."""
    safe_msg = sanitize_commit_msg(commit_msg)
    today = date.today().isoformat()
    lines: list[str] = [
        f"[pattern-extractor] fix:-коммит обнаружен: `{commit_hash}` — «{safe_msg}»",
        "",
        "Пожалуйста, извлеки паттерн и добавь его в ~/.claude/memory/patterns.md",
        f"под секцию «{TARGET_SECTION}».",
        "",
    ]

    if matching:
        lines.append("ВНИМАНИЕ: найдены похожие существующие паттерны:")
        for header, counter in matching:
            lines.append(f"  • {header} [×{counter}]")
            lines.append(
                f"    → Если это тот же баг — увеличь счётчик: [×{counter}] → [×{counter + 1}]"
            )
            lines.append("      вместо создания нового блока.")
        lines.append("")
        lines.append("Если баг 
main function · python · L199-L231 (33 LOC)
hooks/pattern_extractor.py
def main() -> None:
    data = parse_stdin()
    if not data:
        return

    tool_input = get_tool_input(data)
    command = tool_input.get("command", "")

    if "git commit" not in command:
        return

    response_text = extract_tool_response(data)
    if is_failed_commit(response_text):
        return

    commit_hash = run_git(["log", "-1", "--format=%h"])
    commit_msg = run_git(["log", "-1", "--format=%s"])

    if not commit_hash:
        return

    # Активируемся только на fix:-коммиты
    subject = extract_fix_subject(commit_msg)
    if subject is None:
        return

    # Ищем совпадения с существующими паттернами
    patterns_text = load_patterns_text()
    matching = find_matching_patterns(subject, patterns_text)

    reminder = build_reminder_message(commit_hash, commit_msg, subject, matching)

    emit_hook_result("PostToolUse", reminder)
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_tracker_path function · python · L22-L27 (6 LOC)
hooks/plan_mode_guard.py
def get_tracker_path(session_id: str) -> Path:
    """Get path for session-specific file tracker."""
    # ПОЧЕМУ: используем session_id, а не PID — PID может переиспользоваться,
    # session_id уникален для каждой сессии Claude Code
    safe_id = session_id.replace("/", "_").replace("\\", "_")[:32]
    return Path(tempfile.gettempdir()) / f"claude_plan_guard_{safe_id}.txt"
has_active_plan function · python · L30-L47 (18 LOC)
hooks/plan_mode_guard.py
def has_active_plan() -> bool:
    """Check if any plan file exists in .claude/plans/ directory.

    ПОЧЕМУ: если план уже существует (одобрен или в процессе), предупреждение
    о 3+ файлах без плана — ложноположительное. Проверяем наличие .md файлов
    в plans/ как маркер того что план был создан.
    """
    plans_dir = Path.home() / ".claude" / "plans"
    if not plans_dir.exists():
        return False
    # Любой .md файл, модифицированный за последние 24 часа = активный план
    import time

    now = time.time()
    for f in plans_dir.glob("*.md"):
        if now - f.stat().st_mtime < 86400:  # 24 hours
            return True
    return False
main function · python · L50-L115 (66 LOC)
hooks/plan_mode_guard.py
def main() -> None:
    data = parse_stdin()
    if not data:
        return

    # ПОЧЕМУ: поддерживаем оба формата — вложенный tool_input и плоский (legacy).
    # post_format.py читает file_path с root, но документация говорит о вложенности.
    tool_input = get_tool_input(data)
    file_path = tool_input.get("file_path", "")

    if not file_path:
        return

    # Session tracking
    session_id = data.get("session_id", "unknown")
    tracker = get_tracker_path(session_id)

    # Read existing paths
    existing_paths: set[str] = set()
    if tracker.exists():
        existing_paths = set(tracker.read_text(encoding="utf-8").strip().split("\n"))
        existing_paths.discard("")

    # Add new path (normalize)
    normalized = str(Path(file_path).resolve())
    existing_paths.add(normalized)

    # Write back
    tracker.write_text("\n".join(existing_paths) + "\n", encoding="utf-8")

    count = len(existing_paths)

    # ПОЧЕМУ: если план существует — не предупреждать, агент 
extract_decision function · python · L39-L60 (22 LOC)
hooks/post_commit_memory.py
def extract_decision(commit_msg: str) -> tuple[str, str] | None:
    """Extract decision type and description from commit message.

    Returns (type, description) if commit message starts with a decision prefix.
    """
    msg_lower = commit_msg.lower()
    for prefix in DECISION_PREFIXES:
        if msg_lower.startswith(prefix):
            description = commit_msg[len(prefix) :].strip()
            # Strip conventional commit prefix if present (e.g., "feat: arch: ...")
            decision_type = prefix.rstrip(":")
            return decision_type, description

        # Also check after conventional commit prefix: "feat: arch: ..."
        for conv in ("feat:", "fix:", "refactor:", "chore:", "docs:"):
            combined = f"{conv} {prefix}"
            if msg_lower.startswith(combined):
                description = commit_msg[len(combined) :].strip()
                decision_type = prefix.rstrip(":")
                return decision_type, description

    return None
log_decision function · python · L63-L83 (21 LOC)
hooks/post_commit_memory.py
def log_decision(commit_hash: str, commit_msg: str) -> str | None:
    """Auto-record decision to decisions.md if commit message has decision prefix."""
    result = extract_decision(commit_msg)
    if result is None:
        return None

    decision_type, description = result
    decisions_file = find_decisions_file()
    if decisions_file is None:
        return f"Decision detected but no decisions.md found: [{decision_type}] {description}"

    now = datetime.now().strftime("%Y-%m-%d")
    # Format: ### [date] Description. Type: X. Commit: hash
    entry = f"\n### [{now}] {description}\n- Тип: {decision_type}\n- Коммит: `{commit_hash}`\n"

    content = decisions_file.read_text(encoding="utf-8")
    # Append at the end
    content = content.rstrip() + "\n" + entry
    decisions_file.write_text(content, encoding="utf-8")

    return f"Auto-recorded [{decision_type}] decision to decisions.md"
main function · python · L86-L156 (71 LOC)
hooks/post_commit_memory.py
def main() -> None:
    data = parse_stdin()
    if not data:
        return

    tool_input = get_tool_input(data)
    command = tool_input.get("command", "")

    if "git commit" not in command:
        return

    response_text = extract_tool_response(data)
    if is_failed_commit(response_text):
        return

    # Получаем данные последнего коммита
    commit_hash = run_git(["log", "-1", "--format=%h"])
    commit_msg = run_git(["log", "-1", "--format=%s"])

    if not commit_hash:
        return

    # Находим activeContext.md
    active_ctx = find_project_memory()
    if active_ctx is None:
        emit_hook_result(
            "PostToolUse",
            "[post-commit-memory] Commit logged but no activeContext.md found. "
            "Consider creating .claude/memory/activeContext.md for project state tracking.",
        )
        return

    # ПОЧЕМУ: дописываем в конец файла, не перезаписываем.
    # Секция "Auto-commit log" — структурированный лог, легко парсить.
    now = 
main function · python · L17-L41 (25 LOC)
hooks/post_format.py
def main():
    data = parse_stdin()
    if not data:
        return

    # ПОЧЕМУ: поддерживаем оба формата — вложенный tool_input и плоский
    tool_input = get_tool_input(data)
    path = tool_input.get("file_path", "")

    if not path or not os.path.exists(path):
        return

    ext = os.path.splitext(path)[1].lower()

    if ext == ".py":
        # ПОЧЕМУ: ruff format — 10-100x быстрее black, drop-in replacement
        subprocess.run(
            ["ruff", "format", "--line-length", "100", "--quiet", path],
            capture_output=True,
        )
    elif ext in (".js", ".ts", ".jsx", ".tsx"):
        subprocess.run(
            ["prettier", "--write", "--log-level", "silent", path],
            capture_output=True,
        )
main function · python · L19-L114 (96 LOC)
hooks/pre_commit_guard.py
def main() -> None:
    data = parse_stdin()
    if not data:
        return

    tool_input = get_tool_input(data)
    command = tool_input.get("command", "")

    # --- Check 0: Block direct push to public repo ---
    # WHY: Public repo (Claude-cod-top-2026) is read-only distribution.
    # Changes go to origin (private) first, then PR to public.
    # WHY: check only the actual command, not heredoc/string content inside it
    # Allow pushing feature branches to public (needed for PRs), block only main/master
    first_line = command.split("\n")[0].strip()
    if (
        first_line.startswith("git push")
        and "public" in first_line
        and ("main" in first_line or "master" in first_line)
    ):
        print(
            "[pre-commit-guard] BLOCKED: Direct push to 'public' remote is not allowed. "
            "Push to 'origin' first, then create a PR to the public repo.",
            file=sys.stderr,
        )
        sys.exit(2)

    # WHY: fast exit if not git commit
Repobility (the analyzer behind this table) · https://repobility.com
main function · python · L15-L37 (23 LOC)
hooks/pre_compact.py
def main():
    # Update project activeContext.md
    active = find_project_memory()
    if active is not None:
        content = active.read_text(encoding="utf-8")
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
        # Update the "Updated:" line
        lines = content.split("\n")
        for i, line in enumerate(lines):
            if line.startswith("## Updated:"):
                lines[i] = f"## Updated: {timestamp} (pre-compact)"
                break
        active.write_text("\n".join(lines), encoding="utf-8")
        print(f"[PreCompact] Updated {active} timestamp to {timestamp}")
    else:
        print("[PreCompact] No project activeContext.md found.")

    # Log compaction event
    log_dir = os.path.expanduser("~/.claude/logs")
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, "sessions.log")
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(f"{datetime.now().isoformat()} | COMPACT | cwd={os.getcwd()}\n")
main function · python · L16-L41 (26 LOC)
hooks/read_before_edit.py
def main():
    data = parse_stdin()
    if not data:
        sys.exit(0)

    tool_name = data.get("tool_name", "")
    tool_input = data.get("tool_input", {})

    if tool_name not in ("Edit", "Write"):
        sys.exit(0)

    file_path = tool_input.get("file_path", "")
    if not file_path:
        sys.exit(0)

    # Write to new files is always OK (no existing content to read)
    # For Edit, always remind
    if tool_name == "Edit":
        print(
            f"[read-before-edit] Editing {file_path}. "
            f"Confirm: did you Read this file first? "
            f"If not, Read it before editing — [MEMORY] does not replace [VERIFIED].",
            file=sys.stderr,
        )

    sys.exit(0)
get_last_commit_time function · python · L17-L27 (11 LOC)
hooks/session_save.py
def get_last_commit_time() -> float | None:
    """Get timestamp of the last git commit."""
    try:
        result = subprocess.run(
            ["git", "log", "-1", "--format=%ct"], capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0 and result.stdout.strip():
            return float(result.stdout.strip())
    except Exception:
        pass
    return None
main function · python · L30-L80 (51 LOC)
hooks/session_save.py
def main():
    try:
        # 1. Update global activeContext timestamp
        global_path = os.path.expanduser("~/.claude/memory/activeContext.md")
        if os.path.exists(global_path):
            with open(global_path, encoding="utf-8") as f:
                content = f.read()
            lines = content.split("\n")
            for i, line in enumerate(lines):
                if "## Последнее обновление" in line and i + 1 < len(lines):
                    lines[i + 1] = datetime.now().strftime("%Y-%m-%d %H:%M")
                    break
            with open(global_path, "w", encoding="utf-8") as f:
                f.write("\n".join(lines))

        # 2. Log session
        log_dir = os.path.expanduser("~/.claude/logs")
        os.makedirs(log_dir, exist_ok=True)
        log_path = os.path.join(log_dir, "sessions.log")
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(f"{datetime.now().isoformat()} | SESSION_END\n")

        # 3. Check project memory st
auto_update_config_repo function · python · L20-L51 (32 LOC)
hooks/session_start.py
def auto_update_config_repo():
    """If config was installed with --link, git pull the source repo.

    Detection: ~/.claude/.claude-code-config-repo contains repo path.
    """
    marker = Path.home() / ".claude" / CONFIG_REPO_MARKER
    if not marker.exists():
        return

    repo_path = marker.read_text(encoding="utf-8").strip()
    if not repo_path or not Path(repo_path).is_dir():
        return

    try:
        result = subprocess.run(
            ["git", "-C", repo_path, "pull", "--ff-only"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0:
            output = result.stdout.strip()
            if output and "Already up to date" not in output:
                print(f"[SessionStart] Config updated: {output}")
        else:
            # Log to stderr (not visible to Claude, but useful for debugging)
            print(
                f"[SessionStart] Config auto-update skipped: {result.stderr.strip
print_scope_fence function · python · L54-L81 (28 LOC)
hooks/session_start.py
def print_scope_fence():
    """Print Scope Fence status at session start."""
    fence_source = find_scope_fence()

    if fence_source is None:
        print(
            "\n[SessionStart] No Scope Fence found. "
            "Create .scope-fence.md in project root or add ## Scope Fence to activeContext.md."
        )
        return

    # ПОЧЕМУ: parse_scope_fence из utils инкапсулирует парсинг — не дублируем логику здесь.
    fence = parse_scope_fence(fence_source.read_text(encoding="utf-8"))
    fence_goal = fence.get("goal", "")
    fence_not_now = fence.get("not_now", "")

    if fence_goal and not fence_goal.startswith("{{"):
        source_label = fence_source.name
        print(f"\n[SessionStart] Scope Fence active (from {source_label}).")
        print(f"  Goal: {fence_goal}")
        if fence_not_now and not fence_not_now.startswith("{{"):
            print(f"  NOT NOW: {fence_not_now}")
    else:
        print(
            "\n[SessionStart] No Scope Fence found. "
         
main function · python · L84-L120 (37 LOC)
hooks/session_start.py
def main():
    # Auto-update config repo if installed with --link
    auto_update_config_repo()

    mem_dir = find_project_claude_dir()

    # Output project memory if available
    if mem_dir is not None:
        active = mem_dir / "activeContext.md"
        if active.exists():
            print(f"=== PROJECT ACTIVE CONTEXT ({active}) ===")
            print(active.read_text(encoding="utf-8"))
            print("=== END ACTIVE CONTEXT ===\n")

        decisions = mem_dir / "decisions.md"
        if decisions.exists():
            content = decisions.read_text(encoding="utf-8")
            if len(content) > 2000:
                content = "...(truncated)...\n" + content[-2000:]
            print(f"=== PROJECT DECISIONS ({decisions}) ===")
            print(content)
            print("=== END DECISIONS ===\n")

        print(f"[SessionStart] Loaded project memory from {mem_dir.parent}")
    else:
        print("[SessionStart] No project .claude/memory/ found in path hierarchy.")

    
parse_stdin function · python · L14-L25 (12 LOC)
hooks/utils.py
def parse_stdin() -> dict:
    """Parse JSON from stdin (Claude Code hook protocol).

    Returns empty dict on parse failure — hooks should exit gracefully.
    WHY: Every hook does this identically. Centralizing prevents
    inconsistent error handling (some used EOFError, some didn't).
    """
    try:
        result = json.load(sys.stdin)
        return result if isinstance(result, dict) else {}
    except (json.JSONDecodeError, EOFError, ValueError):
        return {}
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
parse_stdin_raw function · python · L28-L39 (12 LOC)
hooks/utils.py
def parse_stdin_raw() -> dict:
    """Parse JSON from stdin using read() instead of load().

    WHY: mcp_circuit_breaker uses sys.stdin.read() explicitly.
    Some hooks need this variant for compatibility.
    """
    try:
        raw = sys.stdin.read()
        result = json.loads(raw)
        return result if isinstance(result, dict) else {}
    except (json.JSONDecodeError, ValueError):
        return {}
get_tool_input function · python · L42-L49 (8 LOC)
hooks/utils.py
def get_tool_input(data: dict) -> dict:
    """Extract tool_input from hook data, supporting both nested and flat formats.

    WHY: Claude Code sends tool_input as a nested dict, but some older
    hook protocols use flat format. This handles both consistently.
    """
    tool_input = data.get("tool_input", data)
    return tool_input if isinstance(tool_input, dict) else data
run_git function · python · L52-L67 (16 LOC)
hooks/utils.py
def run_git(args: list[str], timeout: int = 10) -> str:
    """Run git command and return stdout.

    WHY: Duplicated identically in pre_commit_guard, post_commit_memory,
    pattern_extractor (3 copies, 36 lines total).
    """
    try:
        result = subprocess.run(
            ["git", *args],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return result.stdout.strip()
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return ""
find_project_memory function · python · L70-L81 (12 LOC)
hooks/utils.py
def find_project_memory() -> Path | None:
    """Find activeContext.md walking up from CWD.

    WHY: Duplicated in memory_guard, checkpoint_guard, post_commit_memory,
    session_save, pre_compact (5 copies with slight variations).
    """
    cwd = Path.cwd()
    for parent in [cwd, *cwd.parents]:
        candidate = parent / ".claude" / "memory" / "activeContext.md"
        if candidate.exists():
            return candidate
    return None
find_project_claude_dir function · python · L84-L99 (16 LOC)
hooks/utils.py
def find_project_claude_dir() -> Path | None:
    """Find .claude/memory/ directory walking up from CWD.

    WHY: session_start.py variant — returns directory, not file.
    Also checks for CLAUDE.md as project root marker.
    """
    cwd = Path.cwd()
    for parent in [cwd, *cwd.parents]:
        candidate = parent / ".claude" / "memory" / "activeContext.md"
        if candidate.exists():
            return parent / ".claude" / "memory"
        if (parent / "CLAUDE.md").exists():
            claude_mem = parent / ".claude" / "memory"
            if claude_mem.exists():
                return claude_mem
    return None
find_scope_fence function · python · L102-L123 (22 LOC)
hooks/utils.py
def find_scope_fence() -> Path | None:
    """Find Scope Fence file, searching multiple tool-agnostic locations.

    Search order (first found wins):
    1. .scope-fence.md at project root (universal)
    2. .claude/memory/activeContext.md (Claude Code)
    3. .cursor/memory_bank/activeContext.md (Cursor)

    WHY: Duplicated in drift_guard.py and session_start.py (identical logic).
    """
    cwd = Path.cwd()
    candidates = [
        ".scope-fence.md",
        str(Path(".claude") / "memory" / "activeContext.md"),
        str(Path(".cursor") / "memory_bank" / "activeContext.md"),
    ]
    for parent in [cwd, *cwd.parents]:
        for rel in candidates:
            full = parent / rel
            if full.exists():
                return full
    return None
parse_scope_fence function · python · L126-L154 (29 LOC)
hooks/utils.py
def parse_scope_fence(content: str) -> dict[str, str]:
    """Extract Scope Fence fields from file content.

    Returns dict with keys: goal, boundary, done_when, not_now.
    WHY: Used by both drift_guard and session_start.
    """
    fence: dict[str, str] = {}
    in_fence = False

    for line in content.splitlines():
        stripped = line.strip()
        if stripped == "## Scope Fence":
            in_fence = True
            continue
        if in_fence and stripped.startswith("## "):
            break
        if not in_fence:
            continue

        if stripped.startswith("Goal:"):
            fence["goal"] = stripped[5:].strip()
        elif stripped.startswith("Boundary:"):
            fence["boundary"] = stripped[9:].strip()
        elif stripped.startswith("Done when:"):
            fence["done_when"] = stripped[10:].strip()
        elif stripped.startswith("NOT NOW:"):
            fence["not_now"] = stripped[8:].strip()

    return fence
find_file_upward function · python · L157-L168 (12 LOC)
hooks/utils.py
def find_file_upward(relative_path: str) -> Path | None:
    """Find a file by walking up the directory tree from CWD.

    WHY: Generic version of find_project_memory/find_checkpoints_dir.
    Reduces the need for one-off search functions.
    """
    cwd = Path.cwd()
    for parent in [cwd, *cwd.parents]:
        full = parent / relative_path
        if full.exists():
            return full
    return None
About: code-quality intelligence by Repobility · https://repobility.com
get_mcp_server_name function · python · L171-L179 (9 LOC)
hooks/utils.py
def get_mcp_server_name(tool_name: str) -> str | None:
    """Extract MCP server name from tool name (mcp__<server>__<method>).

    WHY: Duplicated in mcp_circuit_breaker.py and mcp_circuit_breaker_post.py.
    """
    parts = tool_name.split("__")
    if len(parts) >= 3 and parts[0] == "mcp":
        return parts[1]
    return None
load_json_state function · python · L182-L194 (13 LOC)
hooks/utils.py
def load_json_state(path: Path) -> dict:
    """Load JSON state file, returning empty dict on any error.

    WHY: Duplicated in mcp_circuit_breaker and mcp_circuit_breaker_post
    (identical load_state functions).
    """
    if not path.exists():
        return {}
    try:
        result = json.loads(path.read_text(encoding="utf-8"))
        return result if isinstance(result, dict) else {}
    except (json.JSONDecodeError, OSError):
        return {}
save_json_state function · python · L197-L203 (7 LOC)
hooks/utils.py
def save_json_state(path: Path, state: dict) -> None:
    """Save dict as JSON state file, creating parent dirs.

    WHY: Duplicated in mcp_circuit_breaker and mcp_circuit_breaker_post.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(state, indent=2), encoding="utf-8")
page 1 / 3next ›