← back to dr-robert-li__dgx-toolbox

Function bodies 178 total

All specs Real LLM only Function bodies
CheckResult class · python · L50-L55 (6 LOC)
examples/dgx_toolbox.py
class CheckResult:
    """Result of a single validation check.

    Bundles the check's label, its pass/fail outcome, a human-readable
    message, and an optional dict of structured details.
    """
    # Label of the check: plain (e.g. "toolbox") or "<check>:<arg>" for
    # parameterised checks such as "container:<name>".
    name: str
    # True when the check succeeded.
    passed: bool
    # Human-readable explanation; rendered next to the name in reports.
    message: str
    # Optional structured data for programmatic consumers; empty by default.
    details: dict[str, Any] = field(default_factory=dict)
ValidationResult class · python · L59-L76 (18 LOC)
examples/dgx_toolbox.py
class ValidationResult:
    """Aggregate outcome of a batch of validation checks."""
    checks: list[CheckResult] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        """Whether every recorded check passed (vacuously true when empty)."""
        for entry in self.checks:
            if not entry.passed:
                return False
        return True

    def report(self) -> str:
        """Render one indented line per check, marked with a tick or a cross."""
        marks = {True: "✓", False: "✗"}
        return "\n".join(
            f"  {marks[bool(entry.passed)]} {entry.name}: {entry.message}"
            for entry in self.checks
        )

    @property
    def failures(self) -> list[CheckResult]:
        """The checks that did not pass, in their original order."""
        return list(filter(lambda entry: not entry.passed, self.checks))
report method · python · L67-L72 (6 LOC)
examples/dgx_toolbox.py
    def report(self) -> str:
        """Render one indented line per check, marked with a tick or a cross."""
        marks = {True: "✓", False: "✗"}
        return "\n".join(
            f"  {marks[bool(entry.passed)]} {entry.name}: {entry.message}"
            for entry in self.checks
        )
ExecResult class · python · L80-L99 (20 LOC)
examples/dgx_toolbox.py
class ExecResult:
    """Outcome of running (or deliberately skipping) a command in a container."""
    command: list[str]
    returncode: int
    stdout: str
    stderr: str
    duration_s: float
    container: str
    skipped: bool = False
    skip_reason: str = ""

    @property
    def ok(self) -> bool:
        """True for a zero exit code; otherwise mirrors the ``skipped`` flag."""
        if self.returncode == 0:
            return True
        return self.skipped

    def summary(self) -> str:
        """One-line status: skip reason, or status, duration and command tail."""
        if self.skipped:
            return f"SKIPPED: {self.skip_reason}"
        if self.ok:
            status = "OK"
        else:
            status = f"FAILED (exit {self.returncode})"
        # Only the last three command tokens are shown to keep the line short.
        tail = " ".join(self.command[-3:])
        return f"{status} [{self.duration_s:.1f}s] {tail}"
summary method · python · L95-L99 (5 LOC)
examples/dgx_toolbox.py
    def summary(self) -> str:
        """One-line status: skip reason, or status, duration and command tail."""
        if self.skipped:
            return f"SKIPPED: {self.skip_reason}"
        if self.ok:
            status = "OK"
        else:
            status = f"FAILED (exit {self.returncode})"
        # Only the last three command tokens are shown to keep the line short.
        tail = " ".join(self.command[-3:])
        return f"{status} [{self.duration_s:.1f}s] {tail}"
__init__ method · python · L112-L117 (6 LOC)
examples/dgx_toolbox.py
    def __init__(self, config_path: Path | None = None):
        self._config = self._load_config(config_path or CONFIG_PATH)
        self._base = self._resolve_base()
        self._project_root = Path(self._config.get("project_root", ".")).resolve()
        self._containers: dict[str, dict] = self._config.get("containers", {})
        self._exec_log: list[ExecResult] = []
_load_config method · python · L121-L124 (4 LOC)
examples/dgx_toolbox.py
    def _load_config(self, path: Path) -> dict:
        if path.exists():
            return yaml.safe_load(path.read_text()) or {}
        return {}
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_resolve_base method · python · L126-L133 (8 LOC)
examples/dgx_toolbox.py
    def _resolve_base(self) -> Path:
        env_path = os.environ.get("DGX_TOOLBOX_PATH")
        if env_path:
            return Path(env_path).expanduser().resolve()
        config_path = self._config.get("dgx_toolbox_path")
        if config_path:
            return Path(config_path).expanduser().resolve()
        return Path("~/dgx-toolbox").expanduser().resolve()
