Function bodies 119 total
ensure_worktree function · python · L15-L47 (33 LOC)repo_tools/agent/worktree.py
def ensure_worktree(workspace_root: Path, ticket: str) -> Path:
"""Create or reuse a git worktree for the given ticket under _agent/worktrees/."""
wt_dir = workspace_root / "_agent" / "worktrees" / ticket
branch_name = f"worktree-{ticket}"
if wt_dir.exists():
logger.info(f"Reusing existing worktree: {wt_dir}")
return wt_dir
wt_dir.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(
["git", "worktree", "prune"],
cwd=str(workspace_root), capture_output=True,
)
branch_exists = subprocess.run(
["git", "rev-parse", "--verify", branch_name],
cwd=str(workspace_root), capture_output=True,
).returncode == 0
if branch_exists:
subprocess.run(
["git", "worktree", "add", str(wt_dir), branch_name],
cwd=str(workspace_root), check=True,
)
else:
subprocess.run(
["git", "worktree", "add", "-b", branch_name, str(wt_dir), "HEAD"],
cwdremove_worktree function · python · L50-L64 (15 LOC)repo_tools/agent/worktree.py
def remove_worktree(workspace_root: Path, ticket: str) -> None:
"""Remove the git worktree for a ticket."""
wt_dir = workspace_root / "_agent" / "worktrees" / ticket
if wt_dir.exists():
subprocess.run(
["git", "worktree", "remove", str(wt_dir), "--force"],
cwd=str(workspace_root), check=True,
)
logger.info(f"Removed worktree: {wt_dir}")
else:
subprocess.run(
["git", "worktree", "prune"],
cwd=str(workspace_root), capture_output=True,
)
logger.info(f"Worktree not found: {wt_dir}")derive_project_root function · python · L27-L68 (42 LOC)repo_tools/_bootstrap.py
def derive_project_root(framework_dir: Path) -> Path:
"""Derive the consumer project root from the framework directory.
Logic (also implemented in bootstrap.sh and bootstrap.ps1):
1. ``git rev-parse --show-toplevel`` — if different from *framework_dir*,
the framework is nested inside a larger repo (monorepo / symlink in CI)
and the toplevel IS the project root.
2. If the toplevel equals *framework_dir*, the framework is a submodule
whose own root is the git toplevel. Use
``git rev-parse --show-superproject-working-tree`` to find the parent.
3. If neither works, raise ``RuntimeError`` — the caller should ask
the user for an explicit root.
"""
framework_dir = Path(os.path.normpath(framework_dir))
r = subprocess.run(
["git", "-C", str(framework_dir), "rev-parse", "--show-toplevel"],
capture_output=True, text=True,
)
if r.returncode != 0 or not r.stdout.strip():
raise RuntimeError(
find_uv function · python · L71-L79 (9 LOC)repo_tools/_bootstrap.py
def find_uv(workspace_root: Path) -> str | None:
"""Locate the uv executable: _tools/bin first, then PATH."""
suffix = ".exe" if sys.platform == "win32" else ""
tools_bin = workspace_root / "_tools" / "bin" / f"uv{suffix}"
if tools_bin.exists():
return str(tools_bin)
return shutil.which("uv")load_framework_pyproject function · python · L82-L88 (7 LOC)repo_tools/_bootstrap.py
def load_framework_pyproject(framework_root: Path) -> dict:
"""Load and parse the framework pyproject.toml."""
path = framework_root / "pyproject.toml"
if not path.exists():
print(f"ERROR: {path} not found", file=sys.stderr)
sys.exit(1)
return tomllib.loads(path.read_text(encoding="utf-8"))collect_feature_groups function · python · L91-L109 (19 LOC)repo_tools/_bootstrap.py
def collect_feature_groups(
framework_data: dict, features: list[str],
) -> dict[str, list[str]]:
"""Return the subset of [dependency-groups] matching *features*.
If *features* is empty, returns all groups.
"""
all_groups: dict[str, list[str]] = framework_data.get("dependency-groups", {})
if not features:
return dict(all_groups)
selected: dict[str, list[str]] = {}
for name in features:
if name in all_groups:
selected[name] = list(all_groups[name])
else:
print(f"WARNING: unknown feature '{name}'", file=sys.stderr)
return selectedwrite_pyproject function · python · L131-L148 (18 LOC)repo_tools/_bootstrap.py
def write_pyproject(
path: Path,
groups: dict[str, list[str]],
) -> None:
"""Write the generated tools/pyproject.toml."""
def _q(items: list[str]) -> str:
return ", ".join(f'"{d}"' for d in items)
content = _PYPROJECT_TEMPLATE.format(
groups="\n".join(
f"{name} = [{_q(deps)}]"
for name, deps in sorted(groups.items())
),
default_groups=_q(sorted(groups)),
)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
uv_sync function · python · L151-L158 (8 LOC)repo_tools/_bootstrap.py
def uv_sync(uv: str, project_dir: Path, venv_dir: Path) -> None:
"""Run uv sync with UV_PROJECT_ENVIRONMENT pointing to the venv."""
env = {**os.environ, "UV_PROJECT_ENVIRONMENT": str(venv_dir)}
cmd = [uv, "sync", "--project", str(project_dir)]
result = subprocess.run(cmd, env=env)
if result.returncode != 0:
print("ERROR: uv sync failed", file=sys.stderr)
sys.exit(1)run function · python · L161-L194 (34 LOC)repo_tools/_bootstrap.py
def run(
framework_root: Path,
workspace_root: Path,
uv: str | None = None,
features: list[str] | None = None,
tool_deps: list[str] | None = None,
) -> None:
"""Generate tools/pyproject.toml and run uv sync.
*features*: list of enabled features, or None/empty for all.
*tool_deps*: deps declared by registered tools via ``RepoTool.deps``.
"""
if uv is None:
uv = find_uv(workspace_root)
if uv is None:
print("ERROR: uv not found", file=sys.stderr)
sys.exit(1)
fw_data = load_framework_pyproject(framework_root)
core_deps = list(fw_data.get("project", {}).get("dependencies", []))
groups: dict[str, list[str]] = {"core": core_deps}
groups.update(collect_feature_groups(fw_data, features or []))
if tool_deps:
groups["tools"] = tool_deps
tools_pyproject = workspace_root / "tools" / "pyproject.toml"
write_pyproject(tools_pyproject, groups)
print(f"Generated {tools_pyproject}")
venv_dwrite_shims function · python · L197-L238 (42 LOC)repo_tools/_bootstrap.py
def write_shims(
framework_root: Path,
workspace_root: Path,
) -> None:
"""Generate ./repo shims (bash + Windows cmd).
Called during bootstrap after the venv exists. Uses sys.executable
to locate the venv Python — the caller must be running inside the venv.
"""
py = Path(sys.executable)
venv_bin = py.parent
def _posix(p: Path) -> str:
return str(p).replace("\\", "/")
# bash shim (Linux/macOS, Git Bash on Windows)
bash_shim = workspace_root / "repo"
bash_shim.write_text(
f"#!/bin/bash\n"
f'export PATH="{_posix(venv_bin)}:$PATH"\n'
f'PYTHONPATH="{_posix(framework_root)}" '
f'exec "{_posix(py)}" -m repo_tools.cli '
f'--workspace-root "{_posix(workspace_root)}" "$@"\n',
encoding="utf-8",
newline="\n",
)
try:
bash_shim.chmod(0o755)
except OSError:
pass
if sys.platform == "win32":
cmd_shim = workspace_root / "repo.cmd"
cmd_shim.w_discover_tools_from_path function · python · L28-L72 (45 LOC)repo_tools/cli.py
def _discover_tools_from_path(
namespace_path: list[str],
package_name: str,
) -> list[RepoTool]:
"""Discover RepoTool subclasses from namespace package portions."""
tools: list[RepoTool] = []
for module_info in pkgutil.iter_modules(namespace_path):
name = module_info.name
if name.startswith("_") or name in ("cli", "core", "command_runner"):
continue
try:
module = importlib.import_module(f"{package_name}.{name}")
except ImportError as exc:
logger.debug(f"Could not import {package_name}.{name}: {exc}")
continue
except Exception as exc:
logger.warning(f"Failed to import {package_name}.{name}: {exc}")
continue
for _, cls in inspect.getmembers(module, inspect.isclass):
if cls is RepoTool or not issubclass(cls, RepoTool):
continue
# For regular modules, class must be defined there
# For packages, clas_resolve_tools function · python · L75-L80 (6 LOC)repo_tools/cli.py
def _resolve_tools(framework_tools: list[RepoTool], project_tools: list[RepoTool]) -> list[RepoTool]:
"""Merge framework and project tools; project wins on name collision."""
by_name: dict[str, RepoTool] = {t.name: t for t in framework_tools}
for t in project_tools:
by_name[t.name] = t # project wins
return sorted(by_name.values(), key=lambda t: t.name)_auto_register_config_tools function · python · L83-L117 (35 LOC)repo_tools/cli.py
def _auto_register_config_tools(
config: dict[str, Any],
registered_names: set[str],
) -> list[RepoTool]:
"""Create CommandRunnerTools for eligible config sections.
A section is eligible when it contains a ``steps`` or ``steps@*`` key.
"""
from .command_runner import CommandRunnerTool
_skip_sections = {"tokens", "repo"}
auto_tools: list[RepoTool] = []
for section_name, section_value in config.items():
if section_name in _skip_sections:
continue
if section_name in registered_names:
logger.debug(
f"[auto-tool] '{section_name}': skipped — a RepoTool subclass is already registered."
)
continue
if not isinstance(section_value, dict):
continue
has_steps = any(k.split("@", 1)[0] == "steps" for k in section_value)
if not has_steps:
continue
tool = CommandRunnerTool()
tool.name = section_name # type: ignore[assi_get_dimension_tokens function · python · L123-L132 (10 LOC)repo_tools/cli.py
def _get_dimension_tokens(config: dict[str, Any]) -> dict[str, list[str]]:
"""Extract dimension tokens (lists) from config.repo.tokens."""
repo_section = config.get("repo", {})
if not isinstance(repo_section, dict):
repo_section = {}
dims: dict[str, list[str]] = {}
for key, value in repo_section.get("tokens", {}).items():
if isinstance(value, list) and value:
dims[key] = [str(v) for v in value]
return dims_build_tool_context function · python · L138-L151 (14 LOC)repo_tools/cli.py
def _build_tool_context(ctx_obj: dict[str, Any], tool_name: str) -> ToolContext:
"""Build a ToolContext from the click context obj dict."""
config = ctx_obj["config"]
tool_config = config.get(tool_name, {})
if not isinstance(tool_config, dict):
tool_config = {}
return ToolContext(
workspace_root=Path(ctx_obj["workspace_root"]),
tokens=ctx_obj["tokens"],
config=config,
tool_config=tool_config,
dimensions=ctx_obj["dimensions"],
passthrough_args=[],
)If a scraper extracted this row, it came from Repobility (https://repobility.com)
_make_tool_command function · python · L154-L219 (66 LOC)repo_tools/cli.py
def _make_tool_command(
tool: RepoTool,
dimensions: dict[str, list[str]],
) -> click.Command:
"""Build a click command for a tool."""
@click.pass_context
def callback(ctx: click.Context, **kwargs: Any) -> None:
# Extract dimension overrides from subcommand-level flags
dim_overrides: dict[str, str] = {}
for dim_name in dimensions:
cli_key = dim_name.replace("-", "_")
val = kwargs.pop(cli_key, None)
if val is not None:
dim_overrides[dim_name] = val
if dim_overrides:
updated_dims = {**ctx.obj["dimensions"], **dim_overrides}
config = resolve_filters(load_config(ctx.obj["workspace_root"]), updated_dims)
tokens = resolve_tokens(ctx.obj["workspace_root"], config, updated_dims)
ctx.obj["dimensions"] = updated_dims
ctx.obj["config"] = config
ctx.obj["tokens"] = tokens
context = _build_tool_context(ctx.obj, tool_build_cli function · python · L225-L352 (128 LOC)repo_tools/cli.py
def _build_cli(
workspace_root: str | None = None,
project_tool_dirs: list[str] | None = None,
) -> click.Group:
"""Build the top-level click group with all discovered tools."""
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.option(
"--workspace-root",
type=click.Path(exists=True),
default=workspace_root,
hidden=True, # Set by the ./repo shim; consumers never pass this directly.
)
@click.pass_context
def cli(ctx: click.Context, workspace_root: str, **dim_kwargs: Any) -> None:
ctx.ensure_object(dict)
if workspace_root is None:
workspace_root = str(Path.cwd())
# Reuse early config if workspace hasn't changed
if workspace_root == ws:
config = early_config
else:
config = load_config(workspace_root)
# Resolve dimension values from config tokens or fallback defaults
dimensions = _get_dimension_tokens(comain function · python · L355-L393 (39 LOC)repo_tools/cli.py
def main() -> None:
"""CLI entry point. Called by the generated ``./repo`` shim.
The shim sets PYTHONPATH to include the framework directory and runs::
python -m repo_tools.cli --workspace-root /path/to/project ...
"""
from colorama import init as colorama_init
colorama_init()
# --workspace-root can come from argv or be derived from this script's location
workspace_root = None
# Check for --workspace-root in sys.argv before click parses.
# Take the last occurrence so user overrides beat the shim's hardcoded value.
for i, arg in enumerate(sys.argv[1:], 1):
if arg == "--workspace-root" and i < len(sys.argv) - 1:
workspace_root = sys.argv[i + 1]
elif arg.startswith("--workspace-root="):
workspace_root = arg.split("=", 1)[1]
# Fallback: cwd (workspace_root stays None → defaults to cwd in _build_cli)
# Discover project tool dirs
project_tool_dirs: list[str] = []
if workspace_root:
_validate_steps function · python · L17-L63 (47 LOC)repo_tools/command_runner.py
def _validate_steps(section_name: str, raw: Any) -> list[dict]:
"""Validate and normalize a ``steps`` value into a list of step dicts.
Each item is either a string (shorthand for ``{"command": str}``) or a dict
with ``command`` (required), plus optional ``cwd``, ``env_script``, ``env``.
Raises ``SystemExit(1)`` on validation failure.
"""
if not isinstance(raw, list):
print(f"Error: '{section_name}' steps must be a list, got {type(raw).__name__}", file=sys.stderr)
raise SystemExit(1)
result: list[dict] = []
for i, item in enumerate(raw):
if isinstance(item, str):
result.append({"command": item})
elif isinstance(item, dict):
if "command" not in item:
print(
f"Error: '{section_name}' step [{i}] missing required 'command' key",
file=sys.stderr,
)
raise SystemExit(1)
unknown = set(item) - _STEP_KEYS
_parse_env_list function · python · L66-L78 (13 LOC)repo_tools/command_runner.py
def _parse_env_list(entries: list[str]) -> dict[str, str]:
"""Convert ``["KEY=VALUE", ...]`` to a dict.
Raises ``SystemExit(1)`` if any entry is missing ``=``.
"""
result: dict[str, str] = {}
for entry in entries:
if "=" not in entry:
print(f"Error: env entry {entry!r} missing '='", file=sys.stderr)
raise SystemExit(1)
key, value = entry.split("=", 1)
result[key] = value
return resultCommandRunnerTool.execute method · python · L93-L151 (59 LOC)repo_tools/command_runner.py
def execute(self, ctx: ToolContext, args: dict[str, Any]) -> None:
raw_steps = args.get("steps")
if not raw_steps:
logger.error(
f"No {self.name!r} steps configured. Add to config.yaml:\n"
f" {self.config_hint}"
)
raise SystemExit(1)
tokens = dict(ctx.tokens)
# Merge remaining args as extra tokens (skip step-framework keys).
_skip = {"steps", "dry_run"}
for k, v in args.items():
if k not in _skip and v is not None:
tokens[k] = str(v)
formatter = TokenFormatter(tokens)
steps = _validate_steps(self.name, raw_steps)
# Resolve each step through token expansion.
resolved: list[dict] = []
for step in steps:
r: dict[str, Any] = {}
r["command"] = formatter.resolve(step["command"])
if "cwd" in step:
r["cwd"] = Path(formatter.resolve(str(step["cwd"])))
ContextTool.execute method · python · L24-L31 (8 LOC)repo_tools/context.py
def execute(self, ctx: ToolContext, args: dict[str, Any]) -> None:
tokens = dict(sorted(ctx.tokens.items()))
if args.get("as_json"):
print(json.dumps(tokens, indent=2))
else:
for key, value in tokens.items():
logger.info(f"{key}: {value}")_level_color function · python · L32-L37 (6 LOC)repo_tools/core.py
def _level_color(levelno: int) -> str:
if levelno >= logging.ERROR:
return Fore.RED
if levelno >= logging.WARNING:
return Fore.YELLOW
return Fore.CYANProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
TokenFormatter.resolve method · python · L72-L87 (16 LOC)repo_tools/core.py
def resolve(self, template: str) -> str:
result = template
for _ in range(self.MAX_DEPTH):
try:
expanded = result.format_map(self._tokens)
except KeyError as exc:
missing = exc.args[0] if exc.args else "unknown"
raise KeyError(f"Missing token: {missing}") from exc
if expanded == result:
return expanded
result = expanded
remaining = _extract_references(result)
raise ValueError(
f"Token expansion exceeded {self.MAX_DEPTH} iterations"
f" (unresolved: {', '.join(sorted(remaining))})"
)_builtin_tokens function · python · L96-L109 (14 LOC)repo_tools/core.py
def _builtin_tokens() -> dict[str, str]:
system = platform.system()
is_win = system == "Windows"
is_mac = system == "Darwin"
# Framework root: parent of the repo_tools package (the submodule dir).
framework_root = posix_path(str(Path(__file__).resolve().parent.parent))
return {
"exe_ext": ".exe" if is_win else "",
"shell_ext": ".cmd" if is_win else ".sh",
"lib_ext": ".dll" if is_win else (".dylib" if is_mac else ".so"),
"path_sep": ";" if is_win else ":",
"repo": f'"{posix_path(sys.executable)}" -m repo_tools.cli --workspace-root "{{workspace_root}}"',
"framework_root": framework_root,
}_extract_references function · python · L116-L126 (11 LOC)repo_tools/core.py
def _extract_references(template: str) -> set[str]:
"""Return the set of token names referenced by ``{name}`` placeholders.
Uses ``string.Formatter().parse()`` which correctly ignores escaped
braces (``{{``/``}}``), returning ``field_name=None`` for those.
"""
refs: set[str] = set()
for _, field_name, _, _ in string.Formatter().parse(template):
if field_name is not None:
refs.add(field_name)
return refs_validate_token_graph function · python · L129-L164 (36 LOC)repo_tools/core.py
def _validate_token_graph(tokens: dict[str, str]) -> None:
"""Validate the token dependency graph before expansion.
Raises:
ValueError: on self-references or cycles (with the cycle path).
KeyError: when a token references an undefined token.
"""
# Build dependency graph: token -> set of tokens it depends on
graph: dict[str, set[str]] = {}
for name, value in tokens.items():
refs = _extract_references(str(value))
graph[name] = refs
# Self-reference check (clear message before TopologicalSorter)
if name in refs:
raise ValueError(f"Token '{name}' references itself")
# Missing reference check
all_names = set(tokens)
for name, refs in graph.items():
missing = refs - all_names
if missing:
raise KeyError(
f"Token '{name}' references undefined token(s): "
+ ", ".join(sorted(missing))
)
# Cycle detection via topological soresolve_tokens function · python · L167-L222 (56 LOC)repo_tools/core.py
def resolve_tokens(
workspace_root: str,
config: dict[str, Any],
dimension_values: dict[str, str],
) -> dict[str, str]:
"""Build the full token dictionary.
Merge order (later wins):
1. Built-in tokens (exe_ext, shell_ext, repo, etc.)
2. Variable tokens from config
3. workspace_root path token
4. Dimension values (platform, build_type, etc.)
"""
tokens: dict[str, str] = _builtin_tokens()
# Variable tokens from config (repo.tokens section)
repo_section = config.get("repo", {})
if not isinstance(repo_section, dict):
repo_section = {}
for key, value in repo_section.get("tokens", {}).items():
if isinstance(value, list):
continue # dimension tokens handled elsewhere
if key in _RESERVED_TOKENS:
logger.warning(f"'{key}' is a reserved token and cannot be overridden in config.")
continue
if isinstance(value, dict):
raw = str(value.get("value", "")load_config function · python · L228-L238 (11 LOC)repo_tools/core.py
def load_config(workspace_root: str) -> dict[str, Any]:
"""Load config.yaml from workspace root."""
config_path = Path(workspace_root) / "config.yaml"
if not config_path.exists():
return {}
data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
if data is None:
return {}
if not isinstance(data, dict):
raise TypeError("config.yaml must contain a top-level mapping.")
return dataresolve_filters function · python · L242-L258 (17 LOC)repo_tools/core.py
def resolve_filters(config: dict[str, Any], dimension_values: dict[str, str]) -> dict[str, Any]:
"""Walk config dict, resolve ``key@filter`` entries.
Filter syntax:
- ``@value`` — matches any dimension whose current value equals *value*
- ``@val1,val2`` — AND across different dimensions
- ``@!value`` — negation
- ``@val1,!val2`` — compound
More-specific filters (more conditions) win over less-specific ones.
"""
# Build reverse lookup: value -> dimension name
dim_lookup: dict[str, str] = {}
for dim_name, dim_val in dimension_values.items():
dim_lookup[dim_val] = dim_name
return _walk_filters(config, dimension_values, dim_lookup)_walk_filters function · python · L261-L303 (43 LOC)repo_tools/core.py
def _walk_filters(
obj: Any,
dim_values: dict[str, str],
dim_lookup: dict[str, str],
) -> Any:
if isinstance(obj, dict):
# Collect base keys and filtered keys
base: dict[str, Any] = {}
filtered: dict[str, list[tuple[str, int, Any]]] = {} # base_key -> [(filter, specificity, value)]
for key, value in obj.items():
if "@" in str(key):
parts = str(key).split("@", 1)
base_key = parts[0]
filter_str = parts[1]
match, specificity = _match_filter(filter_str, dim_values, dim_lookup)
if match:
filtered.setdefault(base_key, []).append((filter_str, specificity, value))
else:
base[key] = value
# Resolve: most-specific filter wins over base
result: dict[str, Any] = {}
for key, value in base.items():
if key in filtered:
# Pick most specific
candidRepobility · MCP-ready · https://repobility.com
_match_filter function · python · L306-L346 (41 LOC)repo_tools/core.py
def _match_filter(
filter_str: str,
dim_values: dict[str, str],
dim_lookup: dict[str, str],
) -> tuple[bool, int]:
"""Check if a filter matches the current dimension values.
Returns ``(matches, specificity)`` where specificity = number of conditions.
"""
conditions = [c.strip() for c in filter_str.split(",") if c.strip()]
if not conditions:
return True, 0
for cond in conditions:
negate = cond.startswith("!")
value = cond.lstrip("!")
# Find which dimension this value belongs to
matched_any = False
for dim_name, dim_val in dim_values.items():
if value == dim_val or value == dim_name:
matched_any = True
if negate:
if value == dim_val:
return False, 0 # Negation failed
break
# Also check if value is a known dimension value (not current)
if not matched_any:
if value in dim_registered_tool_deps function · python · L411-L416 (6 LOC)repo_tools/core.py
def registered_tool_deps() -> list[str]:
"""Collect, deduplicate, and sort deps from all registered tools."""
seen: set[str] = set()
for tool in _TOOL_REGISTRY.values():
seen.update(tool.deps)
return sorted(seen)invoke_tool function · python · L419-L449 (31 LOC)repo_tools/core.py
def invoke_tool(
name: str,
tokens: dict[str, str],
config: dict[str, Any],
dimensions: dict[str, str] | None = None,
extra_args: dict[str, Any] | None = None,
) -> None:
"""Invoke a registered tool programmatically (e.g. prebuild/postbuild steps)."""
tool = get_tool(name)
if tool is None:
raise KeyError(f"Tool '{name}' is not registered.")
tool_config = config.get(name, {})
if not isinstance(tool_config, dict):
tool_config = {}
ctx = ToolContext(
workspace_root=Path(tokens.get("workspace_root", ".")),
tokens=tokens,
config=config,
tool_config=tool_config,
dimensions=dimensions or {},
passthrough_args=[],
)
args: dict[str, Any] = {**tool.default_args(tokens)}
args.update(tool_config)
if extra_args:
args.update(extra_args)
tool.execute(ctx, args)detect_platform_identifier function · python · L455-L493 (39 LOC)repo_tools/core.py
def detect_platform_identifier(
platform_override: str | None = None,
conan_profile_path: Path | None = None,
) -> str:
"""Detect platform identifier for build directory structure.
Priority: 1. Explicit override 2. Conan profile 3. Host auto-detect
"""
if platform_override:
return platform_override
if conan_profile_path and conan_profile_path.exists():
try:
profile_content = conan_profile_path.read_text()
os_match = re.search(r"^os=(\w+)", profile_content, re.MULTILINE)
arch_match = re.search(r"^arch=(\w+)", profile_content, re.MULTILINE)
if os_match and arch_match:
return _map_platform_identifier(os_match.group(1), arch_match.group(1))
except (OSError, UnicodeDecodeError):
pass
system = platform.system()
machine = platform.machine().lower()
if machine in ("x86_64", "amd64"):
arch = "x64"
elif machine in ("arm64", "aarch64", "armv_map_platform_identifier function · python · L496-L518 (23 LOC)repo_tools/core.py
def _map_platform_identifier(os_val: str, arch_val: str) -> str:
"""Map Conan os/arch settings to platform identifier."""
if os_val == "Emscripten" and arch_val == "wasm":
return "emscripten"
os_map = {
"Windows": "windows",
"Linux": "linux",
"Macos": "macos",
"Darwin": "macos",
}
os_normalized = os_map.get(os_val, os_val.lower())
arch_map = {
"x86_64": "x64",
"x86": "x86",
"armv8": "arm64",
"armv8_32": "arm",
"wasm": "wasm",
}
arch_normalized = arch_map.get(arch_val, arch_val.lower())
return f"{os_normalized}-{arch_normalized}"log_section function · python · L534-L544 (11 LOC)repo_tools/core.py
def log_section(title: str) -> Generator[None, None, None]:
"""Foldable CI section or styled terminal header."""
if _is_ci():
print(f"::group::{title}", flush=True)
else:
logger.info(f"── {title} ──")
try:
yield
finally:
if _is_ci():
print("::endgroup::", flush=True)find_venv_executable function · python · L553-L567 (15 LOC)repo_tools/core.py
def find_venv_executable(name: str) -> str:
"""Find an executable in the virtual environment, fallback to system PATH."""
python_exe = Path(sys.executable)
scripts_dir = python_exe.parent
exe_path = scripts_dir / (name + (".exe" if sys.platform == "win32" else ""))
if exe_path.exists():
return str(exe_path)
exe_path_str = shutil.which(name)
if exe_path_str:
return exe_path_str
logger.warning(f"Executable {name} not found in virtual environment or system PATH")
return namesanitized_subprocess_env function · python · L570-L603 (34 LOC)repo_tools/core.py
def sanitized_subprocess_env() -> dict[str, str]:
"""Return env overrides that strip repo-tool Python contamination.
The generated shim (``repo`` / ``repo.cmd``) prepends the venv's Scripts
directory to ``PATH`` and sets ``PYTHONPATH`` so the CLI can import
``repo_tools``. These variables must **not** leak into build-tool
subprocesses (Conan, CMake, …) because they can cause the wrong Python
stdlib to be loaded — for example, a system Python 3.12 picking up the
venv's Python 3.14 stdlib, resulting in ``SRE module mismatch`` or
``_thread`` attribute errors.
Returns a dict suitable for the *env* parameter of :func:`run_command`
or :class:`CommandGroup`. The dict is merged **on top of**
``os.environ``, so only the keys that need overriding are present.
"""
env: dict[str, str] = {}
# Strip PYTHONPATH — only needed for repo_tools imports
env["PYTHONPATH"] = ""
# Strip PYTHONHOME if present
if "PYTHONHOME" in os.enviroGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
run_command function · python · L606-L662 (57 LOC)repo_tools/core.py
def run_command(
cmd: list[str],
log_file: Path | None = None,
env_script: Path | None = None,
cwd: Path | None = None,
env: dict[str, str] | None = None,
) -> None:
"""Run a command and optionally tee output to a log file.
If *env_script* is provided, the command is executed inside a shell
that sources the script first. Errors out when the script doesn't exist.
"""
use_shell = False
run_cmd: list[str] | str = cmd
if env_script is not None:
script = env_script
if not script.suffix:
script = script.with_suffix(".bat" if is_windows() else ".sh")
if not script.exists():
logger.error(f"env_script not found: {script}")
sys.exit(1)
if is_windows():
cmd_str = subprocess.list2cmdline(cmd)
run_cmd = f'call "{script}" >nul 2>&1 && {cmd_str}'
else:
cmd_str = shlex.join(cmd)
run_cmd = f'. "{script}" >/dev/null 2>&1 && {cmd_strremove_tree_with_retries function · python · L665-L681 (17 LOC)repo_tools/core.py
def remove_tree_with_retries(
path: Path, attempts: int = 5, delay: float = 1.0,
) -> None:
"""Remove a directory tree with retry logic for locked files (Windows)."""
for attempt in range(attempts):
try:
shutil.rmtree(path)
return
except PermissionError:
if attempt < attempts - 1:
logger.warning(
f"Permission denied removing {path}, "
f"retrying in {delay}s ({attempt + 1}/{attempts})"
)
time.sleep(delay)
else:
raiseresolve_path function · python · L684-L691 (8 LOC)repo_tools/core.py
def resolve_path(root: Path, template: str, tokens: dict[str, str]) -> Path:
"""Resolve a path template using tokens."""
formatter = TokenFormatter(tokens)
resolved = formatter.resolve(template)
path = Path(resolved)
if not path.is_absolute():
path = root / path
return pathglob_paths function · python · L697-L705 (9 LOC)repo_tools/core.py
def glob_paths(pattern: Path | str) -> list[Path]:
"""Expand a glob pattern to a sorted list of matching file paths.
Returns a single-element list for non-glob paths.
"""
pattern_text = str(pattern)
if any(char in pattern_text for char in ("*", "?", "[")):
return sorted(Path(match) for match in glob.glob(pattern_text, recursive=True))
return [Path(pattern_text)]CommandGroup.__init__ method · python · L728-L742 (15 LOC)repo_tools/core.py
def __init__(
self,
label: str,
log_file: Path | None = None,
env_script: Path | None = None,
cwd: Path | None = None,
env: dict[str, str] | None = None,
) -> None:
self.label = label
self.log_file = log_file
self.env_script = env_script
self.cwd = cwd
self.env = env
self._commands_run = 0
self._failed = FalseCommandGroup.__enter__ method · python · L744-L749 (6 LOC)repo_tools/core.py
def __enter__(self) -> CommandGroup:
if _is_ci():
print(f"::group::{self.label}", flush=True)
else:
logger.info(f"── {self.label} ──")
return selfCommandGroup.__exit__ method · python · L751-L759 (9 LOC)repo_tools/core.py
def __exit__(self, exc_type: type | None, exc_val: BaseException | None, exc_tb: Any) -> None:
if _is_ci():
print("::endgroup::", flush=True)
if exc_type is not None:
return # let the exception propagate
if self._failed:
logger.error(f" ✗ {self.label} failed")
else:
logger.info(f" ✓ {self.label} ({self._commands_run} command(s))")CommandGroup.run method · python · L761-L783 (23 LOC)repo_tools/core.py
def run(
self,
cmd: list[str],
log_file: Path | None = None,
env_script: Path | None = None,
cwd: Path | None = None,
env: dict[str, str] | None = None,
) -> None:
"""Run a command within this group.
Per-call *log_file*, *env_script*, *cwd*, and *env* override the
group defaults. Per-call *env* is merged on top of group-level env.
"""
lf = log_file or self.log_file
es = env_script or self.env_script
cw = cwd or self.cwd
merged_env = {**(self.env or {}), **(env or {})} or None
try:
run_command(cmd, log_file=lf, env_script=es, cwd=cw, env=merged_env)
self._commands_run += 1
except SystemExit:
self._failed = True
raiseIf a scraper extracted this row, it came from Repobility (https://repobility.com)
to_cmake_build_type function · python · L789-L798 (10 LOC)repo_tools/core.py
def to_cmake_build_type(value: str | None) -> str:
if not value:
return "Debug"
mapping = {
"debug": "Debug",
"release": "Release",
"relwithdebinfo": "RelWithDebInfo",
"minsizerel": "MinSizeRel",
}
return mapping.get(str(value).casefold(), str(value))find_executable function · python · L17-L27 (11 LOC)repo_tools/features.py
def find_executable(name: str) -> str | None:
"""Find *name* in the venv Scripts dir or system PATH.
Returns the path string, or ``None`` if not found.
"""
scripts_dir = Path(sys.executable).parent
suffix = ".exe" if sys.platform == "win32" else ""
exe_path = scripts_dir / (name + suffix)
if exe_path.exists():
return str(exe_path)
return shutil.which(name)require_executable function · python · L30-L49 (20 LOC)repo_tools/features.py
def require_executable(name: str, *, feature: str) -> str:
"""Find *name* or exit with a helpful error about which feature to enable.
Returns the executable path on success. On failure calls
``sys.exit(1)`` after logging a message that tells the user
which *feature* group to add to ``repo.features`` in config.yaml.
"""
exe = find_executable(name)
if exe is not None:
return exe
from .core import logger # lazy — keep module-level stdlib-only
logger.error(
"%s not found. Enable the '%s' feature in config.yaml "
"(repo.features) and run 'repo init' to install it.",
name,
feature,
)
sys.exit(1)