Function bodies 119 total
_write_plugin function · python · L51-L151 (101 LOC)repo_tools/agent/claude/__init__.py
def _write_plugin(
plugin_dir: Path,
rules_path: Path,
project_root: Path,
role: str | None = None,
tool_config: dict | None = None,
) -> None:
"""Write a Claude Code plugin directory with hooks and MCP config.
``plugin_dir`` is the directory that will be passed to ``--plugin-dir``.
The layout created is::
plugin_dir/
├── .claude-plugin/
│ └── plugin.json
├── hooks/
│ └── hooks.json
└── .mcp.json
"""
# -- manifest --
manifest_dir = plugin_dir / ".claude-plugin"
manifest_dir.mkdir(parents=True, exist_ok=True)
(manifest_dir / "plugin.json").write_text(
json.dumps(_PLUGIN_MANIFEST, indent=2), encoding="utf-8",
)
# -- hooks --
debug_log = project_root / "_agent" / "hooks.log"
base_cmd = [posix_path(sys.executable), "-m", "repo_tools.agent.hooks"]
check_bash_args = [
*base_cmd, "check_bash",
"--rules", rules_path.as_posix(),
"--projeClaude.build_command method · python · L157-L208 (52 LOC)repo_tools/agent/claude/__init__.py
def build_command(
self,
*,
prompt: str | None = None,
role: str | None = None,
role_prompt: str | None = None,
rules_path: Path | None = None,
project_root: Path | None = None,
tool_config: dict | None = None,
) -> list[str]:
config = tool_config or {}
# Build allowed tools list — roles get Bash
allowed = list(_ALLOWED_TOOLS)
if role and "Bash" not in allowed:
allowed.append("Bash")
cmd = ["claude", "--allowedTools", *allowed]
if config.get("debug_hooks"):
cmd.extend(["-d", "hooks"])
if role_prompt:
cmd.extend(["--append-system-prompt", role_prompt])
# Write plugin directory and add --plugin-dir to the command.
if (rules_path is not None) != (project_root is not None):
raise ValueError(
"rules_path and project_root must both be provided together; "
f"got rulescheck_installed function · python · L38-L51 (14 LOC)repo_tools/agent/coderabbit.py
def check_installed() -> bool:
    """Report whether the ``coderabbit`` CLI can be invoked.

    On Windows the lookup is performed inside WSL through a login shell;
    any failure to launch or finish that probe counts as "not installed".
    On other platforms the executable is looked up on ``PATH``.
    """
    if not is_windows():
        return shutil.which("coderabbit") is not None

    probe = ["wsl", "bash", "-lc", "command -v coderabbit"]
    try:
        completed = subprocess.run(
            probe,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # wsl could not be started, or the probe timed out.
        return False
    return completed.returncode == 0
def coderabbit_cmd(*args: str) -> list[str]:
    """Build the argv list that runs ``coderabbit`` with *args*.

    On Windows the CLI lives inside WSL and is typically installed as a
    shell alias/function, so the whole invocation is shell-quoted and handed
    to a login shell. Elsewhere the argv is returned unchanged.
    """
    argv = ["coderabbit", *args]
    if not is_windows():
        return argv
    return ["wsl", "bash", "-lc", shlex.join(argv)]
def call_review(args: dict[str, Any], *, logger: Any = None) -> dict[str, Any]:
"""Run ``coderabbit review --plain`` and return a result dict.
Parameters
----------
args:
Tool arguments (``worktree_path``, ``type``).
logger:
Optional logger for info-level messages (used by the HTTP MCP server).
"""
worktree_path = (args.get("worktree_path") or "").strip() or "."
review_type = args.get("type") or "committed"
if review_type not in VALID_REVIEW_TYPES:
return {
"isError": True,
"text": f"Invalid review type: {review_type!r} (expected one of {', '.join(sorted(VALID_REVIEW_TYPES))})",
}
if not Path(worktree_path).is_dir():
return {"isError": True, "text": f"worktree_path is not a directory: {worktree_path!r}"}
if not check_installed():
return {"isError": True, "text": NOT_INSTALLED}
try:
auth = subprocess.run(
coderabbit_cmd("auth", "status"),
main function · python · L13-L34 (22 LOC)repo_tools/agent/hooks/approve_mcp.py
def main() -> None:
    """Approve an MCP tool permission request read from stdin.

    Reads one JSON event from stdin, optionally records the decision in the
    file given by ``--debug-log``, and writes an "allow" decision for the
    ``PermissionRequest`` hook event to stdout as JSON.
    """
    parser = argparse.ArgumentParser(description="Auto-approve MCP tool permission requests.")
    parser.add_argument("--debug-log", default=None, help="Append hook decisions to this file")
    opts = parser.parse_args()

    event = json.load(sys.stdin)
    requested_tool = event.get("tool_name", "")

    if opts.debug_log:
        write_log(
            Path(opts.debug_log),
            requested_tool or "mcp_tool",
            "allow",
            "auto-approved MCP tool",
        )

    verdict = {
        "hookSpecificOutput": {
            "hookEventName": "PermissionRequest",
            "decision": {"behavior": "allow"},
        }
    }
    sys.stdout.write(json.dumps(verdict))
def main() -> None:
parser = argparse.ArgumentParser(description="Check Bash commands against agent rules.")
parser.add_argument("--rules", required=True, help="Path to rules.toml")
parser.add_argument("--project-root", default=None, help="Project root for dir constraints")
parser.add_argument("--debug-log", default=None, help="Append hook decisions to this file")
parser.add_argument("--role", default=None, help="Agent role for role-specific rule filtering")
args = parser.parse_args()
log_path = Path(args.debug_log) if args.debug_log else None
rules_path = Path(args.rules)
if not rules_path.exists():
print(f"Rules file not found: {rules_path}", file=sys.stderr)
sys.exit(2)
# Read PreToolUse event from stdin and evaluate rules.
# Any exception here exits with code 2 so Claude sees a clear error message
# rather than an unhandled traceback (which would also exit non-zero but with
# a less informative exit code).
tSource: Repobility analyzer · https://repobility.com
_respond function · python · L65-L72 (8 LOC)repo_tools/agent/hooks/coderabbit_mcp_stdio.py
def _respond(req_id, result=None, error=None) -> str:
"""Return a JSON-RPC 2.0 response line."""
msg: dict = {"jsonrpc": "2.0", "id": req_id}
if error is not None:
msg["error"] = error
else:
msg["result"] = result
return json.dumps(msg)_dispatch function · python · L75-L118 (44 LOC)repo_tools/agent/hooks/coderabbit_mcp_stdio.py
def _dispatch(req: dict) -> str | None:
"""Process a single JSON-RPC request. Returns None for notifications."""
req_id = req.get("id")
method = req.get("method", "")
# Notifications have no id — produce no output
if req_id is None:
return None
if method == "initialize":
return _respond(req_id, {
"protocolVersion": _PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": _SERVER_NAME, "version": _SERVER_VERSION},
})
if method == "ping":
return _respond(req_id, {})
if method.startswith("notifications/"):
# Notifications with an id are unusual but we still produce no output
return None
if method == "tools/list":
return _respond(req_id, {"tools": _TOOLS})
if method == "tools/call":
params = req.get("params", {})
name = params.get("name", "")
tool_args = params.get("arguments", {})
if nmain function · python · L124-L148 (25 LOC)repo_tools/agent/hooks/coderabbit_mcp_stdio.py
def main() -> None:
    """Serve newline-delimited JSON requests from stdin, replying on stdout.

    Blank lines and lines that are not valid JSON are ignored. A request
    whose dispatch raises is logged to stderr and, when it carried an id,
    answered with a JSON-RPC "Internal error" (-32603). Each reply is
    written as a single line and flushed immediately.
    """
    for raw_line in sys.stdin:
        text = raw_line.strip()
        if not text:
            continue

        try:
            req = json.loads(text)
        except json.JSONDecodeError:
            # Nothing to reply to: the id could not be parsed either.
            continue

        try:
            reply = _dispatch(req)
        except Exception as exc:
            print(f"coderabbit_mcp: dispatch error: {exc}", file=sys.stderr)
            failed_id = req.get("id") if isinstance(req, dict) else None
            if failed_id is None:
                continue
            reply = _respond(failed_id, error={"code": -32603, "message": "Internal error"})

        if reply is None:
            continue
        sys.stdout.write(reply + "\n")
        sys.stdout.flush()
def write_log(log_path: Path, command: str, decision: str, reason: str = "") -> None:
    """Append a single timestamped entry to the hook debug log.

    The entry holds the wall-clock time, the decision padded to five
    characters, the ``repr`` of *command* and, when given, the *reason*.
    The parent directory is created on demand. Filesystem errors are
    swallowed on purpose so logging can never break the hook.
    """
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%H:%M:%S")
        suffix = f" # {reason}" if reason else ""
        entry = f"[{stamp}] {decision:5s} {command!r}{suffix}\n"
        with log_path.open("a", encoding="utf-8") as handle:
            handle.write(entry)
    except OSError:
        pass  # Best effort only.
def main() -> None:
    """Dispatch ``python -m repo_tools.agent.hooks <subcommand> [args...]``.

    Supported subcommands are ``check_bash`` and ``approve_mcp``. The
    subcommand is removed from ``sys.argv`` before the sub-module's ``main``
    runs, so that its own argparse only sees its own arguments.

    Exits with status 2 (message on stderr) when no subcommand is given,
    when ``-h``/``--help`` is passed, or when the subcommand is unknown.
    """
    if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
        # List every subcommand that is actually dispatched below.
        print(
            "Usage: python -m repo_tools.agent.hooks {check_bash|approve_mcp} [args...]",
            file=sys.stderr,
        )
        sys.exit(2)

    subcommand = sys.argv[1]
    # Remove the subcommand from argv so the sub-module's argparse sees the right args
    sys.argv = [sys.argv[0]] + sys.argv[2:]

    # Imports are deferred so only the selected hook module is loaded.
    if subcommand == "check_bash":
        from .check_bash import main as sub_main
    elif subcommand == "approve_mcp":
        from .approve_mcp import main as sub_main
    else:
        print(f"Unknown subcommand: {subcommand!r}", file=sys.stderr)
        sys.exit(2)

    sub_main()
def _respond(req_id, result=None, error=None) -> str:
"""Return a JSON-RPC 2.0 response line."""
msg: dict = {"jsonrpc": "2.0", "id": req_id}
if error is not None:
msg["error"] = error
else:
msg["result"] = result
return json.dumps(msg)_dispatch function · python · L74-L123 (50 LOC)repo_tools/agent/hooks/lint_mcp_stdio.py
def _dispatch(req: dict) -> str | None:
"""Process a single JSON-RPC request. Returns None for notifications."""
req_id = req.get("id")
method = req.get("method", "")
# Notifications have no id — produce no output
if req_id is None:
return None
if method == "initialize":
return _respond(req_id, {
"protocolVersion": _PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": _SERVER_NAME, "version": _SERVER_VERSION},
})
if method == "ping":
return _respond(req_id, {})
if method.startswith("notifications/"):
return None
if method == "tools/list":
return _respond(req_id, {"tools": _TOOLS})
if method == "tools/call":
params = req.get("params", {})
name = params.get("name", "")
tool_args = params.get("arguments", {})
if name == "lint":
outcome = call_lint(
tool_args,
main function · python · L129-L167 (39 LOC)repo_tools/agent/hooks/lint_mcp_stdio.py
def main() -> None:
"""Read newline-delimited JSON from stdin, write responses to stdout."""
global _default_select, _default_ignore
parser = argparse.ArgumentParser(description="Lint MCP stdio server")
parser.add_argument("--select", default=None, help="Default ruff --select codes")
parser.add_argument("--ignore", default=None, help="Default ruff --ignore codes")
parsed = parser.parse_args()
_default_select = parsed.select
_default_ignore = parsed.ignore
for raw_line in sys.stdin:
line = raw_line.strip()
if not line:
continue
try:
req = json.loads(line)
except json.JSONDecodeError:
continue
try:
response = _dispatch(req)
except Exception as exc:
print(
f"lint_mcp: dispatch error: {exc}",
file=sys.stderr,
)
req_id = req.get("id") if isinstance(req, dict) else None
if req_id Same scanner, your repo: https://repobility.com — Repobility
_find_compile_commands function · python · L28-L42 (15 LOC)repo_tools/agent/lint.py
def _find_compile_commands(start: Path) -> str | None:
"""Search upward from *start* for a compile_commands.json file.
Returns the containing directory as a string, or None.
"""
current = start if start.is_dir() else start.parent
for parent in [current, *current.parents]:
if (parent / "compile_commands.json").exists():
return str(parent)
# Also check common build subdirectories
for build_dir in ("build", "out", "cmake-build-debug"):
candidate = parent / build_dir / "compile_commands.json"
if candidate.exists():
return str(parent / build_dir)
return None_call_ruff_check function · python · L48-L79 (32 LOC)repo_tools/agent/lint.py
def _call_ruff_check(
path: str,
*,
default_select: str | None = None,
default_ignore: str | None = None,
) -> dict[str, Any]:
"""Run ``ruff check`` and return a result dict."""
select = (default_select or "").strip() or _DEFAULT_SELECT
ignore = (default_ignore or "").strip() or None
exe = _find_executable("ruff")
if exe is None:
return {"isError": True, "text": "ruff is not installed."}
cmd = [exe, "check", "--output-format=concise"]
if select:
cmd.extend(["--select", select])
if ignore:
cmd.extend(["--ignore", ignore])
cmd.append(path)
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=120,
)
except subprocess.TimeoutExpired:
return {"isError": True, "text": "ruff check timed out."}
except (OSError, subprocess.SubprocessError) as exc:
return {"isError": True, "text": f"ruff check failed: {exc}"}
output = (proc.stdout or "") _call_clang_tidy function · python · L85-L124 (40 LOC)repo_tools/agent/lint.py
def _call_clang_tidy(path: str) -> dict[str, Any]:
"""Run ``clang-tidy`` on C/C++ files and return a result dict.
Automatically searches upward for compile_commands.json.
"""
exe = _find_executable("clang-tidy")
if exe is None:
return {"isError": True, "text": "clang-tidy is not installed."}
target = Path(path)
if target.is_file():
files = [str(target)]
elif target.is_dir():
files = [
str(f) for f in target.rglob("*")
if f.suffix in _CPP_EXTENSIONS
]
else:
return {"isError": True, "text": f"Path does not exist: {path!r}"}
if not files:
return {"text": "No C/C++ files found."}
cmd = [exe]
compile_dir = _find_compile_commands(target)
if compile_dir:
cmd.extend(["-p", compile_dir])
cmd.extend(files)
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=120,
)
except subprocess.TimeoutExpired:
_detect_languages function · python · L130-L144 (15 LOC)repo_tools/agent/lint.py
def _detect_languages(path: Path) -> tuple[bool, bool]:
    """Return ``(has_python, has_cpp)`` for *path*.

    A file is classified by its own suffix. A directory is scanned
    recursively, stopping as soon as both kinds have been seen. Anything
    else (for example a missing path) yields ``(False, False)``.
    """
    if path.is_file():
        ext = path.suffix
        return ext in _PY_EXTENSIONS, ext in _CPP_EXTENSIONS

    if not path.is_dir():
        return False, False

    found_py = found_cpp = False
    for entry in path.rglob("*"):
        ext = entry.suffix
        if ext in _PY_EXTENSIONS:
            found_py = True
        elif ext in _CPP_EXTENSIONS:
            found_cpp = True
        if found_py and found_cpp:
            break  # Nothing more to learn from the rest of the tree.
    return found_py, found_cpp
def call_lint(
args: dict[str, Any],
*,
default_select: str | None = None,
default_ignore: str | None = None,
) -> dict[str, Any]:
"""Run the appropriate linter(s) based on file type."""
path_str = (args.get("path") or "").strip() or "."
target = Path(path_str)
if not target.exists():
return {
"isError": True,
"text": f"Path does not exist: {path_str!r}",
}
has_py, has_cpp = _detect_languages(target)
if not has_py and not has_cpp:
return {"text": "No lintable files found (Python or C/C++)."}
sections: list[str] = []
if has_py:
result = _call_ruff_check(
path_str,
default_select=default_select,
default_ignore=default_ignore,
)
if result.get("isError"):
sections.append(f"[ruff] {result['text']}")
else:
sections.append(result["text"])
if has_cpp:
result = _call_clang_tidy(path_str)
_compile_rule_list function · python · L40-L87 (48 LOC)repo_tools/agent/rules.py
def _compile_rule_list(entries: list, section: str, role: str | None = None) -> list[Rule]:
"""Validate and compile a list of raw rule dicts into :class:`Rule` objects.
Each rule must have at least one of ``commands`` or ``patterns``:
* ``commands`` — list of literal command names; each is compiled to
``^<re.escape(name)>\\b`` so that ``"rm"`` matches ``rm -rf`` but
not ``rmdir``.
* ``patterns`` — list of raw regex strings for complex matches.
Both may be present on the same rule (their compiled patterns are merged).
Raises :class:`ValueError` with the offending section index, rule name,
and pattern when a required key is missing or a regex is invalid.
When *role* is given, rules with a ``roles`` list that does not include
*role* are silently skipped.
"""
rules: list[Rule] = []
for i, r in enumerate(entries):
if "name" not in r:
raise ValueError(f"{section}[{i}]: missing required key 'name'")
nload_rules function · python · L90-L101 (12 LOC)repo_tools/agent/rules.py
def load_rules(path: Path, role: str | None = None) -> RuleSet:
    """Read the TOML rules file at *path* and build a :class:`RuleSet`.

    The ``deny`` and ``allow`` tables are compiled with *role* passed
    through, so role-restricted rules that exclude *role* are left out.
    ``default_reason`` falls back to ``"try another approach"`` when the
    file does not define it.
    """
    raw_text = path.read_text(encoding="utf-8")
    data = tomllib.loads(raw_text)

    deny_rules = _compile_rule_list(data.get("deny", []), "deny", role=role)
    allow_rules = _compile_rule_list(data.get("allow", []), "allow", role=role)
    fallback_reason = data.get("default_reason", "try another approach")

    return RuleSet(
        default_reason=fallback_reason,
        deny=deny_rules,
        allow=allow_rules,
    )
def _extract_commands_regex(command: str) -> list[str]:
    """Regex-based fallback for ``_extract_commands``.

    Whitespace is collapsed, the text is split on ``_OPERATOR_RE``, and
    leading words matching ``_ASSIGNMENT_RE`` are dropped from each segment.
    Segments with nothing left are omitted.
    """
    flattened = " ".join(command.split())
    extracted: list[str] = []
    for chunk in _OPERATOR_RE.split(flattened):
        tokens = chunk.split()
        # Skip past the leading run of assignment words.
        first = 0
        while first < len(tokens) and _ASSIGNMENT_RE.match(tokens[first]):
            first += 1
        if first < len(tokens):
            extracted.append(" ".join(tokens[first:]))
    return extracted
_walk_bashlex function · python · L128-L150 (23 LOC)repo_tools/agent/rules.py
def _walk_bashlex(nodes: list, command: str) -> list[str]:
    """Collect the text of every command node in a bashlex AST.

    For each node of kind ``"command"`` the returned string runs from the
    first part that is not a ``VAR=value`` assignment to the end of the
    node. Positions come from the AST, so quoted assignment values with
    spaces (e.g. ``FOO="hello world" cmd``) are skipped correctly. A
    command made up only of assignments contributes nothing. Other nodes
    that expose ``parts`` are searched recursively.

    NOTE(review): command nodes themselves are not descended into, so
    anything nested inside a command's words is only reported as part of
    the enclosing command text — confirm that is intended.
    """
    found: list[str] = []
    for node in nodes:
        if node.kind != "command":
            if hasattr(node, "parts"):
                found.extend(_walk_bashlex(node.parts, command))
            continue

        # Offset of the first word that isn't a VAR=… assignment, if any.
        begin = next(
            (
                part.pos[0]
                for part in node.parts
                if not _ASSIGNMENT_RE.match(command[part.pos[0]:part.pos[1]])
            ),
            None,
        )
        if begin is not None:
            found.append(command[begin:node.pos[1]])
    return found
def _extract_commands(command: str) -> list[str]:
    """Break a compound shell command into its individual commands.

    The command is parsed with ``bashlex`` and the resulting AST is walked
    for command nodes. If bashlex reports a parsing error, the regex-based
    splitter is used instead. Empty or whitespace-only input yields an
    empty list.
    """
    if not command or command.isspace():
        return []
    try:
        tree = bashlex.parse(command)
        return _walk_bashlex(tree, command)
    except bashlex.errors.ParsingError:
        # bashlex could not parse it; use the cruder splitter.
        return _extract_commands_regex(command)
def _check_dir_constraint(dir_spec: str, project_root: Path | None, cwd: Path | None) -> bool:
negate = dir_spec.startswith("!")
name = dir_spec.lstrip("!")
if name == "project_root":
if project_root is None or cwd is None:
return True
try:
inside = cwd.is_relative_to(project_root)
except (ValueError, TypeError):
inside = False
return not inside if negate else inside
raise ValueError(f"Unknown dir_spec: {name!r}")check_command function · python · L192-L228 (37 LOC)repo_tools/agent/rules.py
def check_command(
command: str | None,
rules: RuleSet,
project_root: Path | None = None,
cwd: Path | None = None,
) -> tuple[bool, str]:
"""Check a Bash command against a rule set.
Returns ``(allowed, reason)``. When allowed, reason is ``""``.
"""
if not command:
return False, rules.default_reason
commands = _extract_commands(command)
if not commands:
return False, rules.default_reason
for rule in rules.deny:
if any(
any(pat.search(cmd) for pat in rule.patterns)
for cmd in commands
):
if _check_dir_constraint(rule.dir, project_root, cwd) if rule.dir else True:
return False, rule.reason or rules.default_reason
for cmd in commands:
matched_rule = None
for rule in rules.allow:
if any(pat.search(cmd) for pat in rule.patterns):
matched_rule = rule
break
if matched_rule is None:
_validate_id function · python · L215-L221 (7 LOC)repo_tools/agent/ticket_mcp.py
def _validate_id(value: str, field: str) -> str | None:
"""Return error message if *value* is not a safe identifier, else None."""
if not value:
return f"{field} must not be empty"
if not _SAFE_ID_RE.match(value):
return f"{field} contains invalid characters: {value!r}"
return None_validate_ticket function · python · L224-L268 (45 LOC)repo_tools/agent/ticket_mcp.py
def _validate_ticket(data: dict) -> str | None:
"""Validate ticket JSON structure. Return error message or None."""
# ticket section
ticket = data.get("ticket")
if not isinstance(ticket, dict):
return "missing 'ticket' section"
for field in ("id", "title", "description"):
val = ticket.get(field)
if not isinstance(val, str) or not val:
return f"ticket.{field} must be a non-empty string"
status = ticket.get("status")
if status not in _VALID_STATUSES:
return f"ticket.status must be one of {sorted(_VALID_STATUSES)}, got {status!r}"
# criteria — optional list
criteria = data.get("criteria")
if criteria is not None:
if not isinstance(criteria, list):
return "criteria must be a list"
for i, item in enumerate(criteria):
if not isinstance(item, dict):
return f"criteria[{i}] must be an object"
if not isinstance(item.get("criterion"), str):
_validate_transition function · python · L271-L307 (37 LOC)repo_tools/agent/ticket_mcp.py
def _validate_transition(
current: str, target: str, data: dict, *, role: str | None = None,
) -> str | None:
"""Return error if current->target is not an allowed status transition.
Also enforces cross-field constraints:
- verify -> closed requires review.result == "pass" and all criteria met
- verify -> todo requires review.result == "fail"
"""
allowed = _ALLOWED_TRANSITIONS.get(current, set())
if target not in allowed:
return (
f"invalid transition: {current!r} -> {target!r} "
f"(allowed: {sorted(allowed)})"
)
if role is not None:
role_allowed = _ROLE_ALLOWED_TRANSITIONS.get(role, set())
if (current, target) not in role_allowed:
return f"role {role!r} cannot transition {current!r} -> {target!r}"
if target == "closed":
review_result = data.get("review", {}).get("result", "")
if review_result != "pass":
return "cannot close ticket: review.result mus_load_required_criteria function · python · L314-L332 (19 LOC)repo_tools/agent/ticket_mcp.py
def _load_required_criteria(root: Path) -> list[str]:
"""Read ``agent.required_criteria`` from config.yaml."""
config_path = root / "config.yaml"
if not config_path.exists():
return []
try:
import yaml
data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
except Exception:
return []
if not isinstance(data, dict):
return []
agent = data.get("agent")
if not isinstance(agent, dict):
return []
criteria = agent.get("required_criteria")
if not isinstance(criteria, list):
return []
return [str(c) for c in criteria]Repobility analyzer · published findings · https://repobility.com
_tool_list_tickets function · python · L338-L360 (23 LOC)repo_tools/agent/ticket_mcp.py
def _tool_list_tickets(root: Path, args: dict, *, role: str | None = None) -> dict:
    """List every ticket file together with its status.

    The tickets directory is created if it does not exist yet. Each
    ``*.json`` file (in sorted order) produces one entry with its ``id``
    (the file stem) and ``status``. A file that cannot be read, is not
    valid JSON, is not a JSON object, or fails ``_validate_ticket`` is
    reported with status ``"invalid"`` and an ``error`` message, so a single
    bad file never aborts the listing.

    *args* and *role* are accepted for a uniform tool signature and are not
    used here.

    Returns ``{"text": <JSON array of entries>}``.
    """
    tdir = _tickets_dir(root)
    if not tdir.is_dir():
        tdir.mkdir(parents=True, exist_ok=True)

    tickets = []
    for ticket_file in sorted(tdir.glob("*.json")):
        entry: dict = {"id": ticket_file.stem}
        tickets.append(entry)

        try:
            data = json.loads(ticket_file.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            entry["status"] = "invalid"
            entry["error"] = f"invalid JSON: {exc}"
            continue
        except (OSError, UnicodeDecodeError) as exc:
            # e.g. a directory named "*.json" or a file that is not UTF-8.
            entry["status"] = "invalid"
            entry["error"] = f"unreadable file: {exc}"
            continue

        # _validate_ticket expects a mapping; a bare list/number/string
        # would otherwise raise AttributeError and abort the listing.
        if not isinstance(data, dict):
            err = "top-level JSON value must be an object"
        else:
            err = _validate_ticket(data)

        if err:
            entry["status"] = "invalid"
            entry["error"] = f"bad schema: {err}"
        else:
            entry["status"] = data.get("ticket", {}).get("status", "unknown")

    return {"text": json.dumps(tickets)}
def _tool_get_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    """Return the raw JSON text of one ticket.

    ``args["ticket_id"]`` names the ticket; a missing or ``null`` value is
    treated as empty and rejected by ``_validate_id``. An error result
    (``{"isError": True, "text": ...}``) is returned when the id is
    invalid, the file does not exist, the content is not valid JSON, the
    JSON is not an object, or ``_validate_ticket`` rejects it.

    *role* is accepted for a uniform tool signature and is not used here.

    On success returns ``{"text": <file content>}`` unchanged.
    """
    # ``or ""`` keeps an explicit null from crashing on ``.strip()``.
    tid = (args.get("ticket_id") or "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    content = ticket_path.read_text(encoding="utf-8")
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return {"isError": True, "text": f"Ticket '{tid}' has invalid JSON"}

    # _validate_ticket expects a mapping; report other JSON values as a
    # schema problem instead of letting it raise AttributeError.
    if not isinstance(data, dict):
        return {
            "isError": True,
            "text": f"Ticket '{tid}' has invalid schema: top-level JSON value must be an object",
        }
    if err := _validate_ticket(data):
        return {"isError": True, "text": f"Ticket '{tid}' has invalid schema: {err}"}
    return {"text": content}
def _tool_create_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
tid = args.get("id", "").strip()
if err := _validate_id(tid, "id"):
return {"isError": True, "text": err}
tdir = _tickets_dir(root)
tdir.mkdir(parents=True, exist_ok=True)
ticket_path = tdir / f"{tid}.json"
if ticket_path.exists():
return {"isError": True, "text": f"Ticket '{tid}' already exists"}
title = args.get("title", "")
description = args.get("description", "")
raw_criteria = list(args.get("criteria", []))
# Merge required criteria (appended, deduplicated)
seen = set(raw_criteria)
for rc in _load_required_criteria(root):
if rc not in seen:
raw_criteria.append(rc)
seen.add(rc)
criteria = [{"criterion": c, "met": False} for c in raw_criteria]
data = {
"ticket": {
"id": tid,
"title": title,
"description": description,
"status": "todo",_tool_update_ticket function · python · L427-L492 (66 LOC)repo_tools/agent/ticket_mcp.py
def _tool_update_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
tid = args.get("ticket_id", "").strip()
if err := _validate_id(tid, "ticket_id"):
return {"isError": True, "text": err}
ticket_path = _tickets_dir(root) / f"{tid}.json"
if not ticket_path.exists():
return {"isError": True, "text": f"Ticket '{tid}' not found"}
data = json.loads(ticket_path.read_text(encoding="utf-8"))
updatable = {"status", "notes", "result", "feedback", "description"}
updates = {k: v for k, v in args.items() if k in updatable and v is not None}
if not updates:
return {"text": "No fields to update"}
if role is not None:
allowed_fields = _ROLE_UPDATE_FIELDS.get(role, set())
forbidden = set(updates.keys()) - allowed_fields
if forbidden:
return {"isError": True, "text": f"role {role!r} cannot update fields: {sorted(forbidden)}"}
# Apply non-status fields first so transition checks see t_tool_reset_ticket function · python · L495-L512 (18 LOC)repo_tools/agent/ticket_mcp.py
def _tool_reset_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    """Reset a ticket to its initial ``todo`` state.

    Sets ``ticket.status`` to ``"todo"``, clears ``progress`` and
    ``review``, marks every criterion as not met, and rewrites the file.

    An error result (``{"isError": True, "text": ...}``) is returned when
    the id is invalid, the file does not exist, the content is not valid
    JSON, or it has no ``ticket`` section; the file is left untouched in
    those cases.

    *role* is accepted for a uniform tool signature and is not used here.
    """
    # ``or ""`` keeps an explicit null from crashing on ``.strip()``.
    tid = (args.get("ticket_id") or "").strip()
    if err := _validate_id(tid, "ticket_id"):
        return {"isError": True, "text": err}

    ticket_path = _tickets_dir(root) / f"{tid}.json"
    if not ticket_path.exists():
        return {"isError": True, "text": f"Ticket '{tid}' not found"}

    try:
        data = json.loads(ticket_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {"isError": True, "text": f"Ticket '{tid}' has invalid JSON"}
    if not isinstance(data, dict) or not isinstance(data.get("ticket"), dict):
        return {
            "isError": True,
            "text": f"Ticket '{tid}' has invalid schema: missing 'ticket' section",
        }

    data["ticket"]["status"] = "todo"
    data["progress"] = {"notes": ""}
    data["review"] = {"result": "", "feedback": ""}
    # Tolerate a null criteria value and skip malformed entries.
    for criterion in data.get("criteria") or []:
        if isinstance(criterion, dict):
            criterion["met"] = False

    ticket_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
    return {"text": f"Ticket '{tid}' reset to todo"}
def _tool_mark_criteria(root: Path, args: dict, *, role: str | None = None) -> dict:
tid = args.get("ticket_id", "").strip()
if err := _validate_id(tid, "ticket_id"):
return {"isError": True, "text": err}
indices = args.get("indices", [])
if not indices:
return {"isError": True, "text": "indices must not be empty"}
met = args.get("met", True)
ticket_path = _tickets_dir(root) / f"{tid}.json"
if not ticket_path.exists():
return {"isError": True, "text": f"Ticket '{tid}' not found"}
data = json.loads(ticket_path.read_text(encoding="utf-8"))
criteria = data.get("criteria")
if not criteria:
return {"isError": True, "text": f"Ticket '{tid}' has no criteria"}
# Validate all indices before mutating (atomic: all-or-nothing)
for idx in indices:
if not isinstance(idx, int) or idx < 0:
return {"isError": True, "text": f"Invalid index: {idx}"}
if idx >= len(criteria):
return _tool_delete_ticket function · python · L552-L562 (11 LOC)repo_tools/agent/ticket_mcp.py
def _tool_delete_ticket(root: Path, args: dict, *, role: str | None = None) -> dict:
    """Delete the ticket file named by ``args["ticket_id"]``.

    Returns an error result when the id fails ``_validate_id`` or the file
    does not exist; otherwise removes the file and returns a confirmation.
    *role* is accepted for a uniform tool signature and is not used here.
    """
    tid = args.get("ticket_id", "").strip()
    err = _validate_id(tid, "ticket_id")
    if err:
        return {"isError": True, "text": err}

    target = _tickets_dir(root) / f"{tid}.json"
    if target.exists():
        target.unlink()
        return {"text": f"Ticket '{tid}' deleted"}
    return {"isError": True, "text": f"Ticket '{tid}' not found"}
def _respond(req_id, result=None, error=None) -> str:
msg: dict = {"jsonrpc": "2.0", "id": req_id}
if error is not None:
msg["error"] = error
else:
msg["result"] = result
return json.dumps(msg)Source: Repobility analyzer · https://repobility.com
_dispatch function · python · L588-L630 (43 LOC)repo_tools/agent/ticket_mcp.py
def _dispatch(root: Path, req: dict, *, role: str | None = None) -> str | None:
req_id = req.get("id")
method = req.get("method", "")
if req_id is None:
return None
if method == "initialize":
return _respond(req_id, {
"protocolVersion": _PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": _SERVER_NAME, "version": _SERVER_VERSION},
})
if method == "ping":
return _respond(req_id, {})
if method.startswith("notifications/"):
return None
if method == "tools/list":
return _respond(req_id, {"tools": _TOOLS})
if method == "tools/call":
params = req.get("params", {})
name = params.get("name", "")
tool_args = params.get("arguments", {})
handler = _TOOL_DISPATCH.get(name)
if handler is None:
outcome = {"isError": True, "text": f"Unknown tool: {name!r}"}
elif role is not None anmain function · python · L636-L665 (30 LOC)repo_tools/agent/ticket_mcp.py
def main() -> None:
parser = argparse.ArgumentParser(description="Ticket MCP stdio server")
parser.add_argument("--project-root", required=True, help="Project root directory")
parser.add_argument("--role", default=None, choices=["orchestrator", "worker", "reviewer"],
help="Agent role for access control")
args = parser.parse_args()
root = Path(args.project_root)
for raw_line in sys.stdin:
line = raw_line.strip()
if not line:
continue
try:
req = json.loads(line)
except json.JSONDecodeError:
continue
try:
response = _dispatch(root, req, role=args.role)
except Exception as exc:
print(f"ticket_mcp: dispatch error: {exc}", file=sys.stderr)
req_id = req.get("id") if isinstance(req, dict) else None
if req_id is not None:
response = _respond(req_id, error={"code": -32603, "message": "Internal error"}_find_rules_file function · python · L36-L43 (8 LOC)repo_tools/agent/tool.py
def _find_rules_file(workspace_root: Path, configured: str | None = None) -> Path:
"""Find rules file: configured path first, then framework default."""
if configured:
candidate = workspace_root / configured
if candidate.exists():
return candidate
logger.warning(f"Configured rules file not found: {configured}")
return Path(__file__).parent / "allowlist_default.toml"_render_role_prompt function · python · L46-L61 (16 LOC)repo_tools/agent/tool.py
def _render_role_prompt(role: str, **kwargs: str) -> str:
"""Load prompt template for a role and format placeholders.
If ``prompts/common.txt`` exists it is prepended to the role template
so that every role receives shared context.
"""
prompts_dir = Path(__file__).parent / "prompts"
template_file = prompts_dir / f"{role}.txt"
if not template_file.exists():
return ""
parts: list[str] = []
common_file = prompts_dir / "common.txt"
if common_file.exists():
parts.append(common_file.read_text(encoding="utf-8"))
parts.append(template_file.read_text(encoding="utf-8"))
return "\n".join(parts).format_map(kwargs)_has_reviewable_changes function · python · L64-L97 (34 LOC)repo_tools/agent/tool.py
def _has_reviewable_changes(workspace_root: Path) -> bool:
"""Check if there are any changes for a reviewer to review.
Returns True if there are uncommitted changes (staged or unstaged)
or if the current branch has commits diverging from the default branch.
"""
cwd = str(workspace_root)
# Check uncommitted changes (staged + unstaged)
diff = subprocess.run(
["git", "diff", "HEAD", "--quiet"],
cwd=cwd, capture_output=True,
)
if diff.returncode != 0:
return True
# Check untracked files
untracked = subprocess.run(
["git", "ls-files", "--others", "--exclude-standard"],
cwd=cwd, capture_output=True, text=True,
)
if untracked.stdout.strip():
return True
# Check branch diff from common default branches
for base in ("main", "master"):
log = subprocess.run(
["git", "log", f"{base}..HEAD", "--oneline", "-1"],
cwd=cwd, capture_output=True, text=True,
_validate_ticket_id function · python · L103-L110 (8 LOC)repo_tools/agent/tool.py
def _validate_ticket_id(value: str, field: str) -> None:
    """Reject *value* unless it is safe to use as a single path component.

    *field* names the input in the error message.  ``ValueError`` is raised
    when *value* is empty, when it contains ``..``, a forward slash or a
    backslash, or when ``_SAFE_AGENT_ID_RE`` does not match it.
    """
    if not value:
        raise ValueError(f"{field} must not be empty")

    forbidden_tokens = ("..", "/", "\\")
    if any(token in value for token in forbidden_tokens):
        raise ValueError(f"{field} contains path separators: {value!r}")

    if _SAFE_AGENT_ID_RE.match(value) is None:
        raise ValueError(f"{field} contains unsafe characters: {value!r}")


# _prepare_ticket_session function · python · L114-L171 (58 LOC)repo_tools/agent/tool.py
def _prepare_ticket_session(
tool_ctx: ToolContext,
role: str,
ticket: str,
agent_cwd: Path,
) -> tuple[str, str]:
"""Load ticket, validate lifecycle, build prompt.
Returns ``(prompt, role_prompt)``. Exits on validation failure.
"""
_validate_ticket_id(ticket, "ticket")
ticket_path = tool_ctx.workspace_root / "_agent" / "tickets" / f"{ticket}.json"
if not ticket_path.exists():
logger.error(f"Ticket file not found: {ticket_path}")
sys.exit(1)
ticket_content = ticket_path.read_text(encoding="utf-8")
try:
ticket_data = json.loads(ticket_content)
ticket_status = ticket_data.get("ticket", {}).get("status", "unknown")
except (json.JSONDecodeError, AttributeError):
logger.error(f"Ticket file is not valid JSON: {ticket_path}")
sys.exit(1)
valid_statuses = {s for s, _ in _ROLE_ALLOWED_TRANSITIONS.get(role, set())}
if valid_statuses and ticket_status not in valid_statuses:
lo_process_agent_output function · python · L174-L267 (94 LOC)repo_tools/agent/tool.py
def _process_agent_output(
workspace_root: Path,
ticket: str,
role: str,
proc: subprocess.CompletedProcess[str],
) -> str | None:
"""Parse structured JSON output from a headless agent and apply ticket updates.
Claude Code ``--output-format json`` wraps output in an envelope::
{"type": "result", "subtype": "success"|"error_max_turns",
"is_error": bool, "structured_output": {...}, ...}
"""
if proc.returncode != 0:
logger.error(f"Agent exited with code {proc.returncode}")
try:
envelope = json.loads(proc.stdout)
except (json.JSONDecodeError, TypeError):
logger.error("Agent produced invalid JSON output — ticket not updated")
print(proc.stdout)
return proc.stdout
if not isinstance(envelope, dict):
logger.error("Agent output is not a JSON object — ticket not updated")
print(proc.stdout)
return proc.stdout
if envelope.get("is_error"):
subtype = envelope.Same scanner, your repo: https://repobility.com — Repobility
_agent_run function · python · L270-L334 (65 LOC)repo_tools/agent/tool.py
def _agent_run(tool_ctx: ToolContext, args: dict[str, Any]) -> str | None:
"""Launch an agent session.
Follows the ``RepoTool.execute(ctx, args)`` signature so that the
full tool config (merged defaults < config.yaml < CLI) arrives in
*args* — no individual parameter threading required.
With ``role`` + ``ticket``: runs headless, returns result text.
Without: replaces this process with interactive Claude.
"""
role = args.get("role")
ticket = args.get("ticket")
agent_cwd = tool_ctx.workspace_root
# Worktree setup — always for ticket-based runs
if role and ticket:
agent_cwd = ensure_worktree(tool_ctx.workspace_root, ticket)
# Session setup — ticket or interactive
if role and ticket:
prompt, role_prompt = _prepare_ticket_session(
tool_ctx, role, ticket, agent_cwd,
)
else:
prompt = None
role = "orchestrator"
repo_cmd = tool_ctx.tokens.get("repo", "./repo")
role_reset_ticket function · python · L340-L345 (6 LOC)repo_tools/agent/tool.py
def _reset_ticket(workspace_root: Path, ticket_id: str) -> None:
    """Reset a ticket to 'todo' by delegating to ``_tool_reset_ticket``.

    The helper is called with *workspace_root* and a ``{"ticket_id": ...}``
    payload.  Its result text is logged at info level on success; when the
    result has a truthy ``isError`` entry, that text is raised as a
    ``click.ClickException`` instead.
    """
    outcome = _tool_reset_ticket(workspace_root, {"ticket_id": ticket_id})
    if not outcome.get("isError"):
        logger.info(outcome["text"])
        return
    raise click.ClickException(outcome["text"])


# _make_agent_command function · python · L348-L406 (59 LOC)repo_tools/agent/tool.py
def _make_agent_command(tool: RepoTool) -> click.Group:
"""Build the ``agent`` Click group."""
@click.group(
name="agent",
help="Run coding agents with workflows tailored for this repository.",
invoke_without_command=True,
)
@click.option("--role", default=None, type=click.Choice(["worker", "reviewer"]),
help="Role for this agent")
@click.option("--ticket", default=None, help="Ticket ID (for worker/reviewer roles)")
@click.option("--debug-hooks", is_flag=True, default=False,
help="Log hook decisions to _agent/hooks.log")
@click.pass_context
def agent(ctx: click.Context,
role: str | None, ticket: str | None,
debug_hooks: bool) -> None:
"""Launch an agent session."""
if ctx.invoked_subcommand is not None:
return
if bool(role) != bool(ticket):
raise click.UsageError("--role and --ticket must be used together")
toolpage 1 / 3next ›