resolve method · python · L152-L169 (18 LOC)
examples/dgx_toolbox.py
    def resolve(self, component: str) -> Path:
        """Map a component name to the full path of its script.

        Args:
            component: Key under ``components`` in the config.

        Returns:
            The toolbox base joined with the component's configured relative path.

        Raises:
            FileNotFoundError: If the toolbox is unavailable or the script path
                does not exist.
            KeyError: If the component has no (non-empty) entry in the config.
        """
        if not self.available:
            raise FileNotFoundError(
                f"DGX Toolbox not found at {self._base}. "
                f"Set DGX_TOOLBOX_PATH or update config/dgx_toolbox.yaml"
            )
        known = self._config.get("components", {})
        relative = known.get(component)
        if not relative:
            raise KeyError(
                f"Unknown component '{component}'. "
                f"Available: {', '.join(known)}"
            )
        candidate = self._base / relative
        if candidate.exists():
            return candidate
        raise FileNotFoundError(f"Component not found: {candidate}")
port method · python · L171-L173 (3 LOC)
examples/dgx_toolbox.py
    def port(self, component: str) -> int | None:
        """Look up *component* under the config's ``ports`` mapping (None if absent)."""
        port_table = self._config.get("ports", {})
        return port_table.get(component)
vllm_endpoint method · python · L180-L182 (3 LOC)
examples/dgx_toolbox.py
    def vllm_endpoint(self) -> str:
        """Return the local vLLM ``/v1`` base URL, using port 8020 when none is configured."""
        configured = self.port("vllm")
        chosen = configured if configured else 8020
        return f"http://localhost:{chosen}/v1"
litellm_endpoint method · python · L184-L186 (3 LOC)
examples/dgx_toolbox.py
    def litellm_endpoint(self) -> str:
        """Return the local LiteLLM ``/v1`` base URL, using port 4000 when none is configured."""
        configured = self.port("litellm")
        chosen = configured if configured else 4000
        return f"http://localhost:{chosen}/v1"
validate method · python · L190-L210 (21 LOC)
examples/dgx_toolbox.py
    def validate(self, checks: list[str]) -> ValidationResult:
        """Run the named checks in order and collect their results.

        Each entry is either a bare check name or ``"name:arg"``; everything
        after the first colon is passed to the check as its argument.

        Available checks:
            "toolbox"        — dgx-toolbox directory exists
            "training_data"  — training data file exists (path from config)
            "config"         — config file exists (path from config)
            "memory:N"       — at least N GB available memory
            "container:name" — named container is running
            "mounted:name"   — project is mounted in named container
            "gpu"            — GPU is accessible (via container)
            "deps:name"      — pinned deps installed in container

        Returns:
            A ValidationResult holding one CheckResult per requested check.
        """
        outcome = ValidationResult()
        for spec in checks:
            # partition() yields an empty arg when there is no colon.
            check_name, _sep, check_arg = spec.partition(":")
            outcome.checks.append(self._run_check(check_name, check_arg))
        return outcome
_run_check method · python · L212-L229 (18 LOC)
examples/dgx_toolbox.py
    def _run_check(self, name: str, arg: str) -> CheckResult:
        dispatch = {
            "toolbox": self._check_toolbox,
            "training_data": self._check_training_data,
            "config": self._check_config,
            "memory": self._check_memory,
            "container": self._check_container,
            "mounted": self._check_mounted,
            "gpu": self._check_gpu,
            "deps": self._check_deps,
        }
        fn = dispatch.get(name)
        if not fn:
            return CheckResult(name, False, f"Unknown check: {name}")
        try:
            return fn(arg)
        except Exception as e:
            return CheckResult(name, False, f"Check error: {e}")
_check_toolbox method · python · L231-L234 (4 LOC)
examples/dgx_toolbox.py
    def _check_toolbox(self, _: str) -> CheckResult:
        """Report whether the toolbox is available at the resolved base path.

        The argument is unused; it exists so every check takes one string.
        """
        if not self.available:
            return CheckResult("toolbox", False, f"Not found at {self._base}")
        return CheckResult("toolbox", True, f"Found at {self._base}")
Same scanner, your repo: https://repobility.com — Repobility
_check_training_data method · python · L236-L242 (7 LOC)
examples/dgx_toolbox.py
    def _check_training_data(self, _: str) -> CheckResult:
        """Check that the training data file exists and count its lines.

        The location comes from ``validation_paths.training_data`` in the
        config (default ``data/train.jsonl``), relative to the project root.

        Args:
            _: Unused; present so every check takes one string argument.

        Returns:
            A passing CheckResult whose message reports the line count and
            whose details carry ``path`` and ``lines``, or a failing one when
            the path does not exist.
        """
        rel = self._config.get("validation_paths", {}).get("training_data", "data/train.jsonl")
        path = self._project_root / rel
        if not path.exists():
            return CheckResult("training_data", False, f"Not found: {path}")
        # Close the handle deterministically instead of leaving it to the GC.
        with open(path) as handle:
            lines = sum(1 for _line in handle)
        return CheckResult("training_data", True, f"{lines} examples", {"path": str(path), "lines": lines})
