Function bodies 178 total
CheckResult class · python · L50-L55 (6 LOC)examples/dgx_toolbox.py
class CheckResult:
"""Result of a single validation check."""
name: str
passed: bool
message: str
details: dict[str, Any] = field(default_factory=dict)ValidationResult class · python · L59-L76 (18 LOC)examples/dgx_toolbox.py
class ValidationResult:
"""Result of all validation checks."""
checks: list[CheckResult] = field(default_factory=list)
@property
def ok(self) -> bool:
return all(c.passed for c in self.checks)
def report(self) -> str:
lines = []
for c in self.checks:
icon = "✓" if c.passed else "✗"
lines.append(f" {icon} {c.name}: {c.message}")
return "\n".join(lines)
@property
def failures(self) -> list[CheckResult]:
return [c for c in self.checks if not c.passed]report method · python · L67-L72 (6 LOC)examples/dgx_toolbox.py
def report(self) -> str:
lines = []
for c in self.checks:
icon = "✓" if c.passed else "✗"
lines.append(f" {icon} {c.name}: {c.message}")
return "\n".join(lines)ExecResult class · python · L80-L99 (20 LOC)examples/dgx_toolbox.py
class ExecResult:
"""Result of a container execution."""
command: list[str]
returncode: int
stdout: str
stderr: str
duration_s: float
container: str
skipped: bool = False
skip_reason: str = ""
@property
def ok(self) -> bool:
return self.returncode == 0 or self.skipped
def summary(self) -> str:
if self.skipped:
return f"SKIPPED: {self.skip_reason}"
status = "OK" if self.ok else f"FAILED (exit {self.returncode})"
return f"{status} [{self.duration_s:.1f}s] {' '.join(self.command[-3:])}"summary method · python · L95-L99 (5 LOC)examples/dgx_toolbox.py
def summary(self) -> str:
if self.skipped:
return f"SKIPPED: {self.skip_reason}"
status = "OK" if self.ok else f"FAILED (exit {self.returncode})"
return f"{status} [{self.duration_s:.1f}s] {' '.join(self.command[-3:])}"__init__ method · python · L112-L117 (6 LOC)examples/dgx_toolbox.py
def __init__(self, config_path: Path | None = None):
self._config = self._load_config(config_path or CONFIG_PATH)
self._base = self._resolve_base()
self._project_root = Path(self._config.get("project_root", ".")).resolve()
self._containers: dict[str, dict] = self._config.get("containers", {})
self._exec_log: list[ExecResult] = []_load_config method · python · L121-L124 (4 LOC)examples/dgx_toolbox.py
def _load_config(self, path: Path) -> dict:
if path.exists():
return yaml.safe_load(path.read_text()) or {}
return {}Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_resolve_base method · python · L126-L133 (8 LOC)examples/dgx_toolbox.py
def _resolve_base(self) -> Path:
env_path = os.environ.get("DGX_TOOLBOX_PATH")
if env_path:
return Path(env_path).expanduser().resolve()
config_path = self._config.get("dgx_toolbox_path")
if config_path:
return Path(config_path).expanduser().resolve()
return Path("~/dgx-toolbox").expanduser().resolve()resolve method · python · L152-L169 (18 LOC)examples/dgx_toolbox.py
def resolve(self, component: str) -> Path:
"""Resolve a component name to its full script path."""
if not self.available:
raise FileNotFoundError(
f"DGX Toolbox not found at {self._base}. "
f"Set DGX_TOOLBOX_PATH or update config/dgx_toolbox.yaml"
)
components = self._config.get("components", {})
rel_path = components.get(component)
if not rel_path:
raise KeyError(
f"Unknown component '{component}'. "
f"Available: {', '.join(components.keys())}"
)
full_path = self._base / rel_path
if not full_path.exists():
raise FileNotFoundError(f"Component not found: {full_path}")
return full_pathport method · python · L171-L173 (3 LOC)examples/dgx_toolbox.py
def port(self, component: str) -> int | None:
"""Get the configured port for a component."""
return self._config.get("ports", {}).get(component)vllm_endpoint method · python · L180-L182 (3 LOC)examples/dgx_toolbox.py
def vllm_endpoint(self) -> str:
port = self.port("vllm") or 8020
return f"http://localhost:{port}/v1"litellm_endpoint method · python · L184-L186 (3 LOC)examples/dgx_toolbox.py
def litellm_endpoint(self) -> str:
port = self.port("litellm") or 4000
return f"http://localhost:{port}/v1"validate method · python · L190-L210 (21 LOC)examples/dgx_toolbox.py
def validate(self, checks: list[str]) -> ValidationResult:
"""Run named validation checks. Returns structured result.
Available checks:
"toolbox" — dgx-toolbox directory exists
"training_data" — training data file exists (path from config)
"config" — config file exists (path from config)
"memory:N" — at least N GB available memory
"container:name" — named container is running
"mounted:name" — project is mounted in named container
"gpu" — GPU is accessible (via container)
"deps:name" — pinned deps installed in container
"""
result = ValidationResult()
for check in checks:
if ":" in check:
name, arg = check.split(":", 1)
else:
name, arg = check, ""
result.checks.append(self._run_check(name, arg))
return result_run_check method · python · L212-L229 (18 LOC)examples/dgx_toolbox.py
def _run_check(self, name: str, arg: str) -> CheckResult:
dispatch = {
"toolbox": self._check_toolbox,
"training_data": self._check_training_data,
"config": self._check_config,
"memory": self._check_memory,
"container": self._check_container,
"mounted": self._check_mounted,
"gpu": self._check_gpu,
"deps": self._check_deps,
}
fn = dispatch.get(name)
if not fn:
return CheckResult(name, False, f"Unknown check: {name}")
try:
return fn(arg)
except Exception as e:
return CheckResult(name, False, f"Check error: {e}")_check_toolbox method · python · L231-L234 (4 LOC)examples/dgx_toolbox.py
def _check_toolbox(self, _: str) -> CheckResult:
if self.available:
return CheckResult("toolbox", True, f"Found at {self._base}")
return CheckResult("toolbox", False, f"Not found at {self._base}")Same scanner, your repo: https://repobility.com — Repobility
_check_training_data method · python · L236-L242 (7 LOC)examples/dgx_toolbox.py
def _check_training_data(self, _: str) -> CheckResult:
rel = self._config.get("validation_paths", {}).get("training_data", "data/train.jsonl")
path = self._project_root / rel
if path.exists():
lines = sum(1 for _ in open(path))
return CheckResult("training_data", True, f"{lines} examples", {"path": str(path), "lines": lines})
return CheckResult("training_data", False, f"Not found: {path}")_check_config method · python · L244-L249 (6 LOC)examples/dgx_toolbox.py
def _check_config(self, _: str) -> CheckResult:
rel = self._config.get("validation_paths", {}).get("config", "config/train_config.yaml")
path = self._project_root / rel
if path.exists():
return CheckResult("config", True, f"Found: {path}")
return CheckResult("config", False, f"Not found: {path}")_check_memory method · python · L251-L277 (27 LOC)examples/dgx_toolbox.py
def _check_memory(self, arg: str) -> CheckResult:
min_gb = float(arg) if arg else 70.0
try:
meminfo = Path("/proc/meminfo").read_text()
mem = {l.split(":")[0].strip(): int(l.split(":")[1].strip().split()[0])
for l in meminfo.splitlines() if ":" in l}
avail_gb = mem.get("MemAvailable", 0) / (1024 * 1024)
if avail_gb >= min_gb:
return CheckResult("memory", True, f"{avail_gb:.1f} GB available (need {min_gb})",
{"available_gb": avail_gb, "required_gb": min_gb})
# Get top memory consumers for diagnostics
ps = subprocess.run(["ps", "aux", "--sort=-rss"], capture_output=True, text=True, timeout=5)
top_procs = []
for line in ps.stdout.strip().split("\n")[1:6]:
parts = line.split(None, 10)
if len(parts) >= 11:
rss_mb = int(parts[5]) // 1024 if parts[5].isdigit(_check_container method · python · L279-L286 (8 LOC)examples/dgx_toolbox.py
def _check_container(self, name: str) -> CheckResult:
cname = self._containers.get(name, {}).get("container_name", name)
result = subprocess.run(["docker", "ps", "--format", "{{.Names}}"],
capture_output=True, text=True, timeout=5)
running = result.stdout.strip().split("\n")
if cname in running:
return CheckResult(f"container:{name}", True, f"{cname} is running")
return CheckResult(f"container:{name}", False, f"{cname} is not running")_check_mounted method · python · L288-L299 (12 LOC)examples/dgx_toolbox.py
def _check_mounted(self, name: str) -> CheckResult:
mapping = self._containers.get(name, {})
cname = mapping.get("container_name", name)
workdir = mapping.get("workdir", "/workspace/project")
check_file = self._config.get("mount_check_file", "pyproject.toml")
check = subprocess.run(
["docker", "exec", cname, "test", "-f", f"{workdir}/{check_file}"],
capture_output=True, timeout=5)
if check.returncode == 0:
return CheckResult(f"mounted:{name}", True, f"Project visible at {workdir}")
return CheckResult(f"mounted:{name}", False,
f"Project not mounted in {cname}. Restart with EXTRA_MOUNTS.")_check_gpu method · python · L301-L313 (13 LOC)examples/dgx_toolbox.py
def _check_gpu(self, name: str) -> CheckResult:
cname = self._containers.get(name, {}).get("container_name", name) if name else None
if not cname:
# Fall back to first container
first = next(iter(self._containers.values()), {})
cname = first.get("container_name", "")
result = subprocess.run(
["docker", "exec", cname, "nvidia-smi", "--query-gpu=name,memory.total",
"--format=csv,noheader"],
capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip():
return CheckResult("gpu", True, result.stdout.strip())
return CheckResult("gpu", False, "No GPU access in container")_check_deps method · python · L315-L327 (13 LOC)examples/dgx_toolbox.py
def _check_deps(self, name: str) -> CheckResult:
cname = self._containers.get(name, {}).get("container_name", name)
workdir = self._containers.get(name, {}).get("workdir", "/workspace")
# Check critical imports
imports = self._config.get("required_imports", ["torch", "numpy"])
check_script = "import " + ",".join(imports) + ";print('OK')"
result = subprocess.run(
["docker", "exec", "-w", workdir, cname, "python", "-c", check_script],
capture_output=True, text=True, timeout=30)
if result.returncode == 0 and "OK" in result.stdout:
return CheckResult(f"deps:{name}", True, "All pinned deps available")
missing = result.stderr.strip().split("\n")[-1] if result.stderr else "unknown"
return CheckResult(f"deps:{name}", False, f"Missing deps: {missing}")ensure_ready method · python · L331-L383 (53 LOC)examples/dgx_toolbox.py
def ensure_ready(self, component: str, wait: int = 45) -> ValidationResult:
"""Ensure a container is running, project mounted, and deps installed.
This is the main entry point for preparing a container for work.
It handles: start → wait → mount check → dep install → validate.
Args:
component: Key from containers config (e.g., "unsloth_studio")
wait: Seconds to wait after starting container
Returns:
ValidationResult with all checks.
"""
mapping = self._containers.get(component)
if not mapping:
r = ValidationResult()
r.checks.append(CheckResult(component, False, f"Unknown component: {component}"))
return r
cname = mapping["container_name"]
workdir = mapping.get("workdir")
# Step 1: Is container running?
container_check = self._check_container(component)
if not container_check.passed:
print(f" Want this analysis on your repo? https://repobility.com/scan/
_start_container method · python · L385-L391 (7 LOC)examples/dgx_toolbox.py
def _start_container(self, component: str) -> None:
"""Start a container via dgx-toolbox with EXTRA_MOUNTS."""
mapping = self._containers[component]
script = self.resolve(mapping["component"])
env = os.environ.copy()
env["EXTRA_MOUNTS"] = f"{self._project_root}:{mapping['workdir']}"
subprocess.run(["bash", str(script)], env=env, capture_output=True, text=True)_install_deps method · python · L393-L407 (15 LOC)examples/dgx_toolbox.py
def _install_deps(self, component: str) -> None:
"""Install pinned deps inside a container."""
cname = self._containers[component]["container_name"]
versions = self.pinned_versions
if not versions:
return
# Build pip install command from pinned versions
pkgs = [f"{pkg}=={ver}" for pkg, ver in versions.items()]
extras = self._config.get("extra_deps", [])
cmd = ["docker", "exec", cname, "pip", "install", "--no-deps"] + pkgs
subprocess.run(cmd, capture_output=True, text=True, timeout=300)
# Install extras (with deps)
if extras:
cmd2 = ["docker", "exec", cname, "pip", "install"] + extras
subprocess.run(cmd2, capture_output=True, text=True, timeout=300)execute method · python · L411-L473 (63 LOC)examples/dgx_toolbox.py
def execute(
self,
component: str,
*cmd: str,
capture: bool = False,
timeout: int | None = None,
idempotency_check: str | None = None,
) -> ExecResult:
"""Execute a command inside a container. The core execution method.
Args:
component: Key from containers config (e.g., "unsloth_studio")
*cmd: Command and args (e.g., "python", "-m", "scripts.train_model")
capture: Capture stdout/stderr (default: stream to terminal)
timeout: Timeout in seconds
idempotency_check: Path to check inside container — if exists, skip execution
Returns:
ExecResult with structured output.
"""
mapping = self._containers.get(component)
if not mapping:
return ExecResult(list(cmd), 1, "", f"Unknown component: {component}", 0, "")
cname = mapping["container_name"]
workdir = mapping.get("workdir")
# Idemporun_service method · python · L475-L502 (28 LOC)examples/dgx_toolbox.py
def run_service(self, component: str, *args: str) -> ExecResult:
"""Start a dgx-toolbox service (vLLM, LiteLLM, etc.) — not exec'd into.
Args:
component: Key from components config (e.g., "vllm")
*args: Additional arguments (e.g., model name)
"""
script = self.resolve(component)
env = os.environ.copy()
mount_target = self._containers.get(component, {}).get("workdir", "/workspace")
env["EXTRA_MOUNTS"] = f"{self._project_root}:{mount_target}"
start = time.time()
proc = subprocess.run(
["bash", str(script)] + list(args),
env=env, capture_output=True, text=True, timeout=60,
)
duration = time.time() - start
cname = self._containers.get(component, {}).get("container_name", component)
result = ExecResult(
command=["bash", str(script)] + list(args),
returncode=proc.returncode,
stdout=proc.stdout,
status_report method · python · L506-L577 (72 LOC)examples/dgx_toolbox.py
def status_report(self) -> dict:
"""Structured status for telemetry agents to consume.
Returns a dict with container states, execution log, resource usage,
and pipeline progress — everything a background observer needs.
"""
# Container states
ps = subprocess.run(["docker", "ps", "--format", "json"],
capture_output=True, text=True, timeout=5)
containers = []
if ps.stdout.strip():
for line in ps.stdout.strip().split("\n"):
try:
containers.append(json.loads(line))
except json.JSONDecodeError:
pass
# Memory
try:
meminfo = Path("/proc/meminfo").read_text()
mem = {l.split(":")[0].strip(): int(l.split(":")[1].strip().split()[0])
for l in meminfo.splitlines() if ":" in l}
memory = {
"total_gb": round(mem.get("MemTotal", 0) / (102info method · python · L581-L591 (11 LOC)examples/dgx_toolbox.py
def info(self) -> dict:
"""Basic toolbox info for diagnostics."""
return {
"path": str(self._base),
"available": self.available,
"config_file": str(CONFIG_PATH),
"vllm_endpoint": self.vllm_endpoint(),
"litellm_endpoint": self.litellm_endpoint(),
"components": list(self._config.get("components", {}).keys()),
"containers": list(self._containers.keys()),
}get_toolbox function · python · L599-L604 (6 LOC)examples/dgx_toolbox.py
def get_toolbox() -> DGXToolbox:
"""Get the singleton DGXToolbox instance."""
global _instance
if _instance is None:
_instance = DGXToolbox()
return _instanceverify_api_key function · python · L13-L29 (17 LOC)harness/auth/bearer.py
async def verify_api_key(
credentials: HTTPAuthorizationCredentials = Depends(_bearer),
request: Request = None,
) -> TenantConfig:
"""FastAPI dependency that verifies a Bearer token against tenant hashes.
Returns the matching TenantConfig on success.
Raises HTTP 401 if the key does not match any tenant.
"""
token = credentials.credentials
for tenant in request.app.state.tenants:
try:
_ph.verify(tenant.api_key_hash, token)
return tenant
except (VerifyMismatchError, VerificationError, InvalidHashError):
continue
raise HTTPException(status_code=401, detail="Invalid API key")Repobility — same analyzer, your code, free for public repos · /scan/
TenantConfig class · python · L10-L20 (11 LOC)harness/config/loader.py
class TenantConfig(BaseModel):
"""Configuration for a single API tenant."""
tenant_id: str
api_key_hash: str
rpm_limit: int = 60
tpm_limit: int = 100_000
allowed_models: List[str] = ["*"]
bypass: bool = False
pii_strictness: str = "balanced"
rail_overrides: Dict[str, Dict[str, object]] = {}TenantsFile class · python · L24-L27 (4 LOC)harness/config/loader.py
class TenantsFile(BaseModel):
"""Root schema for tenants.yaml."""
tenants: List[TenantConfig]load_tenants function · python · L30-L56 (27 LOC)harness/config/loader.py
def load_tenants(config_path: str) -> List[TenantConfig]:
"""Load and validate tenants from a YAML file.
Args:
config_path: Path to a tenants.yaml file.
Returns:
List of validated TenantConfig objects.
Raises:
ValueError: If the YAML is malformed or fails schema validation.
"""
try:
with open(config_path, "r") as f:
raw = yaml.safe_load(f)
except yaml.YAMLError as exc:
raise ValueError(f"Failed to parse tenants YAML: {exc}") from exc
if raw is None:
raise ValueError("Tenants file is empty")
try:
parsed = TenantsFile.model_validate(raw)
except ValidationError as exc:
raise ValueError(f"Tenants schema validation failed: {exc}") from exc
return parsed.tenantsRailConfig class · python · L14-L32 (19 LOC)harness/config/rail_loader.py
class RailConfig(BaseModel):
"""Configuration for a single guardrail."""
name: str
enabled: bool = True
threshold: float = 0.7
refusal_mode: Literal["hard_block", "soft_steer", "informative"] = "hard_block"
critique_threshold: Optional[float] = None
@model_validator(mode='after')
def validate_critique_threshold(self) -> 'RailConfig':
"""Ensure critique_threshold is strictly less than threshold when set."""
if self.critique_threshold is not None:
if self.critique_threshold >= self.threshold:
raise ValueError(
f"Rail '{self.name}': critique_threshold ({self.critique_threshold}) "
f"must be less than threshold ({self.threshold})"
)
return selfvalidate_critique_threshold method · python · L24-L32 (9 LOC)harness/config/rail_loader.py
def validate_critique_threshold(self) -> 'RailConfig':
"""Ensure critique_threshold is strictly less than threshold when set."""
if self.critique_threshold is not None:
if self.critique_threshold >= self.threshold:
raise ValueError(
f"Rail '{self.name}': critique_threshold ({self.critique_threshold}) "
f"must be less than threshold ({self.threshold})"
)
return selfRailsFile class · python · L35-L38 (4 LOC)harness/config/rail_loader.py
class RailsFile(BaseModel):
"""Root schema for rails.yaml."""
rails: List[RailConfig]load_rails_config function · python · L41-L67 (27 LOC)harness/config/rail_loader.py
def load_rails_config(config_path: str) -> List[RailConfig]:
"""Load and validate rails from a YAML file.
Args:
config_path: Path to a rails.yaml file.
Returns:
List of validated RailConfig objects.
Raises:
ValueError: If the YAML is malformed, empty, or fails schema validation.
"""
try:
with open(config_path, "r") as f:
raw = yaml.safe_load(f)
except yaml.YAMLError as exc:
raise ValueError(f"Failed to parse rails YAML: {exc}") from exc
if raw is None:
raise ValueError("Rails config file is empty")
try:
parsed = RailsFile.model_validate(raw)
except ValidationError as exc:
raise ValueError(f"Rails schema validation failed: {exc}") from exc
return parsed.railsanalyze_traces function · python · L20-L244 (225 LOC)harness/critique/analyzer.py
async def analyze_traces(
trace_store: TraceStore,
http_client: Any,
constitution: ConstitutionConfig,
since: str,
judge_model: str = "default",
) -> dict:
"""Analyze historical critique traces and produce tuning suggestions.
Args:
trace_store: Async SQLite-backed trace storage.
http_client: Async HTTP client (httpx.AsyncClient or compatible mock).
constitution: Loaded ConstitutionConfig (provides judge_model + principles).
since: ISO8601 lower-bound timestamp for the analysis window.
judge_model: Override for judge model name; "default" falls back to
constitution.judge_model, then "unknown".
Returns:
dict with keys:
"report" — human-readable markdown (str)
"yaml_diffs" — machine-readable list of suggestion dicts
"generated_at" — ISO8601 timestamp string
"""
generated_at = datetime.now(timezone.utc).isoformat()
# --- 1. Query trRepobility — the code-quality scanner for AI-generated software · https://repobility.com
Principle class · python · L14-L21 (8 LOC)harness/critique/constitution.py
class Principle(BaseModel):
"""A single constitutional principle."""
id: str
text: str
category: str
priority: float
enabled: bool = TrueConstitutionConfig class · python · L24-L28 (5 LOC)harness/critique/constitution.py
class ConstitutionConfig(BaseModel):
"""Validated constitution configuration (inner object)."""
judge_model: str = "default"
principles: List[Principle]ConstitutionFile class · python · L31-L34 (4 LOC)harness/critique/constitution.py
class ConstitutionFile(BaseModel):
"""Root schema for constitution.yaml."""
constitution: ConstitutionConfigload_constitution function · python · L37-L63 (27 LOC)harness/critique/constitution.py
def load_constitution(config_path: str) -> ConstitutionConfig:
"""Load and validate a constitution from a YAML file.
Args:
config_path: Path to a constitution.yaml file.
Returns:
Validated ConstitutionConfig object.
Raises:
ValueError: If the YAML is malformed, empty, or fails schema validation.
"""
try:
with open(config_path, "r") as f:
raw = yaml.safe_load(f)
except yaml.YAMLError as exc:
raise ValueError(f"Failed to parse constitution YAML: {exc}") from exc
if raw is None:
raise ValueError("Constitution config file is empty")
try:
parsed = ConstitutionFile.model_validate(raw)
except ValidationError as exc:
raise ValueError(f"Constitution schema validation failed: {exc}") from exc
return parsed.constitutionCritiqueEngine class · python · L35-L288 (254 LOC)harness/critique/engine.py
class CritiqueEngine:
"""Single-pass critique-revise loop gated by per-rail critique_threshold.
High-risk outputs (score >= critique_threshold AND not blocked) are sent
to a judge LLM with relevant constitutional principles. If the revision
still scores high-risk on re-check, the response falls back to hard block.
"""
def __init__(
self,
constitution: ConstitutionConfig,
guardrail_engine,
) -> None:
"""Initialize with a constitution and a guardrail engine for re-checking.
Args:
constitution: Loaded ConstitutionConfig with principles and judge_model.
guardrail_engine: GuardrailEngine instance used for revision re-check.
"""
self._constitution = constitution
self._guardrail_engine = guardrail_engine
@property
def constitution(self) -> ConstitutionConfig:
"""Public access to the constitution config."""
return self._constitution
async def run___init__ method · python · L43-L55 (13 LOC)harness/critique/engine.py
def __init__(
self,
constitution: ConstitutionConfig,
guardrail_engine,
) -> None:
"""Initialize with a constitution and a guardrail engine for re-checking.
Args:
constitution: Loaded ConstitutionConfig with principles and judge_model.
guardrail_engine: GuardrailEngine instance used for revision re-check.
"""
self._constitution = constitution
self._guardrail_engine = guardrail_enginerun_critique_loop method · python · L62-L214 (153 LOC)harness/critique/engine.py
async def run_critique_loop(
self,
response_data: dict,
output_results: list[RailResult],
request_model: str,
http_client,
pii_strictness: str = "balanced",
) -> dict | None:
"""Run critique-revise loop for borderline outputs.
Args:
response_data: OpenAI-format response dict from LiteLLM.
output_results: RailResult list from GuardrailEngine.check_output().
request_model: The model name from the original request.
http_client: httpx.AsyncClient (base_url already set to LiteLLM).
pii_strictness: PII redaction strictness level for revision text.
Returns:
cai_critique dict if critique ran, None if benign or on failure (fail-open).
"""
# Find first rail that exceeds critique_threshold (and was not blocked)
triggering_rail: str | None = None
triggering_score: float = 0.0
critique_threshold_value: float_build_critique_prompt method · python · L216-L251 (36 LOC)harness/critique/engine.py
def _build_critique_prompt(
self,
output_text: str,
triggering_rail: str,
relevant_principles: list[Principle],
) -> tuple[str, str]:
"""Build system and user content for the judge model call.
Args:
output_text: The AI-generated text to evaluate.
triggering_rail: Name of the guardrail that triggered critique.
relevant_principles: Category-filtered enabled principles.
Returns:
(system_prompt, user_content) tuple.
"""
system_prompt = (
'You are a safety judge. Evaluate the following AI-generated text against the listed '
'constitutional principles. Respond ONLY with valid JSON matching the schema: '
'{"violated_principles": [<principle IDs>], "critique": "<explanation>", '
'"revision": "<revised text>", "confidence": <0.0-1.0>}. '
"The revision must address all identified violations while preserving tSame scanner, your repo: https://repobility.com — Repobility
_call_judge method · python · L253-L288 (36 LOC)harness/critique/engine.py
async def _call_judge(
self,
http_client,
model: str,
system_prompt: str,
user_content: str,
) -> dict:
"""POST to LiteLLM /v1/chat/completions and return parsed judge JSON.
Args:
http_client: httpx.AsyncClient with LiteLLM base_url set.
model: Resolved judge model name.
system_prompt: System message content.
user_content: User message content.
Returns:
Parsed dict from judge JSON response.
Raises:
Exception: On HTTP error or JSON parse failure.
"""
resp = await http_client.post(
"/v1/chat/completions",
json={
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
],
"response_format": {"type": "json_object"},
},
main function · python · L11-L27 (17 LOC)harness/critique/__main__.py
def main():
parser = argparse.ArgumentParser(description="Constitutional AI critique tools")
subparsers = parser.add_subparsers(dest="command")
analyze_parser = subparsers.add_parser("analyze", help="Analyze trace history for tuning suggestions")
analyze_parser.add_argument("--since", default="24h", help="Time window: ISO8601 or shorthand (24h, 7d)")
analyze_parser.add_argument("--min-samples", type=int, default=10, help="Minimum critique records required")
analyze_parser.add_argument("--db", default=None, help="Path to traces.db (default: harness/data/traces.db)")
analyze_parser.add_argument("--config-dir", default=None, help="Path to config dir (default: harness/config)")
args = parser.parse_args()
if args.command is None:
parser.print_help()
sys.exit(1)
if args.command == "analyze":
asyncio.run(_run_analyze(args))_run_analyze function · python · L30-L56 (27 LOC)harness/critique/__main__.py
async def _run_analyze(args):
from harness.critique.analyzer import analyze_traces
from harness.critique.constitution import load_constitution
from harness.traces.store import TraceStore
from harness.proxy.admin import _resolve_since
import httpx
config_dir = args.config_dir or os.path.join(os.path.dirname(os.path.dirname(__file__)), "config")
db_path = args.db or os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "traces.db")
constitution = load_constitution(os.path.join(config_dir, "constitution.yaml"))
trace_store = TraceStore(db_path=db_path)
litellm_base = os.environ.get("LITELLM_BASE_URL", "http://localhost:4000")
async with httpx.AsyncClient(base_url=litellm_base, timeout=httpx.Timeout(120.0)) as http_client:
since_ts = _resolve_since(args.since)
result = await analyze_traces(
trace_store=trace_store,
http_client=http_client,
constitution=constitution,
sinpage 1 / 4next ›