← back to dw218192__repokit

Function bodies 119 total

All specs Real LLM only Function bodies
ensure_worktree function · python · L15-L47 (33 LOC)
repo_tools/agent/worktree.py
def ensure_worktree(workspace_root: Path, ticket: str) -> Path:
    """Create or reuse a git worktree for the given ticket under _agent/worktrees/."""
    wt_dir = workspace_root / "_agent" / "worktrees" / ticket
    branch_name = f"worktree-{ticket}"

    if wt_dir.exists():
        logger.info(f"Reusing existing worktree: {wt_dir}")
        return wt_dir

    wt_dir.parent.mkdir(parents=True, exist_ok=True)
    subprocess.run(
        ["git", "worktree", "prune"],
        cwd=str(workspace_root), capture_output=True,
    )

    branch_exists = subprocess.run(
        ["git", "rev-parse", "--verify", branch_name],
        cwd=str(workspace_root), capture_output=True,
    ).returncode == 0

    if branch_exists:
        subprocess.run(
            ["git", "worktree", "add", str(wt_dir), branch_name],
            cwd=str(workspace_root), check=True,
        )
    else:
        subprocess.run(
            ["git", "worktree", "add", "-b", branch_name, str(wt_dir), "HEAD"],
            cwd
remove_worktree function · python · L50-L64 (15 LOC)
repo_tools/agent/worktree.py
def remove_worktree(workspace_root: Path, ticket: str) -> None:
    """Remove the git worktree for a ticket."""
    wt_dir = workspace_root / "_agent" / "worktrees" / ticket
    if wt_dir.exists():
        subprocess.run(
            ["git", "worktree", "remove", str(wt_dir), "--force"],
            cwd=str(workspace_root), check=True,
        )
        logger.info(f"Removed worktree: {wt_dir}")
    else:
        subprocess.run(
            ["git", "worktree", "prune"],
            cwd=str(workspace_root), capture_output=True,
        )
        logger.info(f"Worktree not found: {wt_dir}")
derive_project_root function · python · L27-L68 (42 LOC)
repo_tools/_bootstrap.py
def derive_project_root(framework_dir: Path) -> Path:
    """Derive the consumer project root from the framework directory.

    Logic (also implemented in bootstrap.sh and bootstrap.ps1):

    1. ``git rev-parse --show-toplevel`` — if different from *framework_dir*,
       the framework is nested inside a larger repo (monorepo / symlink in CI)
       and the toplevel IS the project root.
    2. If the toplevel equals *framework_dir*, the framework is a submodule
       whose own root is the git toplevel.  Use
       ``git rev-parse --show-superproject-working-tree`` to find the parent.
    3. If neither works, raise ``RuntimeError`` — the caller should ask
       the user for an explicit root.
    """
    framework_dir = Path(os.path.normpath(framework_dir))

    r = subprocess.run(
        ["git", "-C", str(framework_dir), "rev-parse", "--show-toplevel"],
        capture_output=True, text=True,
    )
    if r.returncode != 0 or not r.stdout.strip():
        raise RuntimeError(
      
find_uv function · python · L71-L79 (9 LOC)
repo_tools/_bootstrap.py
def find_uv(workspace_root: Path) -> str | None:
    """Locate the uv executable: _tools/bin first, then PATH."""
    suffix = ".exe" if sys.platform == "win32" else ""

    tools_bin = workspace_root / "_tools" / "bin" / f"uv{suffix}"
    if tools_bin.exists():
        return str(tools_bin)

    return shutil.which("uv")
load_framework_pyproject function · python · L82-L88 (7 LOC)
repo_tools/_bootstrap.py
def load_framework_pyproject(framework_root: Path) -> dict:
    """Load and parse the framework pyproject.toml."""
    path = framework_root / "pyproject.toml"
    if not path.exists():
        print(f"ERROR: {path} not found", file=sys.stderr)
        sys.exit(1)
    return tomllib.loads(path.read_text(encoding="utf-8"))
collect_feature_groups function · python · L91-L109 (19 LOC)
repo_tools/_bootstrap.py
def collect_feature_groups(
    framework_data: dict, features: list[str],
) -> dict[str, list[str]]:
    """Return the subset of [dependency-groups] matching *features*.

    If *features* is empty, returns all groups.
    """
    all_groups: dict[str, list[str]] = framework_data.get("dependency-groups", {})

    if not features:
        return dict(all_groups)

    selected: dict[str, list[str]] = {}
    for name in features:
        if name in all_groups:
            selected[name] = list(all_groups[name])
        else:
            print(f"WARNING: unknown feature '{name}'", file=sys.stderr)
    return selected
write_pyproject function · python · L131-L148 (18 LOC)
repo_tools/_bootstrap.py
def write_pyproject(
    path: Path,
    groups: dict[str, list[str]],
) -> None:
    """Write the generated tools/pyproject.toml."""
    def _q(items: list[str]) -> str:
        return ", ".join(f'"{d}"' for d in items)

    content = _PYPROJECT_TEMPLATE.format(
        groups="\n".join(
            f"{name} = [{_q(deps)}]"
            for name, deps in sorted(groups.items())
        ),
        default_groups=_q(sorted(groups)),
    )

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
uv_sync function · python · L151-L158 (8 LOC)
repo_tools/_bootstrap.py
def uv_sync(uv: str, project_dir: Path, venv_dir: Path) -> None:
    """Run uv sync with UV_PROJECT_ENVIRONMENT pointing to the venv."""
    env = {**os.environ, "UV_PROJECT_ENVIRONMENT": str(venv_dir)}
    cmd = [uv, "sync", "--project", str(project_dir)]
    result = subprocess.run(cmd, env=env)
    if result.returncode != 0:
        print("ERROR: uv sync failed", file=sys.stderr)
        sys.exit(1)
run function · python · L161-L194 (34 LOC)
repo_tools/_bootstrap.py
def run(
    framework_root: Path,
    workspace_root: Path,
    uv: str | None = None,
    features: list[str] | None = None,
    tool_deps: list[str] | None = None,
) -> None:
    """Generate tools/pyproject.toml and run uv sync.

    *features*: list of enabled features, or None/empty for all.
    *tool_deps*: deps declared by registered tools via ``RepoTool.deps``.
    """
    if uv is None:
        uv = find_uv(workspace_root)
    if uv is None:
        print("ERROR: uv not found", file=sys.stderr)
        sys.exit(1)

    fw_data = load_framework_pyproject(framework_root)
    core_deps = list(fw_data.get("project", {}).get("dependencies", []))
    groups: dict[str, list[str]] = {"core": core_deps}
    groups.update(collect_feature_groups(fw_data, features or []))
    if tool_deps:
        groups["tools"] = tool_deps

    tools_pyproject = workspace_root / "tools" / "pyproject.toml"

    write_pyproject(tools_pyproject, groups)
    print(f"Generated {tools_pyproject}")

    venv_d
write_shims function · python · L197-L238 (42 LOC)
repo_tools/_bootstrap.py
def write_shims(
    framework_root: Path,
    workspace_root: Path,
) -> None:
    """Generate ./repo shims (bash + Windows cmd).

    Called during bootstrap after the venv exists.  Uses sys.executable
    to locate the venv Python — the caller must be running inside the venv.
    """
    py = Path(sys.executable)
    venv_bin = py.parent

    def _posix(p: Path) -> str:
        return str(p).replace("\\", "/")

    # bash shim (Linux/macOS, Git Bash on Windows)
    bash_shim = workspace_root / "repo"
    bash_shim.write_text(
        f"#!/bin/bash\n"
        f'export PATH="{_posix(venv_bin)}:$PATH"\n'
        f'PYTHONPATH="{_posix(framework_root)}" '
        f'exec "{_posix(py)}" -m repo_tools.cli '
        f'--workspace-root "{_posix(workspace_root)}" "$@"\n',
        encoding="utf-8",
        newline="\n",
    )
    try:
        bash_shim.chmod(0o755)
    except OSError:
        pass

    if sys.platform == "win32":
        cmd_shim = workspace_root / "repo.cmd"
        cmd_shim.w
_discover_tools_from_path function · python · L28-L72 (45 LOC)
repo_tools/cli.py
def _discover_tools_from_path(
    namespace_path: list[str],
    package_name: str,
) -> list[RepoTool]:
    """Discover RepoTool subclasses from namespace package portions."""
    tools: list[RepoTool] = []
    for module_info in pkgutil.iter_modules(namespace_path):
        name = module_info.name
        if name.startswith("_") or name in ("cli", "core", "command_runner"):
            continue
        try:
            module = importlib.import_module(f"{package_name}.{name}")
        except ImportError as exc:
            logger.debug(f"Could not import {package_name}.{name}: {exc}")
            continue
        except Exception as exc:
            logger.warning(f"Failed to import {package_name}.{name}: {exc}")
            continue

        for _, cls in inspect.getmembers(module, inspect.isclass):
            if cls is RepoTool or not issubclass(cls, RepoTool):
                continue
            # For regular modules, class must be defined there
            # For packages, clas
_resolve_tools function · python · L75-L80 (6 LOC)
repo_tools/cli.py
def _resolve_tools(framework_tools: list[RepoTool], project_tools: list[RepoTool]) -> list[RepoTool]:
    """Merge framework and project tools; project wins on name collision."""
    by_name: dict[str, RepoTool] = {t.name: t for t in framework_tools}
    for t in project_tools:
        by_name[t.name] = t  # project wins
    return sorted(by_name.values(), key=lambda t: t.name)
_auto_register_config_tools function · python · L83-L117 (35 LOC)
repo_tools/cli.py
def _auto_register_config_tools(
    config: dict[str, Any],
    registered_names: set[str],
) -> list[RepoTool]:
    """Create CommandRunnerTools for eligible config sections.

    A section is eligible when it contains a ``steps`` or ``steps@*`` key.
    """
    from .command_runner import CommandRunnerTool

    _skip_sections = {"tokens", "repo"}
    auto_tools: list[RepoTool] = []

    for section_name, section_value in config.items():
        if section_name in _skip_sections:
            continue
        if section_name in registered_names:
            logger.debug(
                f"[auto-tool] '{section_name}': skipped — a RepoTool subclass is already registered."
            )
            continue
        if not isinstance(section_value, dict):
            continue

        has_steps = any(k.split("@", 1)[0] == "steps" for k in section_value)
        if not has_steps:
            continue

        tool = CommandRunnerTool()
        tool.name = section_name  # type: ignore[assi
_get_dimension_tokens function · python · L123-L132 (10 LOC)
repo_tools/cli.py
def _get_dimension_tokens(config: dict[str, Any]) -> dict[str, list[str]]:
    """Extract dimension tokens (lists) from config.repo.tokens."""
    repo_section = config.get("repo", {})
    if not isinstance(repo_section, dict):
        repo_section = {}
    dims: dict[str, list[str]] = {}
    for key, value in repo_section.get("tokens", {}).items():
        if isinstance(value, list) and value:
            dims[key] = [str(v) for v in value]
    return dims
_build_tool_context function · python · L138-L151 (14 LOC)
repo_tools/cli.py
def _build_tool_context(ctx_obj: dict[str, Any], tool_name: str) -> ToolContext:
    """Build a ToolContext from the click context obj dict."""
    config = ctx_obj["config"]
    tool_config = config.get(tool_name, {})
    if not isinstance(tool_config, dict):
        tool_config = {}
    return ToolContext(
        workspace_root=Path(ctx_obj["workspace_root"]),
        tokens=ctx_obj["tokens"],
        config=config,
        tool_config=tool_config,
        dimensions=ctx_obj["dimensions"],
        passthrough_args=[],
    )
If a scraper extracted this row, it came from Repobility (https://repobility.com)
_make_tool_command function · python · L154-L219 (66 LOC)
repo_tools/cli.py
def _make_tool_command(
    tool: RepoTool,
    dimensions: dict[str, list[str]],
) -> click.Command:
    """Build a click command for a tool."""

    @click.pass_context
    def callback(ctx: click.Context, **kwargs: Any) -> None:
        # Extract dimension overrides from subcommand-level flags
        dim_overrides: dict[str, str] = {}
        for dim_name in dimensions:
            cli_key = dim_name.replace("-", "_")
            val = kwargs.pop(cli_key, None)
            if val is not None:
                dim_overrides[dim_name] = val

        if dim_overrides:
            updated_dims = {**ctx.obj["dimensions"], **dim_overrides}
            config = resolve_filters(load_config(ctx.obj["workspace_root"]), updated_dims)
            tokens = resolve_tokens(ctx.obj["workspace_root"], config, updated_dims)
            ctx.obj["dimensions"] = updated_dims
            ctx.obj["config"] = config
            ctx.obj["tokens"] = tokens

        context = _build_tool_context(ctx.obj, tool
_build_cli function · python · L225-L352 (128 LOC)
repo_tools/cli.py
def _build_cli(
    workspace_root: str | None = None,
    project_tool_dirs: list[str] | None = None,
) -> click.Group:
    """Build the top-level click group with all discovered tools."""

    @click.group(context_settings={"help_option_names": ["-h", "--help"]})
    @click.option(
        "--workspace-root",
        type=click.Path(exists=True),
        default=workspace_root,
        hidden=True,  # Set by the ./repo shim; consumers never pass this directly.
    )
    @click.pass_context
    def cli(ctx: click.Context, workspace_root: str, **dim_kwargs: Any) -> None:
        ctx.ensure_object(dict)

        if workspace_root is None:
            workspace_root = str(Path.cwd())

        # Reuse early config if workspace hasn't changed
        if workspace_root == ws:
            config = early_config
        else:
            config = load_config(workspace_root)

        # Resolve dimension values from config tokens or fallback defaults
        dimensions = _get_dimension_tokens(co
main function · python · L355-L393 (39 LOC)
repo_tools/cli.py
def main() -> None:
    """CLI entry point. Called by the generated ``./repo`` shim.

    The shim sets PYTHONPATH to include the framework directory and runs::

        python -m repo_tools.cli --workspace-root /path/to/project ...
    """
    from colorama import init as colorama_init
    colorama_init()

    # --workspace-root can come from argv or be derived from this script's location
    workspace_root = None

    # Check for --workspace-root in sys.argv before click parses.
    # Take the last occurrence so user overrides beat the shim's hardcoded value.
    for i, arg in enumerate(sys.argv[1:], 1):
        if arg == "--workspace-root" and i < len(sys.argv) - 1:
            workspace_root = sys.argv[i + 1]
        elif arg.startswith("--workspace-root="):
            workspace_root = arg.split("=", 1)[1]

    # Fallback: cwd (workspace_root stays None → defaults to cwd in _build_cli)

    # Discover project tool dirs
    project_tool_dirs: list[str] = []
    if workspace_root:
 
_validate_steps function · python · L17-L63 (47 LOC)
repo_tools/command_runner.py
def _validate_steps(section_name: str, raw: Any) -> list[dict]:
    """Validate and normalize a ``steps`` value into a list of step dicts.

    Each item is either a string (shorthand for ``{"command": str}``) or a dict
    with ``command`` (required), plus optional ``cwd``, ``env_script``, ``env``.

    Raises ``SystemExit(1)`` on validation failure.
    """
    if not isinstance(raw, list):
        print(f"Error: '{section_name}' steps must be a list, got {type(raw).__name__}", file=sys.stderr)
        raise SystemExit(1)

    result: list[dict] = []
    for i, item in enumerate(raw):
        if isinstance(item, str):
            result.append({"command": item})
        elif isinstance(item, dict):
            if "command" not in item:
                print(
                    f"Error: '{section_name}' step [{i}] missing required 'command' key",
                    file=sys.stderr,
                )
                raise SystemExit(1)
            unknown = set(item) - _STEP_KEYS
   
_parse_env_list function · python · L66-L78 (13 LOC)
repo_tools/command_runner.py
def _parse_env_list(entries: list[str]) -> dict[str, str]:
    """Convert ``["KEY=VALUE", ...]`` to a dict.

    Raises ``SystemExit(1)`` if any entry is missing ``=``.
    """
    result: dict[str, str] = {}
    for entry in entries:
        if "=" not in entry:
            print(f"Error: env entry {entry!r} missing '='", file=sys.stderr)
            raise SystemExit(1)
        key, value = entry.split("=", 1)
        result[key] = value
    return result
CommandRunnerTool.execute method · python · L93-L151 (59 LOC)
repo_tools/command_runner.py
    def execute(self, ctx: ToolContext, args: dict[str, Any]) -> None:
        raw_steps = args.get("steps")
        if not raw_steps:
            logger.error(
                f"No {self.name!r} steps configured. Add to config.yaml:\n"
                f"  {self.config_hint}"
            )
            raise SystemExit(1)

        tokens = dict(ctx.tokens)

        # Merge remaining args as extra tokens (skip step-framework keys).
        _skip = {"steps", "dry_run"}
        for k, v in args.items():
            if k not in _skip and v is not None:
                tokens[k] = str(v)

        formatter = TokenFormatter(tokens)
        steps = _validate_steps(self.name, raw_steps)

        # Resolve each step through token expansion.
        resolved: list[dict] = []
        for step in steps:
            r: dict[str, Any] = {}
            r["command"] = formatter.resolve(step["command"])
            if "cwd" in step:
                r["cwd"] = Path(formatter.resolve(str(step["cwd"])))
  
ContextTool.execute method · python · L24-L31 (8 LOC)
repo_tools/context.py
    def execute(self, ctx: ToolContext, args: dict[str, Any]) -> None:
        tokens = dict(sorted(ctx.tokens.items()))

        if args.get("as_json"):
            print(json.dumps(tokens, indent=2))
        else:
            for key, value in tokens.items():
                logger.info(f"{key}: {value}")
_level_color function · python · L32-L37 (6 LOC)
repo_tools/core.py
def _level_color(levelno: int) -> str:
    if levelno >= logging.ERROR:
        return Fore.RED
    if levelno >= logging.WARNING:
        return Fore.YELLOW
    return Fore.CYAN
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
TokenFormatter.resolve method · python · L72-L87 (16 LOC)
repo_tools/core.py
    def resolve(self, template: str) -> str:
        result = template
        for _ in range(self.MAX_DEPTH):
            try:
                expanded = result.format_map(self._tokens)
            except KeyError as exc:
                missing = exc.args[0] if exc.args else "unknown"
                raise KeyError(f"Missing token: {missing}") from exc
            if expanded == result:
                return expanded
            result = expanded
        remaining = _extract_references(result)
        raise ValueError(
            f"Token expansion exceeded {self.MAX_DEPTH} iterations"
            f" (unresolved: {', '.join(sorted(remaining))})"
        )
_builtin_tokens function · python · L96-L109 (14 LOC)
repo_tools/core.py
def _builtin_tokens() -> dict[str, str]:
    system = platform.system()
    is_win = system == "Windows"
    is_mac = system == "Darwin"
    # Framework root: parent of the repo_tools package (the submodule dir).
    framework_root = posix_path(str(Path(__file__).resolve().parent.parent))
    return {
        "exe_ext": ".exe" if is_win else "",
        "shell_ext": ".cmd" if is_win else ".sh",
        "lib_ext": ".dll" if is_win else (".dylib" if is_mac else ".so"),
        "path_sep": ";" if is_win else ":",
        "repo": f'"{posix_path(sys.executable)}" -m repo_tools.cli --workspace-root "{{workspace_root}}"',
        "framework_root": framework_root,
    }
_extract_references function · python · L116-L126 (11 LOC)
repo_tools/core.py
def _extract_references(template: str) -> set[str]:
    """Return the set of token names referenced by ``{name}`` placeholders.

    Uses ``string.Formatter().parse()`` which correctly ignores escaped
    braces (``{{``/``}}``), returning ``field_name=None`` for those.
    """
    refs: set[str] = set()
    for _, field_name, _, _ in string.Formatter().parse(template):
        if field_name is not None:
            refs.add(field_name)
    return refs
_validate_token_graph function · python · L129-L164 (36 LOC)
repo_tools/core.py
def _validate_token_graph(tokens: dict[str, str]) -> None:
    """Validate the token dependency graph before expansion.

    Raises:
        ValueError: on self-references or cycles (with the cycle path).
        KeyError: when a token references an undefined token.
    """
    # Build dependency graph: token -> set of tokens it depends on
    graph: dict[str, set[str]] = {}
    for name, value in tokens.items():
        refs = _extract_references(str(value))
        graph[name] = refs

        # Self-reference check (clear message before TopologicalSorter)
        if name in refs:
            raise ValueError(f"Token '{name}' references itself")

    # Missing reference check
    all_names = set(tokens)
    for name, refs in graph.items():
        missing = refs - all_names
        if missing:
            raise KeyError(
                f"Token '{name}' references undefined token(s): "
                + ", ".join(sorted(missing))
            )

    # Cycle detection via topological so
resolve_tokens function · python · L167-L222 (56 LOC)
repo_tools/core.py
def resolve_tokens(
    workspace_root: str,
    config: dict[str, Any],
    dimension_values: dict[str, str],
) -> dict[str, str]:
    """Build the full token dictionary.

    Merge order (later wins):
      1. Built-in tokens (exe_ext, shell_ext, repo, etc.)
      2. Variable tokens from config
      3. workspace_root path token
      4. Dimension values (platform, build_type, etc.)
    """
    tokens: dict[str, str] = _builtin_tokens()

    # Variable tokens from config (repo.tokens section)
    repo_section = config.get("repo", {})
    if not isinstance(repo_section, dict):
        repo_section = {}
    for key, value in repo_section.get("tokens", {}).items():
        if isinstance(value, list):
            continue  # dimension tokens handled elsewhere
        if key in _RESERVED_TOKENS:
            logger.warning(f"'{key}' is a reserved token and cannot be overridden in config.")
            continue
        if isinstance(value, dict):
            raw = str(value.get("value", "")
load_config function · python · L228-L238 (11 LOC)
repo_tools/core.py
def load_config(workspace_root: str) -> dict[str, Any]:
    """Load config.yaml from workspace root."""
    config_path = Path(workspace_root) / "config.yaml"
    if not config_path.exists():
        return {}
    data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise TypeError("config.yaml must contain a top-level mapping.")
    return data
resolve_filters function · python · L242-L258 (17 LOC)
repo_tools/core.py
def resolve_filters(config: dict[str, Any], dimension_values: dict[str, str]) -> dict[str, Any]:
    """Walk config dict, resolve ``key@filter`` entries.

    Filter syntax:
    - ``@value`` — matches any dimension whose current value equals *value*
    - ``@val1,val2`` — AND across different dimensions
    - ``@!value`` — negation
    - ``@val1,!val2`` — compound

    More-specific filters (more conditions) win over less-specific ones.
    """
    # Build reverse lookup: value -> dimension name
    dim_lookup: dict[str, str] = {}
    for dim_name, dim_val in dimension_values.items():
        dim_lookup[dim_val] = dim_name

    return _walk_filters(config, dimension_values, dim_lookup)
_walk_filters function · python · L261-L303 (43 LOC)
repo_tools/core.py
def _walk_filters(
    obj: Any,
    dim_values: dict[str, str],
    dim_lookup: dict[str, str],
) -> Any:
    if isinstance(obj, dict):
        # Collect base keys and filtered keys
        base: dict[str, Any] = {}
        filtered: dict[str, list[tuple[str, int, Any]]] = {}  # base_key -> [(filter, specificity, value)]

        for key, value in obj.items():
            if "@" in str(key):
                parts = str(key).split("@", 1)
                base_key = parts[0]
                filter_str = parts[1]
                match, specificity = _match_filter(filter_str, dim_values, dim_lookup)
                if match:
                    filtered.setdefault(base_key, []).append((filter_str, specificity, value))
            else:
                base[key] = value

        # Resolve: most-specific filter wins over base
        result: dict[str, Any] = {}
        for key, value in base.items():
            if key in filtered:
                # Pick most specific
                candid
Repobility · MCP-ready · https://repobility.com
_match_filter function · python · L306-L346 (41 LOC)
repo_tools/core.py
def _match_filter(
    filter_str: str,
    dim_values: dict[str, str],
    dim_lookup: dict[str, str],
) -> tuple[bool, int]:
    """Check if a filter matches the current dimension values.

    Returns ``(matches, specificity)`` where specificity = number of conditions.
    """
    conditions = [c.strip() for c in filter_str.split(",") if c.strip()]
    if not conditions:
        return True, 0

    for cond in conditions:
        negate = cond.startswith("!")
        value = cond.lstrip("!")

        # Find which dimension this value belongs to
        matched_any = False
        for dim_name, dim_val in dim_values.items():
            if value == dim_val or value == dim_name:
                matched_any = True
                if negate:
                    if value == dim_val:
                        return False, 0  # Negation failed
                break

        # Also check if value is a known dimension value (not current)
        if not matched_any:
            if value in dim_
registered_tool_deps function · python · L411-L416 (6 LOC)
repo_tools/core.py
def registered_tool_deps() -> list[str]:
    """Collect, deduplicate, and sort deps from all registered tools."""
    seen: set[str] = set()
    for tool in _TOOL_REGISTRY.values():
        seen.update(tool.deps)
    return sorted(seen)
invoke_tool function · python · L419-L449 (31 LOC)
repo_tools/core.py
def invoke_tool(
    name: str,
    tokens: dict[str, str],
    config: dict[str, Any],
    dimensions: dict[str, str] | None = None,
    extra_args: dict[str, Any] | None = None,
) -> None:
    """Invoke a registered tool programmatically (e.g. prebuild/postbuild steps)."""
    tool = get_tool(name)
    if tool is None:
        raise KeyError(f"Tool '{name}' is not registered.")

    tool_config = config.get(name, {})
    if not isinstance(tool_config, dict):
        tool_config = {}

    ctx = ToolContext(
        workspace_root=Path(tokens.get("workspace_root", ".")),
        tokens=tokens,
        config=config,
        tool_config=tool_config,
        dimensions=dimensions or {},
        passthrough_args=[],
    )

    args: dict[str, Any] = {**tool.default_args(tokens)}
    args.update(tool_config)
    if extra_args:
        args.update(extra_args)

    tool.execute(ctx, args)
detect_platform_identifier function · python · L455-L493 (39 LOC)
repo_tools/core.py
def detect_platform_identifier(
    platform_override: str | None = None,
    conan_profile_path: Path | None = None,
) -> str:
    """Detect platform identifier for build directory structure.

    Priority: 1. Explicit override  2. Conan profile  3. Host auto-detect
    """
    if platform_override:
        return platform_override

    if conan_profile_path and conan_profile_path.exists():
        try:
            profile_content = conan_profile_path.read_text()
            os_match = re.search(r"^os=(\w+)", profile_content, re.MULTILINE)
            arch_match = re.search(r"^arch=(\w+)", profile_content, re.MULTILINE)
            if os_match and arch_match:
                return _map_platform_identifier(os_match.group(1), arch_match.group(1))
        except (OSError, UnicodeDecodeError):
            pass

    system = platform.system()
    machine = platform.machine().lower()

    if machine in ("x86_64", "amd64"):
        arch = "x64"
    elif machine in ("arm64", "aarch64", "armv
_map_platform_identifier function · python · L496-L518 (23 LOC)
repo_tools/core.py
def _map_platform_identifier(os_val: str, arch_val: str) -> str:
    """Map Conan os/arch settings to platform identifier."""
    if os_val == "Emscripten" and arch_val == "wasm":
        return "emscripten"

    os_map = {
        "Windows": "windows",
        "Linux": "linux",
        "Macos": "macos",
        "Darwin": "macos",
    }
    os_normalized = os_map.get(os_val, os_val.lower())

    arch_map = {
        "x86_64": "x64",
        "x86": "x86",
        "armv8": "arm64",
        "armv8_32": "arm",
        "wasm": "wasm",
    }
    arch_normalized = arch_map.get(arch_val, arch_val.lower())

    return f"{os_normalized}-{arch_normalized}"
log_section function · python · L534-L544 (11 LOC)
repo_tools/core.py
def log_section(title: str) -> Generator[None, None, None]:
    """Foldable CI section or styled terminal header."""
    if _is_ci():
        print(f"::group::{title}", flush=True)
    else:
        logger.info(f"── {title} ──")
    try:
        yield
    finally:
        if _is_ci():
            print("::endgroup::", flush=True)
find_venv_executable function · python · L553-L567 (15 LOC)
repo_tools/core.py
def find_venv_executable(name: str) -> str:
    """Find an executable in the virtual environment, fallback to system PATH."""
    python_exe = Path(sys.executable)
    scripts_dir = python_exe.parent
    exe_path = scripts_dir / (name + (".exe" if sys.platform == "win32" else ""))

    if exe_path.exists():
        return str(exe_path)

    exe_path_str = shutil.which(name)
    if exe_path_str:
        return exe_path_str

    logger.warning(f"Executable {name} not found in virtual environment or system PATH")
    return name
sanitized_subprocess_env function · python · L570-L603 (34 LOC)
repo_tools/core.py
def sanitized_subprocess_env() -> dict[str, str]:
    """Return env overrides that strip repo-tool Python contamination.

    The generated shim (``repo`` / ``repo.cmd``) prepends the venv's Scripts
    directory to ``PATH`` and sets ``PYTHONPATH`` so the CLI can import
    ``repo_tools``.  These variables must **not** leak into build-tool
    subprocesses (Conan, CMake, …) because they can cause the wrong Python
    stdlib to be loaded — for example, a system Python 3.12 picking up the
    venv's Python 3.14 stdlib, resulting in ``SRE module mismatch`` or
    ``_thread`` attribute errors.

    Returns a dict suitable for the *env* parameter of :func:`run_command`
    or :class:`CommandGroup`.  The dict is merged **on top of**
    ``os.environ``, so only the keys that need overriding are present.
    """
    env: dict[str, str] = {}

    # Strip PYTHONPATH — only needed for repo_tools imports
    env["PYTHONPATH"] = ""

    # Strip PYTHONHOME if present
    if "PYTHONHOME" in os.enviro
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
run_command function · python · L606-L662 (57 LOC)
repo_tools/core.py
def run_command(
    cmd: list[str],
    log_file: Path | None = None,
    env_script: Path | None = None,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
) -> None:
    """Run a command and optionally tee output to a log file.

    If *env_script* is provided, the command is executed inside a shell
    that sources the script first.  Errors out when the script doesn't exist.
    """
    use_shell = False
    run_cmd: list[str] | str = cmd
    if env_script is not None:
        script = env_script
        if not script.suffix:
            script = script.with_suffix(".bat" if is_windows() else ".sh")
        if not script.exists():
            logger.error(f"env_script not found: {script}")
            sys.exit(1)
        if is_windows():
            cmd_str = subprocess.list2cmdline(cmd)
            run_cmd = f'call "{script}" >nul 2>&1 && {cmd_str}'
        else:
            cmd_str = shlex.join(cmd)
            run_cmd = f'. "{script}" >/dev/null 2>&1 && {cmd_str
remove_tree_with_retries function · python · L665-L681 (17 LOC)
repo_tools/core.py
def remove_tree_with_retries(
    path: Path, attempts: int = 5, delay: float = 1.0,
) -> None:
    """Remove a directory tree with retry logic for locked files (Windows)."""
    for attempt in range(attempts):
        try:
            shutil.rmtree(path)
            return
        except PermissionError:
            if attempt < attempts - 1:
                logger.warning(
                    f"Permission denied removing {path}, "
                    f"retrying in {delay}s ({attempt + 1}/{attempts})"
                )
                time.sleep(delay)
            else:
                raise
resolve_path function · python · L684-L691 (8 LOC)
repo_tools/core.py
def resolve_path(root: Path, template: str, tokens: dict[str, str]) -> Path:
    """Resolve a path template using tokens."""
    formatter = TokenFormatter(tokens)
    resolved = formatter.resolve(template)
    path = Path(resolved)
    if not path.is_absolute():
        path = root / path
    return path
glob_paths function · python · L697-L705 (9 LOC)
repo_tools/core.py
def glob_paths(pattern: Path | str) -> list[Path]:
    """Expand a glob pattern to a sorted list of matching file paths.

    Returns a single-element list for non-glob paths.
    """
    pattern_text = str(pattern)
    if any(char in pattern_text for char in ("*", "?", "[")):
        return sorted(Path(match) for match in glob.glob(pattern_text, recursive=True))
    return [Path(pattern_text)]
CommandGroup.__init__ method · python · L728-L742 (15 LOC)
repo_tools/core.py
    def __init__(
        self,
        label: str,
        log_file: Path | None = None,
        env_script: Path | None = None,
        cwd: Path | None = None,
        env: dict[str, str] | None = None,
    ) -> None:
        self.label = label
        self.log_file = log_file
        self.env_script = env_script
        self.cwd = cwd
        self.env = env
        self._commands_run = 0
        self._failed = False
CommandGroup.__enter__ method · python · L744-L749 (6 LOC)
repo_tools/core.py
    def __enter__(self) -> CommandGroup:
        if _is_ci():
            print(f"::group::{self.label}", flush=True)
        else:
            logger.info(f"── {self.label} ──")
        return self
CommandGroup.__exit__ method · python · L751-L759 (9 LOC)
repo_tools/core.py
    def __exit__(self, exc_type: type | None, exc_val: BaseException | None, exc_tb: Any) -> None:
        if _is_ci():
            print("::endgroup::", flush=True)
        if exc_type is not None:
            return  # let the exception propagate
        if self._failed:
            logger.error(f"  ✗ {self.label} failed")
        else:
            logger.info(f"  ✓ {self.label} ({self._commands_run} command(s))")
CommandGroup.run method · python · L761-L783 (23 LOC)
repo_tools/core.py
    def run(
        self,
        cmd: list[str],
        log_file: Path | None = None,
        env_script: Path | None = None,
        cwd: Path | None = None,
        env: dict[str, str] | None = None,
    ) -> None:
        """Run a command within this group.

        Per-call *log_file*, *env_script*, *cwd*, and *env* override the
        group defaults.  Per-call *env* is merged on top of group-level env.
        """
        lf = log_file or self.log_file
        es = env_script or self.env_script
        cw = cwd or self.cwd
        merged_env = {**(self.env or {}), **(env or {})} or None
        try:
            run_command(cmd, log_file=lf, env_script=es, cwd=cw, env=merged_env)
            self._commands_run += 1
        except SystemExit:
            self._failed = True
            raise
If a scraper extracted this row, it came from Repobility (https://repobility.com)
to_cmake_build_type function · python · L789-L798 (10 LOC)
repo_tools/core.py
def to_cmake_build_type(value: str | None) -> str:
    if not value:
        return "Debug"
    mapping = {
        "debug": "Debug",
        "release": "Release",
        "relwithdebinfo": "RelWithDebInfo",
        "minsizerel": "MinSizeRel",
    }
    return mapping.get(str(value).casefold(), str(value))
find_executable function · python · L17-L27 (11 LOC)
repo_tools/features.py
def find_executable(name: str) -> str | None:
    """Find *name* in the venv Scripts dir or system PATH.

    Returns the path string, or ``None`` if not found.
    """
    scripts_dir = Path(sys.executable).parent
    suffix = ".exe" if sys.platform == "win32" else ""
    exe_path = scripts_dir / (name + suffix)
    if exe_path.exists():
        return str(exe_path)
    return shutil.which(name)
require_executable function · python · L30-L49 (20 LOC)
repo_tools/features.py
def require_executable(name: str, *, feature: str) -> str:
    """Find *name* or exit with a helpful error about which feature to enable.

    Returns the executable path on success.  On failure calls
    ``sys.exit(1)`` after logging a message that tells the user
    which *feature* group to add to ``repo.features`` in config.yaml.
    """
    exe = find_executable(name)
    if exe is not None:
        return exe

    from .core import logger  # lazy — keep module-level stdlib-only

    logger.error(
        "%s not found. Enable the '%s' feature in config.yaml "
        "(repo.features) and run 'repo init' to install it.",
        name,
        feature,
    )
    sys.exit(1)
‹ prevpage 2 / 3next ›