_check_config method · python · L244-L249 (6 LOC)
examples/dgx_toolbox.py
    def _check_config(self, _: str) -> CheckResult:
        """Check that the training config file exists under the project root.

        The relative path is read from ``validation_paths.config`` (default
        ``config/train_config.yaml``). The argument is unused.
        """
        configured_paths = self._config.get("validation_paths", {})
        target = self._project_root / configured_paths.get("config", "config/train_config.yaml")
        if not target.exists():
            return CheckResult("config", False, f"Not found: {target}")
        return CheckResult("config", True, f"Found: {target}")
_check_memory method · python · L251-L277 (27 LOC)
examples/dgx_toolbox.py
    def _check_memory(self, arg: str) -> CheckResult:
        min_gb = float(arg) if arg else 70.0
        try:
            meminfo = Path("/proc/meminfo").read_text()
            mem = {l.split(":")[0].strip(): int(l.split(":")[1].strip().split()[0])
                   for l in meminfo.splitlines() if ":" in l}
            avail_gb = mem.get("MemAvailable", 0) / (1024 * 1024)
            if avail_gb >= min_gb:
                return CheckResult("memory", True, f"{avail_gb:.1f} GB available (need {min_gb})",
                                   {"available_gb": avail_gb, "required_gb": min_gb})
            # Get top memory consumers for diagnostics
            ps = subprocess.run(["ps", "aux", "--sort=-rss"], capture_output=True, text=True, timeout=5)
            top_procs = []
            for line in ps.stdout.strip().split("\n")[1:6]:
                parts = line.split(None, 10)
                if len(parts) >= 11:
                    rss_mb = int(parts[5]) // 1024 if parts[5].isdigit(
_check_container method · python · L279-L286 (8 LOC)
examples/dgx_toolbox.py
    def _check_container(self, name: str) -> CheckResult:
        """Check whether the container for *name* appears in ``docker ps``.

        Args:
            name: Key in the containers config; used verbatim as the
                container name when the key or its ``container_name`` is absent.
        """
        container_cfg = self._containers.get(name, {})
        cname = container_cfg.get("container_name", name)
        listing = subprocess.run(
            ["docker", "ps", "--format", "{{.Names}}"],
            capture_output=True, text=True, timeout=5,
        )
        # docker prints one running container name per line.
        active_names = listing.stdout.strip().split("\n")
        label = f"container:{name}"
        if cname not in active_names:
            return CheckResult(label, False, f"{cname} is not running")
        return CheckResult(label, True, f"{cname} is running")
_check_mounted method · python · L288-L299 (12 LOC)
examples/dgx_toolbox.py
    def _check_mounted(self, name: str) -> CheckResult:
        """Check that the project is visible inside the container for *name*.

        Runs ``test -f`` via ``docker exec`` on a marker file (config key
        ``mount_check_file``, default ``pyproject.toml``) inside the
        container's workdir (default ``/workspace/project``).
        """
        settings = self._containers.get(name, {})
        cname = settings.get("container_name", name)
        workdir = settings.get("workdir", "/workspace/project")
        marker = self._config.get("mount_check_file", "pyproject.toml")
        probe = subprocess.run(
            ["docker", "exec", cname, "test", "-f", f"{workdir}/{marker}"],
            capture_output=True, timeout=5)
        label = f"mounted:{name}"
        if probe.returncode != 0:
            return CheckResult(label, False,
                               f"Project not mounted in {cname}. Restart with EXTRA_MOUNTS.")
        return CheckResult(label, True, f"Project visible at {workdir}")
_check_gpu method · python · L301-L313 (13 LOC)
examples/dgx_toolbox.py
    def _check_gpu(self, name: str) -> CheckResult:
        """Check GPU access by running ``nvidia-smi`` inside a container.

        Args:
            name: Key in the containers config; when empty, the first
                configured container is used instead.

        Returns:
            A passing CheckResult carrying nvidia-smi's CSV output, or a
            failing one when the command fails or prints nothing.
        """
        cname = None
        if name:
            cname = self._containers.get(name, {}).get("container_name", name)
        if not cname:
            # No usable name: fall back to the first configured container.
            fallback = next(iter(self._containers.values()), {})
            cname = fallback.get("container_name", "")
        probe = subprocess.run(
            ["docker", "exec", cname, "nvidia-smi", "--query-gpu=name,memory.total",
             "--format=csv,noheader"],
            capture_output=True, text=True, timeout=10)
        gpu_listing = probe.stdout.strip() if probe.returncode == 0 else ""
        if gpu_listing:
            return CheckResult("gpu", True, gpu_listing)
        return CheckResult("gpu", False, "No GPU access in container")
