← back to dw218192__repokit

Function bodies 119 total

All specs Real LLM only Function bodies
_write_plugin function · python · L51-L151 (101 LOC)
repo_tools/agent/claude/__init__.py
def _write_plugin(
    plugin_dir: Path,
    rules_path: Path,
    project_root: Path,
    role: str | None = None,
    tool_config: dict | None = None,
) -> None:
    """Write a Claude Code plugin directory with hooks and MCP config.

    ``plugin_dir`` is the directory that will be passed to ``--plugin-dir``.
    The layout created is::

        plugin_dir/
        ├── .claude-plugin/
        │   └── plugin.json
        ├── hooks/
        │   └── hooks.json
        └── .mcp.json
    """
    # -- manifest --
    manifest_dir = plugin_dir / ".claude-plugin"
    manifest_dir.mkdir(parents=True, exist_ok=True)
    (manifest_dir / "plugin.json").write_text(
        json.dumps(_PLUGIN_MANIFEST, indent=2), encoding="utf-8",
    )

    # -- hooks --
    debug_log = project_root / "_agent" / "hooks.log"
    base_cmd = [posix_path(sys.executable), "-m", "repo_tools.agent.hooks"]

    check_bash_args = [
        *base_cmd, "check_bash",
        "--rules", rules_path.as_posix(),
        "--proje
Claude.build_command method · python · L157-L208 (52 LOC)
repo_tools/agent/claude/__init__.py
    def build_command(
        self,
        *,
        prompt: str | None = None,
        role: str | None = None,
        role_prompt: str | None = None,
        rules_path: Path | None = None,
        project_root: Path | None = None,
        tool_config: dict | None = None,
    ) -> list[str]:
        config = tool_config or {}

        # Build allowed tools list — roles get Bash
        allowed = list(_ALLOWED_TOOLS)
        if role and "Bash" not in allowed:
            allowed.append("Bash")

        cmd = ["claude", "--allowedTools", *allowed]

        if config.get("debug_hooks"):
            cmd.extend(["-d", "hooks"])

        if role_prompt:
            cmd.extend(["--append-system-prompt", role_prompt])

        # Write plugin directory and add --plugin-dir to the command.
        if (rules_path is not None) != (project_root is not None):
            raise ValueError(
                "rules_path and project_root must both be provided together; "
                f"got rules
check_installed function · python · L38-L51 (14 LOC)
repo_tools/agent/coderabbit.py
def check_installed() -> bool:
    """Return True if coderabbit is available (natively or via WSL on Windows)."""
    if is_windows():
        try:
            result = subprocess.run(
                ["wsl", "bash", "-lc", "command -v coderabbit"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            return result.returncode == 0
        except (OSError, subprocess.SubprocessError):
            return False
    return shutil.which("coderabbit") is not None
coderabbit_cmd function · python · L54-L62 (9 LOC)
repo_tools/agent/coderabbit.py
def coderabbit_cmd(*args: str) -> list[str]:
    """Return a command list to invoke coderabbit with *args*.

    On Windows the CLI lives inside WSL and is typically installed as a
    shell alias/function, so we must run it through a login shell.
    """
    if is_windows():
        return ["wsl", "bash", "-lc", shlex.join(["coderabbit", *args])]
    return ["coderabbit", *args]
call_review function · python · L68-L127 (60 LOC)
repo_tools/agent/coderabbit.py
def call_review(args: dict[str, Any], *, logger: Any = None) -> dict[str, Any]:
    """Run ``coderabbit review --plain`` and return a result dict.

    Parameters
    ----------
    args:
        Tool arguments (``worktree_path``, ``type``).
    logger:
        Optional logger for info-level messages (used by the HTTP MCP server).
    """
    worktree_path = (args.get("worktree_path") or "").strip() or "."
    review_type = args.get("type") or "committed"

    if review_type not in VALID_REVIEW_TYPES:
        return {
            "isError": True,
            "text": f"Invalid review type: {review_type!r} (expected one of {', '.join(sorted(VALID_REVIEW_TYPES))})",
        }

    if not Path(worktree_path).is_dir():
        return {"isError": True, "text": f"worktree_path is not a directory: {worktree_path!r}"}

    if not check_installed():
        return {"isError": True, "text": NOT_INSTALLED}

    try:
        auth = subprocess.run(
            coderabbit_cmd("auth", "status"),
     
main function · python · L13-L34 (22 LOC)
repo_tools/agent/hooks/approve_mcp.py
def main() -> None:
    parser = argparse.ArgumentParser(description="Auto-approve MCP tool permission requests.")
    parser.add_argument("--debug-log", default=None, help="Append hook decisions to this file")
    args = parser.parse_args()

    log_path = Path(args.debug_log) if args.debug_log else None

    event = json.load(sys.stdin)
    tool_name = event.get("tool_name", "")

    if log_path:
        write_log(log_path, tool_name or "mcp_tool", "allow", "auto-approved MCP tool")

    json.dump(
        {
            "hookSpecificOutput": {
                "hookEventName": "PermissionRequest",
                "decision": {"behavior": "allow"},
            }
        },
        sys.stdout,
    )
main function · python · L28-L90 (63 LOC)
repo_tools/agent/hooks/check_bash.py
def main() -> None:
    parser = argparse.ArgumentParser(description="Check Bash commands against agent rules.")
    parser.add_argument("--rules", required=True, help="Path to rules.toml")
    parser.add_argument("--project-root", default=None, help="Project root for dir constraints")
    parser.add_argument("--debug-log", default=None, help="Append hook decisions to this file")
    parser.add_argument("--role", default=None, help="Agent role for role-specific rule filtering")
    args = parser.parse_args()

    log_path = Path(args.debug_log) if args.debug_log else None

    rules_path = Path(args.rules)
    if not rules_path.exists():
        print(f"Rules file not found: {rules_path}", file=sys.stderr)
        sys.exit(2)

    # Read PreToolUse event from stdin and evaluate rules.
    # Any exception here exits with code 2 so Claude sees a clear error message
    # rather than an unhandled traceback (which would also exit non-zero but with
    # a less informative exit code).
    t
Source: Repobility analyzer · https://repobility.com
_respond function · python · L65-L72 (8 LOC)
repo_tools/agent/hooks/coderabbit_mcp_stdio.py
def _respond(req_id, result=None, error=None) -> str:
    """Return a JSON-RPC 2.0 response line."""
    msg: dict = {"jsonrpc": "2.0", "id": req_id}
    if error is not None:
        msg["error"] = error
    else:
        msg["result"] = result
    return json.dumps(msg)
_dispatch function · python · L75-L118 (44 LOC)
repo_tools/agent/hooks/coderabbit_mcp_stdio.py
def _dispatch(req: dict) -> str | None:
    """Process a single JSON-RPC request. Returns None for notifications."""
    req_id = req.get("id")
    method = req.get("method", "")

    # Notifications have no id — produce no output
    if req_id is None:
        return None

    if method == "initialize":
        return _respond(req_id, {
            "protocolVersion": _PROTOCOL_VERSION,
            "capabilities": {"tools": {"listChanged": False}},
            "serverInfo": {"name": _SERVER_NAME, "version": _SERVER_VERSION},
        })

    if method == "ping":
        return _respond(req_id, {})

    if method.startswith("notifications/"):
        # Notifications with an id are unusual but we still produce no output
        return None

    if method == "tools/list":
        return _respond(req_id, {"tools": _TOOLS})

    if method == "tools/call":
        params = req.get("params", {})
        name = params.get("name", "")
        tool_args = params.get("arguments", {})

        if n
main function · python · L124-L148 (25 LOC)
repo_tools/agent/hooks/coderabbit_mcp_stdio.py
def main() -> None:
    """Read newline-delimited JSON from stdin, write responses to stdout."""
    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError:
            # Invalid JSON — skip silently (no id to reply to)
            continue

        try:
            response = _dispatch(req)
        except Exception as exc:
            print(f"coderabbit_mcp: dispatch error: {exc}", file=sys.stderr)
            req_id = req.get("id") if isinstance(req, dict) else None
            if req_id is not None:
                response = _respond(req_id, error={"code": -32603, "message": "Internal error"})
            else:
                continue

        if response is not None:
            sys.stdout.write(response + "\n")
            sys.stdout.flush()
write_log function · python · L15-L26 (12 LOC)
repo_tools/agent/hooks/__init__.py
def write_log(log_path: Path, command: str, decision: str, reason: str = "") -> None:
    """Append one line to the hook debug log."""
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%H:%M:%S")
        line = f"[{ts}] {decision:5s}  {command!r}"
        if reason:
            line += f"  # {reason}"
        with log_path.open("a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError:
        pass  # Never let logging break the hook
main function · python · L29-L47 (19 LOC)
repo_tools/agent/hooks/__init__.py
def main() -> None:
    """Dispatch to the correct hook subcommand."""
    if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
        print("Usage: python -m repo_tools.agent.hooks check_bash [args...]", file=sys.stderr)
        sys.exit(2)

    subcommand = sys.argv[1]
    # Remove the subcommand from argv so the sub-module's argparse sees the right args
    sys.argv = [sys.argv[0]] + sys.argv[2:]

    if subcommand == "check_bash":
        from .check_bash import main as sub_main
    elif subcommand == "approve_mcp":
        from .approve_mcp import main as sub_main
    else:
        print(f"Unknown subcommand: {subcommand!r}", file=sys.stderr)
        sys.exit(2)

    sub_main()
_respond function · python · L64-L71 (8 LOC)
repo_tools/agent/hooks/lint_mcp_stdio.py
def _respond(req_id, result=None, error=None) -> str:
    """Return a JSON-RPC 2.0 response line."""
    msg: dict = {"jsonrpc": "2.0", "id": req_id}
    if error is not None:
        msg["error"] = error
    else:
        msg["result"] = result
    return json.dumps(msg)
_dispatch function · python · L74-L123 (50 LOC)
repo_tools/agent/hooks/lint_mcp_stdio.py
def _dispatch(req: dict) -> str | None:
    """Process a single JSON-RPC request. Returns None for notifications."""
    req_id = req.get("id")
    method = req.get("method", "")

    # Notifications have no id — produce no output
    if req_id is None:
        return None

    if method == "initialize":
        return _respond(req_id, {
            "protocolVersion": _PROTOCOL_VERSION,
            "capabilities": {"tools": {"listChanged": False}},
            "serverInfo": {"name": _SERVER_NAME, "version": _SERVER_VERSION},
        })

    if method == "ping":
        return _respond(req_id, {})

    if method.startswith("notifications/"):
        return None

    if method == "tools/list":
        return _respond(req_id, {"tools": _TOOLS})

    if method == "tools/call":
        params = req.get("params", {})
        name = params.get("name", "")
        tool_args = params.get("arguments", {})

        if name == "lint":
            outcome = call_lint(
                tool_args,
   
main function · python · L129-L167 (39 LOC)
repo_tools/agent/hooks/lint_mcp_stdio.py
def main() -> None:
    """Read newline-delimited JSON from stdin, write responses to stdout."""
    global _default_select, _default_ignore

    parser = argparse.ArgumentParser(description="Lint MCP stdio server")
    parser.add_argument("--select", default=None, help="Default ruff --select codes")
    parser.add_argument("--ignore", default=None, help="Default ruff --ignore codes")
    parsed = parser.parse_args()
    _default_select = parsed.select
    _default_ignore = parsed.ignore

    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError:
            continue

        try:
            response = _dispatch(req)
        except Exception as exc:
            print(
                f"lint_mcp: dispatch error: {exc}",
                file=sys.stderr,
            )
            req_id = req.get("id") if isinstance(req, dict) else None
            if req_id 
Same scanner, your repo: https://repobility.com — Repobility
_find_compile_commands function · python · L28-L42 (15 LOC)
repo_tools/agent/lint.py
def _find_compile_commands(start: Path) -> str | None:
    """Search upward from *start* for a compile_commands.json file.

    Returns the containing directory as a string, or None.
    """
    current = start if start.is_dir() else start.parent
    for parent in [current, *current.parents]:
        if (parent / "compile_commands.json").exists():
            return str(parent)
        # Also check common build subdirectories
        for build_dir in ("build", "out", "cmake-build-debug"):
            candidate = parent / build_dir / "compile_commands.json"
            if candidate.exists():
                return str(parent / build_dir)
    return None
_call_ruff_check function · python · L48-L79 (32 LOC)
repo_tools/agent/lint.py
def _call_ruff_check(
    path: str,
    *,
    default_select: str | None = None,
    default_ignore: str | None = None,
) -> dict[str, Any]:
    """Run ``ruff check`` and return a result dict."""
    select = (default_select or "").strip() or _DEFAULT_SELECT
    ignore = (default_ignore or "").strip() or None

    exe = _find_executable("ruff")
    if exe is None:
        return {"isError": True, "text": "ruff is not installed."}

    cmd = [exe, "check", "--output-format=concise"]
    if select:
        cmd.extend(["--select", select])
    if ignore:
        cmd.extend(["--ignore", ignore])
    cmd.append(path)

    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=120,
        )
    except subprocess.TimeoutExpired:
        return {"isError": True, "text": "ruff check timed out."}
    except (OSError, subprocess.SubprocessError) as exc:
        return {"isError": True, "text": f"ruff check failed: {exc}"}

    output = (proc.stdout or "") 
_call_clang_tidy function · python · L85-L124 (40 LOC)
repo_tools/agent/lint.py
def _call_clang_tidy(path: str) -> dict[str, Any]:
    """Run ``clang-tidy`` on C/C++ files and return a result dict.

    Automatically searches upward for compile_commands.json.
    """
    exe = _find_executable("clang-tidy")
    if exe is None:
        return {"isError": True, "text": "clang-tidy is not installed."}

    target = Path(path)
    if target.is_file():
        files = [str(target)]
    elif target.is_dir():
        files = [
            str(f) for f in target.rglob("*")
            if f.suffix in _CPP_EXTENSIONS
        ]
    else:
        return {"isError": True, "text": f"Path does not exist: {path!r}"}

    if not files:
        return {"text": "No C/C++ files found."}

    cmd = [exe]
    compile_dir = _find_compile_commands(target)
    if compile_dir:
        cmd.extend(["-p", compile_dir])
    cmd.extend(files)

    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=120,
        )
    except subprocess.TimeoutExpired:
   
_detect_languages function · python · L130-L144 (15 LOC)
repo_tools/agent/lint.py
def _detect_languages(path: Path) -> tuple[bool, bool]:
    """Return (has_python, has_cpp) for a path."""
    if path.is_file():
        return path.suffix in _PY_EXTENSIONS, path.suffix in _CPP_EXTENSIONS
    if path.is_dir():
        has_py = has_cpp = False
        for f in path.rglob("*"):
            if f.suffix in _PY_EXTENSIONS:
                has_py = True
            elif f.suffix in _CPP_EXTENSIONS:
                has_cpp = True
            if has_py and has_cpp:
                break
        return has_py, has_cpp
    return False, False
call_lint function · python · L147-L188 (42 LOC)
repo_tools/agent/lint.py
def call_lint(
    args: dict[str, Any],
    *,
    default_select: str | None = None,
    default_ignore: str | None = None,
) -> dict[str, Any]:
    """Run the appropriate linter(s) based on file type."""
    path_str = (args.get("path") or "").strip() or "."
    target = Path(path_str)

    if not target.exists():
        return {
            "isError": True,
            "text": f"Path does not exist: {path_str!r}",
        }

    has_py, has_cpp = _detect_languages(target)

    if not has_py and not has_cpp:
        return {"text": "No lintable files found (Python or C/C++)."}

    sections: list[str] = []

    if has_py:
        result = _call_ruff_check(
            path_str,
            default_select=default_select,
            default_ignore=default_ignore,
        )
        if result.get("isError"):
            sections.append(f"[ruff] {result['text']}")
        else:
            sections.append(result["text"])

    if has_cpp:
        result = _call_clang_tidy(path_str)
    
_compile_rule_list function · python · L40-L87 (48 LOC)
repo_tools/agent/rules.py
def _compile_rule_list(entries: list, section: str, role: str | None = None) -> list[Rule]:
    """Validate and compile a list of raw rule dicts into :class:`Rule` objects.

    Each rule must have at least one of ``commands`` or ``patterns``:

    * ``commands`` — list of literal command names; each is compiled to
      ``^<re.escape(name)>\\b`` so that ``"rm"`` matches ``rm -rf`` but
      not ``rmdir``.
    * ``patterns`` — list of raw regex strings for complex matches.

    Both may be present on the same rule (their compiled patterns are merged).

    Raises :class:`ValueError` with the offending section index, rule name,
    and pattern when a required key is missing or a regex is invalid.

    When *role* is given, rules with a ``roles`` list that does not include
    *role* are silently skipped.
    """
    rules: list[Rule] = []
    for i, r in enumerate(entries):
        if "name" not in r:
            raise ValueError(f"{section}[{i}]: missing required key 'name'")
        n
load_rules function · python · L90-L101 (12 LOC)
repo_tools/agent/rules.py
def load_rules(path: Path, role: str | None = None) -> RuleSet:
    """Parse a TOML rules file into a :class:`RuleSet`.

    When *role* is given, rules with a ``roles`` restriction that excludes
    *role* are omitted from the returned :class:`RuleSet`.
    """
    data = tomllib.loads(path.read_text(encoding="utf-8"))
    return RuleSet(
        default_reason=data.get("default_reason", "try another approach"),
        deny=_compile_rule_list(data.get("deny", []), "deny", role=role),
        allow=_compile_rule_list(data.get("allow", []), "allow", role=role),
    )
_extract_commands_regex function · python · L112-L125 (14 LOC)
repo_tools/agent/rules.py
def _extract_commands_regex(command: str) -> list[str]:
    """Regex-based fallback for ``_extract_commands``."""
    commands: list[str] = []
    flat = " ".join(command.split())
    for segment in _OPERATOR_RE.split(flat):
        segment = segment.strip()
        if not segment:
            continue
        words = segment.split()
        while words and _ASSIGNMENT_RE.match(words[0]):
            words.pop(0)
        if words:
            commands.append(" ".join(words))
    return commands
All rows scored by the Repobility analyzer (https://repobility.com)
_walk_bashlex function · python · L128-L150 (23 LOC)
repo_tools/agent/rules.py
def _walk_bashlex(nodes: list, command: str) -> list[str]:
    """Recursively collect command strings from a bashlex AST.

    Leading ``VAR=value`` assignments in each command node are stripped
    using the AST word positions, so quoted values with spaces
    (e.g. ``FOO="hello world" cmd``) are handled correctly.
    """
    commands: list[str] = []
    for node in nodes:
        if node.kind == "command":
            # Find the first word that isn't a VAR=… assignment
            start = None
            for part in node.parts:
                text = command[part.pos[0]:part.pos[1]]
                if start is None and _ASSIGNMENT_RE.match(text):
                    continue
                if start is None:
                    start = part.pos[0]
            if start is not None:
                commands.append(command[start:node.pos[1]])
        elif hasattr(node, "parts"):
            commands.extend(_walk_bashlex(node.parts, command))
    return commands
_extract_commands function · python · L153-L168 (16 LOC)
repo_tools/agent/rules.py
def _extract_commands(command: str) -> list[str]:
    """Split a compound shell command into individual segments.

    Uses ``bashlex`` AST parsing to correctly handle quoted operators,
    subshells, and process substitution.  Falls back to regex splitting
    on parse errors.
    """
    if not command or command.isspace():
        return []
    try:
        parts = bashlex.parse(command)
        return _walk_bashlex(parts, command)
    except bashlex.errors.ParsingError:
        # Fall back to regex on any parse error
        pass
    return _extract_commands_regex(command)
_check_dir_constraint function · python · L176-L189 (14 LOC)
repo_tools/agent/rules.py
def _check_dir_constraint(dir_spec: str, project_root: Path | None, cwd: Path | None) -> bool:
    negate = dir_spec.startswith("!")
    name = dir_spec.lstrip("!")

    if name == "project_root":
        if project_root is None or cwd is None:
            return True
        try:
            inside = cwd.is_relative_to(project_root)
        except (ValueError, TypeError):
            inside = False
        return not inside if negate else inside

    raise ValueError(f"Unknown dir_spec: {name!r}")
check_command function · python · L192-L228 (37 LOC)
repo_tools/agent/rules.py
def check_command(
    command: str | None,
    rules: RuleSet,
    project_root: Path | None = None,
    cwd: Path | None = None,
) -> tuple[bool, str]:
    """Check a Bash command against a rule set.

    Returns ``(allowed, reason)``.  When allowed, reason is ``""``.
    """
    if not command:
        return False, rules.default_reason

    commands = _extract_commands(command)
    if not commands:
        return False, rules.default_reason

    for rule in rules.deny:
        if any(
            any(pat.search(cmd) for pat in rule.patterns)
            for cmd in commands
        ):
            if _check_dir_constraint(rule.dir, project_root, cwd) if rule.dir else True:
                return False, rule.reason or rules.default_reason

    for cmd in commands:
        matched_rule = None
        for rule in rules.allow:
            if any(pat.search(cmd) for pat in rule.patterns):
                matched_rule = rule
                break
        if matched_rule is None:
          
_validate_id function · python · L215-L221 (7 LOC)
repo_tools/agent/ticket_mcp.py
def _validate_id(value: str, field: str) -> str | None:
    """Return error message if *value* is not a safe identifier, else None."""
    if not value:
        return f"{field} must not be empty"
    if not _SAFE_ID_RE.match(value):
        return f"{field} contains invalid characters: {value!r}"
    return None
_validate_ticket function · python · L224-L268 (45 LOC)
repo_tools/agent/ticket_mcp.py
def _validate_ticket(data: dict) -> str | None:
    """Validate ticket JSON structure. Return error message or None."""
    # ticket section
    ticket = data.get("ticket")
    if not isinstance(ticket, dict):
        return "missing 'ticket' section"
    for field in ("id", "title", "description"):
        val = ticket.get(field)
        if not isinstance(val, str) or not val:
            return f"ticket.{field} must be a non-empty string"
    status = ticket.get("status")
    if status not in _VALID_STATUSES:
        return f"ticket.status must be one of {sorted(_VALID_STATUSES)}, got {status!r}"

    # criteria — optional list
    criteria = data.get("criteria")
    if criteria is not None:
        if not isinstance(criteria, list):
            return "criteria must be a list"
        for i, item in enumerate(criteria):
            if not isinstance(item, dict):
                return f"criteria[{i}] must be an object"
            if not isinstance(item.get("criterion"), str):
     
_validate_transition function · python · L271-L307 (37 LOC)
repo_tools/agent/ticket_mcp.py
def _validate_transition(
    current: str, target: str, data: dict, *, role: str | None = None,
) -> str | None:
    """Return error if current->target is not an allowed status transition.

    Also enforces cross-field constraints:
    - verify -> closed requires review.result == "pass" and all criteria met
    - verify -> todo requires review.result == "fail"
    """
    allowed = _ALLOWED_TRANSITIONS.get(current, set())
    if target not in allowed:
        return (
            f"invalid transition: {current!r} -> {target!r} "
            f"(allowed: {sorted(allowed)})"
        )

    if role is not None:
        role_allowed = _ROLE_ALLOWED_TRANSITIONS.get(role, set())
        if (current, target) not in role_allowed:
            return f"role {role!r} cannot transition {current!r} -> {target!r}"

    if target == "closed":
        review_result = data.get("review", {}).get("result", "")
        if review_result != "pass":
            return "cannot close ticket: review.result mus
_load_required_criteria function · python · L314-L332 (19 LOC)
repo_tools/agent/ticket_mcp.py
def _load_required_criteria(root: Path) -> list[str]:
    """Read ``agent.required_criteria`` from config.yaml."""
    config_path = root / "config.yaml"
    if not config_path.exists():
        return []
    try:
        import yaml
        data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
    except Exception:
        return []
    if not isinstance(data, dict):
        return []
    agent = data.get("agent")
    if not isinstance(agent, dict):
        return []
    criteria = agent.get("required_criteria")
    if not isinstance(criteria, list):
        return []
    return [str(c) for c in criteria]
Repobility analyzer · published findings · https://repobility.com
_tool_list_tickets function · python · L338-L360 (23 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_list_tickets(root: Path, args: dict, *, role: str | None = None) -> dict:
    tdir = _tickets_dir(root)
    if not tdir.is_dir():
        tdir.mkdir(parents=True, exist_ok=True)

    tickets = []
    for f in sorted(tdir.glob("*.json")):
        entry: dict = {"id": f.stem}
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            entry["status"] = "invalid"
            entry["error"] = f"invalid JSON: {exc}"
            tickets.append(entry)
            continue
        if err := _validate_ticket(data):
            entry["status"] = "invalid"
            entry["error"] = f"bad schema: {err}"
        else:
            entry["status"] = data.get("ticket", {}).get("status", "unknown")
        tickets.append(entry)

    return {"text": json.dumps(tickets)}
_tool_get_ticket function · python · L363-L380 (18 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_get_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    tid = args.get("ticket_id", "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    content = ticket_path.read_text(encoding="utf-8")
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return {"isError": True, "text": f"Ticket '{tid}' has invalid JSON"}
    if err := _validate_ticket(data):
        return {"isError": True, "text": f"Ticket '{tid}' has invalid schema: {err}"}

    return {"text": content}
_tool_create_ticket function · python · L383-L424 (42 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_create_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    tid = args.get("id", "").strip()
    if err := _validate_id(tid, "id"):
        return {"isError": True, "text": err}

    tdir = _tickets_dir(root)
    tdir.mkdir(parents=True, exist_ok=True)

    ticket_path = tdir / f"{tid}.json"
    if ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' already exists"}

    title = args.get("title", "")
    description = args.get("description", "")
    raw_criteria = list(args.get("criteria", []))

    # Merge required criteria (appended, deduplicated)
    seen = set(raw_criteria)
    for rc in _load_required_criteria(root):
        if rc not in seen:
            raw_criteria.append(rc)
            seen.add(rc)

    criteria = [{"criterion": c, "met": False} for c in raw_criteria]

    data = {
        "ticket": {
            "id": tid,
            "title": title,
            "description": description,
            "status": "todo",
_tool_update_ticket function · python · L427-L492 (66 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_update_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    tid = args.get("ticket_id", "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    data = json.loads(ticket_path.read_text(encoding="utf-8"))

    updatable = {"status", "notes", "result", "feedback", "description"}
    updates = {k: v for k, v in args.items() if k in updatable and v is not None}
    if not updates:
        return {"text": "No fields to update"}

    if role is not None:
        allowed_fields = _ROLE_UPDATE_FIELDS.get(role, set())
        forbidden = set(updates.keys()) - allowed_fields
        if forbidden:
            return {"isError": True, "text": f"role {role!r} cannot update fields: {sorted(forbidden)}"}

    # Apply non-status fields first so transition checks see t
_tool_reset_ticket function · python · L495-L512 (18 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_reset_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    tid = args.get("ticket_id", "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    data = json.loads(ticket_path.read_text(encoding="utf-8"))
    data["ticket"]["status"] = "todo"
    data["progress"] = {"notes": ""}
    data["review"] = {"result": "", "feedback": ""}
    for criterion in data.get("criteria", []):
        criterion["met"] = False

    ticket_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
    return {"text": f"Ticket '{tid}' reset to todo"}
_tool_mark_criteria function · python · L515-L549 (35 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_mark_criteria(root: Path, args: dict, *, role: str | None = None) -> dict:
    tid = args.get("ticket_id", "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    indices = args.get("indices", [])
    if not indices:
        return {"isError": True, "text": "indices must not be empty"}

    met = args.get("met", True)

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    data = json.loads(ticket_path.read_text(encoding="utf-8"))
    criteria = data.get("criteria")
    if not criteria:
        return {"isError": True, "text": f"Ticket '{tid}' has no criteria"}

    # Validate all indices before mutating (atomic: all-or-nothing)
    for idx in indices:
        if not isinstance(idx, int) or idx < 0:
            return {"isError": True, "text": f"Invalid index: {idx}"}
        if idx >= len(criteria):
            return 
_tool_delete_ticket function · python · L552-L562 (11 LOC)
repo_tools/agent/ticket_mcp.py
def _tool_delete_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    tid = args.get("ticket_id", "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    ticket_path.unlink()
    return {"text": f"Ticket '{tid}' deleted"}
_respond function · python · L579-L585 (7 LOC)
repo_tools/agent/ticket_mcp.py
def _respond(req_id, result=None, error=None) -> str:
    msg: dict = {"jsonrpc": "2.0", "id": req_id}
    if error is not None:
        msg["error"] = error
    else:
        msg["result"] = result
    return json.dumps(msg)
Source: Repobility analyzer · https://repobility.com
_dispatch function · python · L588-L630 (43 LOC)
repo_tools/agent/ticket_mcp.py
def _dispatch(root: Path, req: dict, *, role: str | None = None) -> str | None:
    req_id = req.get("id")
    method = req.get("method", "")

    if req_id is None:
        return None

    if method == "initialize":
        return _respond(req_id, {
            "protocolVersion": _PROTOCOL_VERSION,
            "capabilities": {"tools": {"listChanged": False}},
            "serverInfo": {"name": _SERVER_NAME, "version": _SERVER_VERSION},
        })

    if method == "ping":
        return _respond(req_id, {})

    if method.startswith("notifications/"):
        return None

    if method == "tools/list":
        return _respond(req_id, {"tools": _TOOLS})

    if method == "tools/call":
        params = req.get("params", {})
        name = params.get("name", "")
        tool_args = params.get("arguments", {})

        handler = _TOOL_DISPATCH.get(name)
        if handler is None:
            outcome = {"isError": True, "text": f"Unknown tool: {name!r}"}
        elif role is not None an
main function · python · L636-L665 (30 LOC)
repo_tools/agent/ticket_mcp.py
def main() -> None:
    parser = argparse.ArgumentParser(description="Ticket MCP stdio server")
    parser.add_argument("--project-root", required=True, help="Project root directory")
    parser.add_argument("--role", default=None, choices=["orchestrator", "worker", "reviewer"],
                        help="Agent role for access control")
    args = parser.parse_args()
    root = Path(args.project_root)

    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError:
            continue

        try:
            response = _dispatch(root, req, role=args.role)
        except Exception as exc:
            print(f"ticket_mcp: dispatch error: {exc}", file=sys.stderr)
            req_id = req.get("id") if isinstance(req, dict) else None
            if req_id is not None:
                response = _respond(req_id, error={"code": -32603, "message": "Internal error"}
_find_rules_file function · python · L36-L43 (8 LOC)
repo_tools/agent/tool.py
def _find_rules_file(workspace_root: Path, configured: str | None = None) -> Path:
    """Find rules file: configured path first, then framework default."""
    if configured:
        candidate = workspace_root / configured
        if candidate.exists():
            return candidate
        logger.warning(f"Configured rules file not found: {configured}")
    return Path(__file__).parent / "allowlist_default.toml"
_render_role_prompt function · python · L46-L61 (16 LOC)
repo_tools/agent/tool.py
def _render_role_prompt(role: str, **kwargs: str) -> str:
    """Load prompt template for a role and format placeholders.

    If ``prompts/common.txt`` exists it is prepended to the role template
    so that every role receives shared context.
    """
    prompts_dir = Path(__file__).parent / "prompts"
    template_file = prompts_dir / f"{role}.txt"
    if not template_file.exists():
        return ""
    parts: list[str] = []
    common_file = prompts_dir / "common.txt"
    if common_file.exists():
        parts.append(common_file.read_text(encoding="utf-8"))
    parts.append(template_file.read_text(encoding="utf-8"))
    return "\n".join(parts).format_map(kwargs)
_has_reviewable_changes function · python · L64-L97 (34 LOC)
repo_tools/agent/tool.py
def _has_reviewable_changes(workspace_root: Path) -> bool:
    """Check if there are any changes for a reviewer to review.

    Returns True if there are uncommitted changes (staged or unstaged)
    or if the current branch has commits diverging from the default branch.
    """
    cwd = str(workspace_root)

    # Check uncommitted changes (staged + unstaged)
    diff = subprocess.run(
        ["git", "diff", "HEAD", "--quiet"],
        cwd=cwd, capture_output=True,
    )
    if diff.returncode != 0:
        return True

    # Check untracked files
    untracked = subprocess.run(
        ["git", "ls-files", "--others", "--exclude-standard"],
        cwd=cwd, capture_output=True, text=True,
    )
    if untracked.stdout.strip():
        return True

    # Check branch diff from common default branches
    for base in ("main", "master"):
        log = subprocess.run(
            ["git", "log", f"{base}..HEAD", "--oneline", "-1"],
            cwd=cwd, capture_output=True, text=True,
     
_validate_ticket_id function · python · L103-L110 (8 LOC)
repo_tools/agent/tool.py
def _validate_ticket_id(value: str, field: str) -> None:
    """Raise ValueError if *value* is not safe for use as a path component."""
    if not value:
        raise ValueError(f"{field} must not be empty")
    if ".." in value or "/" in value or "\\" in value:
        raise ValueError(f"{field} contains path separators: {value!r}")
    if not _SAFE_AGENT_ID_RE.match(value):
        raise ValueError(f"{field} contains unsafe characters: {value!r}")
_prepare_ticket_session function · python · L114-L171 (58 LOC)
repo_tools/agent/tool.py
def _prepare_ticket_session(
    tool_ctx: ToolContext,
    role: str,
    ticket: str,
    agent_cwd: Path,
) -> tuple[str, str]:
    """Load ticket, validate lifecycle, build prompt.

    Returns ``(prompt, role_prompt)``.  Exits on validation failure.
    """
    _validate_ticket_id(ticket, "ticket")
    ticket_path = tool_ctx.workspace_root / "_agent" / "tickets" / f"{ticket}.json"

    if not ticket_path.exists():
        logger.error(f"Ticket file not found: {ticket_path}")
        sys.exit(1)

    ticket_content = ticket_path.read_text(encoding="utf-8")

    try:
        ticket_data = json.loads(ticket_content)
        ticket_status = ticket_data.get("ticket", {}).get("status", "unknown")
    except (json.JSONDecodeError, AttributeError):
        logger.error(f"Ticket file is not valid JSON: {ticket_path}")
        sys.exit(1)

    valid_statuses = {s for s, _ in _ROLE_ALLOWED_TRANSITIONS.get(role, set())}
    if valid_statuses and ticket_status not in valid_statuses:
        lo
_process_agent_output function · python · L174-L267 (94 LOC)
repo_tools/agent/tool.py
def _process_agent_output(
    workspace_root: Path,
    ticket: str,
    role: str,
    proc: subprocess.CompletedProcess[str],
) -> str | None:
    """Parse structured JSON output from a headless agent and apply ticket updates.

    Claude Code ``--output-format json`` wraps output in an envelope::

        {"type": "result", "subtype": "success"|"error_max_turns",
         "is_error": bool, "structured_output": {...}, ...}
    """
    if proc.returncode != 0:
        logger.error(f"Agent exited with code {proc.returncode}")

    try:
        envelope = json.loads(proc.stdout)
    except (json.JSONDecodeError, TypeError):
        logger.error("Agent produced invalid JSON output — ticket not updated")
        print(proc.stdout)
        return proc.stdout

    if not isinstance(envelope, dict):
        logger.error("Agent output is not a JSON object — ticket not updated")
        print(proc.stdout)
        return proc.stdout

    if envelope.get("is_error"):
        subtype = envelope.
Same scanner, your repo: https://repobility.com — Repobility
_agent_run function · python · L270-L334 (65 LOC)
repo_tools/agent/tool.py
def _agent_run(tool_ctx: ToolContext, args: dict[str, Any]) -> str | None:
    """Launch an agent session.

    Follows the ``RepoTool.execute(ctx, args)`` signature so that the
    full tool config (merged defaults < config.yaml < CLI) arrives in
    *args* — no individual parameter threading required.

    With ``role`` + ``ticket``: runs headless, returns result text.
    Without: replaces this process with interactive Claude.
    """
    role = args.get("role")
    ticket = args.get("ticket")
    agent_cwd = tool_ctx.workspace_root

    # Worktree setup — always for ticket-based runs
    if role and ticket:
        agent_cwd = ensure_worktree(tool_ctx.workspace_root, ticket)

    # Session setup — ticket or interactive
    if role and ticket:
        prompt, role_prompt = _prepare_ticket_session(
            tool_ctx, role, ticket, agent_cwd,
        )
    else:
        prompt = None
        role = "orchestrator"
        repo_cmd = tool_ctx.tokens.get("repo", "./repo")
        role
_reset_ticket function · python · L340-L345 (6 LOC)
repo_tools/agent/tool.py
def _reset_ticket(workspace_root: Path, ticket_id: str) -> None:
    """Reset a ticket to 'todo' — delegates to ticket_mcp._tool_reset_ticket."""
    result = _tool_reset_ticket(workspace_root, {"ticket_id": ticket_id})
    if result.get("isError"):
        raise click.ClickException(result["text"])
    logger.info(result["text"])
_make_agent_command function · python · L348-L406 (59 LOC)
repo_tools/agent/tool.py
def _make_agent_command(tool: RepoTool) -> click.Group:
    """Build the ``agent`` Click group."""

    @click.group(
        name="agent",
        help="Run coding agents with workflows tailored for this repository.",
        invoke_without_command=True,
    )
    @click.option("--role", default=None, type=click.Choice(["worker", "reviewer"]),
                  help="Role for this agent")
    @click.option("--ticket", default=None, help="Ticket ID (for worker/reviewer roles)")
    @click.option("--debug-hooks", is_flag=True, default=False,
                  help="Log hook decisions to _agent/hooks.log")
    @click.pass_context
    def agent(ctx: click.Context,
              role: str | None, ticket: str | None,
              debug_hooks: bool) -> None:
        """Launch an agent session."""
        if ctx.invoked_subcommand is not None:
            return
        if bool(role) != bool(ticket):
            raise click.UsageError("--role and --ticket must be used together")
        tool
page 1 / 3next ›