_check_deps method · python · L315-L327 (13 LOC)
examples/dgx_toolbox.py
    def _check_deps(self, name: str) -> CheckResult:
        """Check that the required Python modules import inside a container.

        Runs a one-line ``python -c`` script via ``docker exec`` that imports
        every module listed under the ``required_imports`` config key
        (default ``torch`` and ``numpy``) and prints ``OK``.

        Args:
            name: Key in the containers config; used verbatim as the
                container name when no ``container_name`` is configured.

        Returns:
            A passing CheckResult when the script exits 0 and prints ``OK``;
            otherwise a failing one quoting the last line of stderr.
        """
        container_cfg = self._containers.get(name, {})
        cname = container_cfg.get("container_name", name)
        workdir = container_cfg.get("workdir", "/workspace")
        # Check critical imports
        imports = self._config.get("required_imports", ["torch", "numpy"])
        statements = []
        if imports:
            statements.append("import " + ",".join(imports))
        # With nothing to import, "import ;..." would be a SyntaxError that is
        # then misreported as a missing dependency, so the import is omitted.
        statements.append("print('OK')")
        check_script = ";".join(statements)
        result = subprocess.run(
            ["docker", "exec", "-w", workdir, cname, "python", "-c", check_script],
            capture_output=True, text=True, timeout=30)
        if result.returncode == 0 and "OK" in result.stdout:
            return CheckResult(f"deps:{name}", True, "All pinned deps available")
        # The last stderr line normally holds the exception; fall back to
        # "unknown" when stderr is empty or whitespace only.
        stderr_lines = (result.stderr or "").strip().split("\n")
        missing = stderr_lines[-1] or "unknown"
        return CheckResult(f"deps:{name}", False, f"Missing deps: {missing}")
ensure_ready method · python · L331-L383 (53 LOC)
examples/dgx_toolbox.py
    def ensure_ready(self, component: str, wait: int = 45) -> ValidationResult:
        """Ensure a container is running, project mounted, and deps installed.

        This is the main entry point for preparing a container for work.
        It handles: start → wait → mount check → dep install → validate.

        Args:
            component: Key from containers config (e.g., "unsloth_studio")
            wait: Seconds to wait after starting container

        Returns:
            ValidationResult with all checks.
        """
        mapping = self._containers.get(component)
        if not mapping:
            r = ValidationResult()
            r.checks.append(CheckResult(component, False, f"Unknown component: {component}"))
            return r

        cname = mapping["container_name"]
        workdir = mapping.get("workdir")

        # Step 1: Is container running?
        container_check = self._check_container(component)
        if not container_check.passed:
            print(f"  
Want this analysis on your repo? https://repobility.com/scan/
_start_container method · python · L385-L391 (7 LOC)
examples/dgx_toolbox.py
    def _start_container(self, component: str) -> None:
        """Launch a container's start script with the project mounted.

        The script is resolved from the mapping's ``component`` entry and run
        with ``bash``; ``EXTRA_MOUNTS`` is set to ``<project_root>:<workdir>``.
        The script's output is captured and its exit status is not inspected.
        """
        settings = self._containers[component]
        launcher = self.resolve(settings["component"])
        mount_spec = f"{self._project_root}:{settings['workdir']}"
        child_env = dict(os.environ, EXTRA_MOUNTS=mount_spec)
        subprocess.run(["bash", str(launcher)], env=child_env, capture_output=True, text=True)
_install_deps method · python · L393-L407 (15 LOC)
examples/dgx_toolbox.py
    def _install_deps(self, component: str) -> None:
        """Pip-install the pinned packages, then any extras, inside a container.

        Does nothing when ``self.pinned_versions`` is empty. Pinned packages
        are installed with ``--no-deps``; entries from the ``extra_deps``
        config key are installed afterwards with dependency resolution.
        Pip's exit status is not inspected.
        """
        cname = self._containers[component]["container_name"]
        pinned = self.pinned_versions
        if not pinned:
            return
        pip_install = ["docker", "exec", cname, "pip", "install"]
        requirement_specs = [f"{pkg}=={ver}" for pkg, ver in pinned.items()]
        extras = self._config.get("extra_deps", [])
        subprocess.run(pip_install + ["--no-deps"] + requirement_specs,
                       capture_output=True, text=True, timeout=300)
        if not extras:
            return
        # Extras are installed without --no-deps so their dependencies resolve.
        subprocess.run(pip_install + extras, capture_output=True, text=True, timeout=300)
execute method · python · L411-L473 (63 LOC)
examples/dgx_toolbox.py
    def execute(
        self,
        component: str,
        *cmd: str,
        capture: bool = False,
        timeout: int | None = None,
        idempotency_check: str | None = None,
    ) -> ExecResult:
        """Execute a command inside a container. The core execution method.

        Args:
            component: Key from containers config (e.g., "unsloth_studio")
            *cmd: Command and args (e.g., "python", "-m", "scripts.train_model")
            capture: Capture stdout/stderr (default: stream to terminal)
            timeout: Timeout in seconds
            idempotency_check: Path to check inside container — if exists, skip execution

        Returns:
            ExecResult with structured output.
        """
        mapping = self._containers.get(component)
        if not mapping:
            return ExecResult(list(cmd), 1, "", f"Unknown component: {component}", 0, "")

        cname = mapping["container_name"]
        workdir = mapping.get("workdir")

        # Idempo
run_service method · python · L475-L502 (28 LOC)
examples/dgx_toolbox.py
    def run_service(self, component: str, *args: str) -> ExecResult:
        """Start a dgx-toolbox service (vLLM, LiteLLM, etc.) — not exec'd into.

        Args:
            component: Key from components config (e.g., "vllm")
            *args: Additional arguments (e.g., model name)
        """
        script = self.resolve(component)
        env = os.environ.copy()
        mount_target = self._containers.get(component, {}).get("workdir", "/workspace")
        env["EXTRA_MOUNTS"] = f"{self._project_root}:{mount_target}"
        start = time.time()
        proc = subprocess.run(
            ["bash", str(script)] + list(args),
            env=env, capture_output=True, text=True, timeout=60,
        )
        duration = time.time() - start
        cname = self._containers.get(component, {}).get("container_name", component)
        result = ExecResult(
            command=["bash", str(script)] + list(args),
            returncode=proc.returncode,
            stdout=proc.stdout,
       
status_report method · python · L506-L577 (72 LOC)
examples/dgx_toolbox.py
    def status_report(self) -> dict:
        """Structured status for telemetry agents to consume.

        Returns a dict with container states, execution log, resource usage,
        and pipeline progress — everything a background observer needs.
        """
        # Container states
        ps = subprocess.run(["docker", "ps", "--format", "json"],
                            capture_output=True, text=True, timeout=5)
        containers = []
        if ps.stdout.strip():
            for line in ps.stdout.strip().split("\n"):
                try:
                    containers.append(json.loads(line))
                except json.JSONDecodeError:
                    pass

        # Memory
        try:
            meminfo = Path("/proc/meminfo").read_text()
            mem = {l.split(":")[0].strip(): int(l.split(":")[1].strip().split()[0])
                   for l in meminfo.splitlines() if ":" in l}
            memory = {
                "total_gb": round(mem.get("MemTotal", 0) / (102
info method · python · L581-L591 (11 LOC)
examples/dgx_toolbox.py
    def info(self) -> dict:
        """Collect basic diagnostic facts about this toolbox instance.

        Returns:
            A dict with the base path, availability flag, config file path,
            service endpoints, and the configured component and container names.
        """
        snapshot = {"path": str(self._base)}
        snapshot["available"] = self.available
        snapshot["config_file"] = str(CONFIG_PATH)
        snapshot["vllm_endpoint"] = self.vllm_endpoint()
        snapshot["litellm_endpoint"] = self.litellm_endpoint()
        snapshot["components"] = list(self._config.get("components", {}).keys())
        snapshot["containers"] = list(self._containers.keys())
        return snapshot
get_toolbox function · python · L599-L604 (6 LOC)
examples/dgx_toolbox.py
def get_toolbox() -> DGXToolbox:
    """Return the module-wide DGXToolbox, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = DGXToolbox()
    return _instance
verify_api_key function · python · L13-L29 (17 LOC)
harness/auth/bearer.py
async def verify_api_key(
    credentials: HTTPAuthorizationCredentials = Depends(_bearer),
    request: Request = None,
) -> TenantConfig:
    """Authenticate a request by matching its Bearer token to a tenant.

    FastAPI dependency. Tenants from ``request.app.state.tenants`` are tried
    in order; the first one whose ``api_key_hash`` verifies against the
    presented token is returned.

    Raises:
        HTTPException: 401 when no tenant's hash matches the token.
    """
    presented = credentials.credentials
    for candidate in request.app.state.tenants:
        try:
            _ph.verify(candidate.api_key_hash, presented)
        except (VerifyMismatchError, VerificationError, InvalidHashError):
            # Wrong key or unusable hash for this tenant: try the next one.
            continue
        return candidate
    raise HTTPException(status_code=401, detail="Invalid API key")
Repobility — same analyzer, your code, free for public repos · /scan/
TenantConfig class · python · L10-L20 (11 LOC)
harness/config/loader.py
class TenantConfig(BaseModel):
    """Configuration for a single API tenant.

    Only ``tenant_id`` and ``api_key_hash`` are required; every other field
    has a default.
    """

    # Identifier for the tenant.
    tenant_id: str
    # Hash of the tenant's API key; incoming bearer tokens are verified against it.
    api_key_hash: str
    # Rate limits; presumably requests / tokens per minute (TODO confirm
    # against the rate limiter).
    rpm_limit: int = 60
    tpm_limit: int = 100_000
    # Model names the tenant may use; the default ["*"] presumably means
    # "any model" (TODO confirm).
    allowed_models: List[str] = ["*"]
    # NOTE(review): what exactly is bypassed is not visible here; confirm with callers.
    bypass: bool = False
    # Name of the PII strictness level; defaults to "balanced".
    pii_strictness: str = "balanced"
    # Nested override settings; presumably keyed by rail name (TODO confirm).
    rail_overrides: Dict[str, Dict[str, object]] = {}
TenantsFile class · python · L24-L27 (4 LOC)
harness/config/loader.py
class TenantsFile(BaseModel):
    """Root schema for tenants.yaml.

    The document must be a mapping with a top-level ``tenants`` key.
    """

    # Tenant entries, each validated as a TenantConfig.
    tenants: List[TenantConfig]
load_tenants function · python · L30-L56 (27 LOC)
harness/config/loader.py
def load_tenants(config_path: str) -> List[TenantConfig]:
    """Read a tenants YAML file and return its validated tenant entries.

    Args:
        config_path: Filesystem path of the tenants.yaml file.

    Returns:
        The TenantConfig objects listed under the file's ``tenants`` key.

    Raises:
        ValueError: If the YAML cannot be parsed, the document is empty, or
            the content fails schema validation.
    """
    try:
        with open(config_path, "r") as stream:
            document = yaml.safe_load(stream)
    except yaml.YAMLError as err:
        raise ValueError(f"Failed to parse tenants YAML: {err}") from err

    # safe_load returns None for an empty document.
    if document is None:
        raise ValueError("Tenants file is empty")

    try:
        return TenantsFile.model_validate(document).tenants
    except ValidationError as err:
        raise ValueError(f"Tenants schema validation failed: {err}") from err
RailConfig class · python · L14-L32 (19 LOC)
harness/config/rail_loader.py
class RailConfig(BaseModel):
    """Settings for one guardrail, including an optional critique threshold."""

    name: str
    enabled: bool = True
    threshold: float = 0.7
    refusal_mode: Literal["hard_block", "soft_steer", "informative"] = "hard_block"
    critique_threshold: Optional[float] = None

    @model_validator(mode='after')
    def validate_critique_threshold(self) -> 'RailConfig':
        """Reject a critique_threshold that is not strictly below threshold."""
        critique = self.critique_threshold
        if critique is not None and critique >= self.threshold:
            raise ValueError(
                f"Rail '{self.name}': critique_threshold ({critique}) "
                f"must be less than threshold ({self.threshold})"
            )
        return self
validate_critique_threshold method · python · L24-L32 (9 LOC)
harness/config/rail_loader.py
    def validate_critique_threshold(self) -> 'RailConfig':
        """Reject a critique_threshold that is not strictly below threshold."""
        critique = self.critique_threshold
        if critique is not None and critique >= self.threshold:
            raise ValueError(
                f"Rail '{self.name}': critique_threshold ({critique}) "
                f"must be less than threshold ({self.threshold})"
            )
        return self
RailsFile class · python · L35-L38 (4 LOC)
harness/config/rail_loader.py
class RailsFile(BaseModel):
    """Root schema for rails.yaml.

    The document must be a mapping with a top-level ``rails`` key.
    """

    # Guardrail entries, each validated as a RailConfig.
    rails: List[RailConfig]
load_rails_config function · python · L41-L67 (27 LOC)
harness/config/rail_loader.py
def load_rails_config(config_path: str) -> List[RailConfig]:
    """Read a rails YAML file and return its validated guardrail entries.

    Args:
        config_path: Filesystem path of the rails.yaml file.

    Returns:
        The RailConfig objects listed under the file's ``rails`` key.

    Raises:
        ValueError: If the YAML cannot be parsed, the document is empty, or
            the content fails schema validation.
    """
    try:
        with open(config_path, "r") as stream:
            document = yaml.safe_load(stream)
    except yaml.YAMLError as err:
        raise ValueError(f"Failed to parse rails YAML: {err}") from err

    # safe_load returns None for an empty document.
    if document is None:
        raise ValueError("Rails config file is empty")

    try:
        return RailsFile.model_validate(document).rails
    except ValidationError as err:
        raise ValueError(f"Rails schema validation failed: {err}") from err
analyze_traces function · python · L20-L244 (225 LOC)
harness/critique/analyzer.py
async def analyze_traces(
    trace_store: TraceStore,
    http_client: Any,
    constitution: ConstitutionConfig,
    since: str,
    judge_model: str = "default",
) -> dict:
    """Analyze historical critique traces and produce tuning suggestions.

    Args:
        trace_store: Async SQLite-backed trace storage.
        http_client: Async HTTP client (httpx.AsyncClient or compatible mock).
        constitution: Loaded ConstitutionConfig (provides judge_model + principles).
        since: ISO8601 lower-bound timestamp for the analysis window.
        judge_model: Override for judge model name; "default" falls back to
                     constitution.judge_model, then "unknown".

    Returns:
        dict with keys:
            "report"       — human-readable markdown (str)
            "yaml_diffs"   — machine-readable list of suggestion dicts
            "generated_at" — ISO8601 timestamp string
    """
    generated_at = datetime.now(timezone.utc).isoformat()

    # --- 1. Query tr
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
Principle class · python · L14-L21 (8 LOC)
harness/critique/constitution.py
class Principle(BaseModel):
    """A single constitutional principle.

    ``enabled`` defaults to True; all other fields are required.
    """

    # Identifier of the principle.
    id: str
    # The principle's wording.
    text: str
    # Category label; presumably used to select principles relevant to a
    # given rail (TODO confirm against the critique engine).
    category: str
    # Numeric priority; NOTE(review): ordering direction is not visible here.
    priority: float
    # Whether the principle is active.
    enabled: bool = True
ConstitutionConfig class · python · L24-L28 (5 LOC)
harness/critique/constitution.py
class ConstitutionConfig(BaseModel):
    """Validated constitution configuration (inner object).

    This is the value found under the ``constitution`` key of the YAML file.
    """

    # Name of the judge model; the literal "default" is used when unset.
    judge_model: str = "default"
    # The constitution's principles, each validated as a Principle.
    principles: List[Principle]
ConstitutionFile class · python · L31-L34 (4 LOC)
harness/critique/constitution.py
class ConstitutionFile(BaseModel):
    """Root schema for constitution.yaml.

    The document must be a mapping with a top-level ``constitution`` key.
    """

    # The constitution body, validated as a ConstitutionConfig.
    constitution: ConstitutionConfig
load_constitution function · python · L37-L63 (27 LOC)
harness/critique/constitution.py
def load_constitution(config_path: str) -> ConstitutionConfig:
    """Read a constitution YAML file and return its validated configuration.

    Args:
        config_path: Filesystem path of the constitution.yaml file.

    Returns:
        The ConstitutionConfig found under the file's ``constitution`` key.

    Raises:
        ValueError: If the YAML cannot be parsed, the document is empty, or
            the content fails schema validation.
    """
    try:
        with open(config_path, "r") as stream:
            document = yaml.safe_load(stream)
    except yaml.YAMLError as err:
        raise ValueError(f"Failed to parse constitution YAML: {err}") from err

    # safe_load returns None for an empty document.
    if document is None:
        raise ValueError("Constitution config file is empty")

    try:
        return ConstitutionFile.model_validate(document).constitution
    except ValidationError as err:
        raise ValueError(f"Constitution schema validation failed: {err}") from err
CritiqueEngine class · python · L35-L288 (254 LOC)
harness/critique/engine.py
class CritiqueEngine:
    """Single-pass critique-revise loop gated by per-rail critique_threshold.

    High-risk outputs (score >= critique_threshold AND not blocked) are sent
    to a judge LLM with relevant constitutional principles. If the revision
    still scores high-risk on re-check, the response falls back to hard block.
    """

    def __init__(
        self,
        constitution: ConstitutionConfig,
        guardrail_engine,
    ) -> None:
        """Initialize with a constitution and a guardrail engine for re-checking.

        Args:
            constitution: Loaded ConstitutionConfig with principles and judge_model.
            guardrail_engine: GuardrailEngine instance used for revision re-check.
        """
        self._constitution = constitution
        self._guardrail_engine = guardrail_engine

    @property
    def constitution(self) -> ConstitutionConfig:
        """Public access to the constitution config."""
        return self._constitution

    async def run_
__init__ method · python · L43-L55 (13 LOC)
harness/critique/engine.py
    def __init__(
        self,
        constitution: ConstitutionConfig,
        guardrail_engine,
    ) -> None:
        """Initialize with a constitution and a guardrail engine for re-checking.

        Args:
            constitution: Loaded ConstitutionConfig with principles and judge_model.
            guardrail_engine: GuardrailEngine instance used for revision re-check.
        """
        self._constitution = constitution
        self._guardrail_engine = guardrail_engine
run_critique_loop method · python · L62-L214 (153 LOC)
harness/critique/engine.py
    async def run_critique_loop(
        self,
        response_data: dict,
        output_results: list[RailResult],
        request_model: str,
        http_client,
        pii_strictness: str = "balanced",
    ) -> dict | None:
        """Run critique-revise loop for borderline outputs.

        Args:
            response_data: OpenAI-format response dict from LiteLLM.
            output_results: RailResult list from GuardrailEngine.check_output().
            request_model: The model name from the original request.
            http_client: httpx.AsyncClient (base_url already set to LiteLLM).
            pii_strictness: PII redaction strictness level for revision text.

        Returns:
            cai_critique dict if critique ran, None if benign or on failure (fail-open).
        """
        # Find first rail that exceeds critique_threshold (and was not blocked)
        triggering_rail: str | None = None
        triggering_score: float = 0.0
        critique_threshold_value: float
_build_critique_prompt method · python · L216-L251 (36 LOC)
harness/critique/engine.py
    def _build_critique_prompt(
        self,
        output_text: str,
        triggering_rail: str,
        relevant_principles: list[Principle],
    ) -> tuple[str, str]:
        """Build system and user content for the judge model call.

        Args:
            output_text: The AI-generated text to evaluate.
            triggering_rail: Name of the guardrail that triggered critique.
            relevant_principles: Category-filtered enabled principles.

        Returns:
            (system_prompt, user_content) tuple.
        """
        system_prompt = (
            'You are a safety judge. Evaluate the following AI-generated text against the listed '
            'constitutional principles. Respond ONLY with valid JSON matching the schema: '
            '{"violated_principles": [<principle IDs>], "critique": "<explanation>", '
            '"revision": "<revised text>", "confidence": <0.0-1.0>}. '
            "The revision must address all identified violations while preserving t
Same scanner, your repo: https://repobility.com — Repobility
_call_judge method · python · L253-L288 (36 LOC)
harness/critique/engine.py
    async def _call_judge(
        self,
        http_client,
        model: str,
        system_prompt: str,
        user_content: str,
    ) -> dict:
        """POST to LiteLLM /v1/chat/completions and return parsed judge JSON.

        Args:
            http_client: httpx.AsyncClient with LiteLLM base_url set.
            model: Resolved judge model name.
            system_prompt: System message content.
            user_content: User message content.

        Returns:
            Parsed dict from judge JSON response.

        Raises:
            Exception: On HTTP error or JSON parse failure.
        """
        resp = await http_client.post(
            "/v1/chat/completions",
            json={
                "model": model,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_content},
                ],
                "response_format": {"type": "json_object"},
            },
    
main function · python · L11-L27 (17 LOC)
harness/critique/__main__.py
def main():
    """Command-line entry point for the critique tools.

    Parses ``sys.argv``. With no subcommand it prints the help text and exits
    with status 1; the ``analyze`` subcommand runs ``_run_analyze`` under
    asyncio with the parsed arguments.
    """
    parser = argparse.ArgumentParser(description="Constitutional AI critique tools")
    subparsers = parser.add_subparsers(dest="command")

    analyze_parser = subparsers.add_parser("analyze", help="Analyze trace history for tuning suggestions")
    # (flag, add_argument keyword arguments) for the analyze subcommand.
    analyze_options = (
        ("--since", {"default": "24h", "help": "Time window: ISO8601 or shorthand (24h, 7d)"}),
        ("--min-samples", {"type": int, "default": 10, "help": "Minimum critique records required"}),
        ("--db", {"default": None, "help": "Path to traces.db (default: harness/data/traces.db)"}),
        ("--config-dir", {"default": None, "help": "Path to config dir (default: harness/config)"}),
    )
    for flag, spec in analyze_options:
        analyze_parser.add_argument(flag, **spec)

    args = parser.parse_args()
    if args.command is None:
        parser.print_help()
        sys.exit(1)

    if args.command == "analyze":
        asyncio.run(_run_analyze(args))
_run_analyze function · python · L30-L56 (27 LOC)
harness/critique/__main__.py
async def _run_analyze(args):
    from harness.critique.analyzer import analyze_traces
    from harness.critique.constitution import load_constitution
    from harness.traces.store import TraceStore
    from harness.proxy.admin import _resolve_since
    import httpx

    config_dir = args.config_dir or os.path.join(os.path.dirname(os.path.dirname(__file__)), "config")
    db_path = args.db or os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "traces.db")

    constitution = load_constitution(os.path.join(config_dir, "constitution.yaml"))
    trace_store = TraceStore(db_path=db_path)

    litellm_base = os.environ.get("LITELLM_BASE_URL", "http://localhost:4000")
    async with httpx.AsyncClient(base_url=litellm_base, timeout=httpx.Timeout(120.0)) as http_client:
        since_ts = _resolve_since(args.since)
        result = await analyze_traces(
            trace_store=trace_store,
            http_client=http_client,
            constitution=constitution,
            sin
page 1 / 4next ›