← back to dr-robert-li__dgx-toolbox

Function bodies 178 total

All specs Real LLM only Function bodies
check_regression function · python · L20-L82 (63 LOC)
harness/eval/gate.py
def check_regression(
    current: dict,
    baseline: dict,
    safety_tolerance: float = 0.02,
    capability_tolerance: float = 0.05,
) -> tuple[bool, list[str]]:
    """Compare current metrics to baseline and return (regressed, failures).

    Args:
        current: Current eval run metrics dict.
        baseline: Baseline eval run metrics dict.
        safety_tolerance: Allowed drop for safety metrics (default 0.02).
        capability_tolerance: Allowed drop for capability metrics (default 0.05).

    Returns:
        Tuple of (regressed: bool, failures: list[str]).
        failures contains descriptive messages for each detected regression.
    """
    failures: list[str] = []

    for key, current_val in current.items():
        # Only compare numeric values
        if not isinstance(current_val, (int, float)):
            continue
        # Skip if key not in baseline (no reference to compare against)
        if key not in baseline:
            continue

        baseline_val =
run_gate function · python · L85-L147 (63 LOC)
harness/eval/gate.py
async def run_gate(
    dataset_path: str,
    gateway_base_url: str,
    api_key: str,
    db_path: str,
    safety_tolerance: float = 0.02,
    capability_tolerance: float = 0.05,
    baseline_name: str | None = None,
) -> int:
    """Run replay eval, compare to baseline, return exit code.

    Returns:
        0 — gate passed (all metrics within tolerance)
        1 — regression detected
        2 — eval error (replay could not run)
    """
    trace_store = TraceStore(db_path=db_path)
    await trace_store.init_db()

    try:
        result = await run_replay(
            dataset_path=dataset_path,
            gateway_base_url=gateway_base_url,
            api_key=api_key,
            trace_store=trace_store,
        )
    except Exception as e:
        print(f"EVAL ERROR: {e}")
        return 2

    current_metrics = result["metrics"]

    # Query baseline
    if baseline_name is not None:
        runs = await trace_store.query_eval_runs(limit=20)
        baseline_runs = [r for r 
HarnessLM class · python · L18-L97 (80 LOC)
harness/eval/lm_model.py
class HarnessLM(_BaseLM):
    """lm-eval LM subclass with split routing for DGX harness.

    generate_until -> gateway_url/v1/chat/completions (safety guardrails)
    loglikelihood  -> raises NotImplementedError (use LiteLLM local-completions directly)
    """

    def __init__(
        self,
        gateway_url: str = "http://localhost:5000",
        litellm_url: str = "http://localhost:4000",
        api_key: str = "",
        model: str = "llama3.1",
    ) -> None:
        self.gateway_url = gateway_url.rstrip("/")
        self.litellm_url = litellm_url.rstrip("/")
        self.api_key = api_key
        self.model = model

    def generate_until(self, requests) -> list[str]:
        """Send generate_until requests to the gateway chat completions endpoint.

        Args:
            requests: Iterable of lm-eval Instance objects. Each has
                      .args = (context_str, gen_kwargs_dict).

        Returns:
            List of generated text strings.
        """
        re
__init__ method · python · L25-L35 (11 LOC)
harness/eval/lm_model.py
    def __init__(
        self,
        gateway_url: str = "http://localhost:5000",
        litellm_url: str = "http://localhost:4000",
        api_key: str = "",
        model: str = "llama3.1",
    ) -> None:
        """Store the endpoints and credentials used for evaluation requests.

        Args:
            gateway_url: Base URL of the harness gateway. Trailing "/"
                characters are stripped before it is stored.
            litellm_url: Base URL of the LiteLLM proxy. Trailing "/"
                characters are stripped before it is stored.
            api_key: API key stored on the instance as-is.
            model: Model identifier stored on the instance as-is.
        """
        # Initialise the base class before setting our own attributes.
        # NOTE(review): assuming _BaseLM is lm-eval's LM, its __init__ sets
        # up rank / world-size / cache-hook state that the evaluator reads;
        # skipping it would leave those attributes undefined. Harmless if
        # _BaseLM is a plain object fallback — confirm against the import.
        super().__init__()
        self.gateway_url = gateway_url.rstrip("/")
        self.litellm_url = litellm_url.rstrip("/")
        self.api_key = api_key
        self.model = model
generate_until method · python · L37-L71 (35 LOC)
harness/eval/lm_model.py
    def generate_until(self, requests) -> list[str]:
        """Send generate_until requests to the gateway chat completions endpoint.

        Args:
            requests: Iterable of lm-eval Instance objects. Each has
                      .args = (context_str, gen_kwargs_dict).

        Returns:
            List of generated text strings.
        """
        results: list[str] = []
        for instance in requests:
            context, gen_kwargs = instance.args
            max_tokens = gen_kwargs.get("max_gen_toks", 256) if gen_kwargs else 256
            stop = gen_kwargs.get("until", None) if gen_kwargs else None

            payload: dict = {
                "model": self.model,
                "messages": [{"role": "user", "content": context}],
                "max_tokens": max_tokens,
            }
            if stop:
                payload["stop"] = stop

            response = http_requests.post(
                f"{self.gateway_url}/v1/chat/completions",
                hea
loglikelihood method · python · L73-L85 (13 LOC)
harness/eval/lm_model.py
    def loglikelihood(self, requests) -> list:
        """Reject loglikelihood requests; they are not served through the gateway.

        Args:
            requests: Accepted for signature compatibility; never inspected.

        Raises:
            NotImplementedError: Unconditionally. The message states that
                loglikelihood needs /v1/completions with logprobs=True and
                suggests generate_until tasks or running lm-eval
                local-completions against LiteLLM :4000 directly.
        """
        requirement = "loglikelihood requires /v1/completions with logprobs=True"
        workaround = (
            "use generate_until tasks or run lm-eval local-completions "
            "against LiteLLM :4000 directly"
        )
        raise NotImplementedError(f"{requirement}; {workaround}")
loglikelihood_rolling method · python · L87-L97 (11 LOC)
harness/eval/lm_model.py
    def loglikelihood_rolling(self, requests) -> list:
        """Reject rolling-loglikelihood requests; not served through the gateway.

        Args:
            requests: Accepted for signature compatibility; never inspected.

        Raises:
            NotImplementedError: Unconditionally, with a message pointing to
                generate_until tasks or lm-eval local-completions against
                LiteLLM :4000.
        """
        requirement = (
            "loglikelihood_rolling requires /v1/completions with logprobs=True"
        )
        workaround = (
            "use generate_until tasks or run lm-eval local-completions "
            "against LiteLLM :4000 directly"
        )
        raise NotImplementedError(f"{requirement}; {workaround}")
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
main function · python · L11-L72 (62 LOC)
harness/eval/__main__.py
def main():
    parser = argparse.ArgumentParser(
        prog="python -m harness.eval",
        description="DGX harness eval tools: gate, replay, trends",
    )
    subparsers = parser.add_subparsers(dest="command")

    # --- gate subcommand ---
    gate_parser = subparsers.add_parser("gate", help="Run CI gate evaluation")
    gate_parser.add_argument("--tolerance", type=float, default=0.02,
                             help="Safety metric tolerance (default: 0.02)")
    gate_parser.add_argument("--capability-tolerance", type=float, default=0.05,
                             help="Capability metric tolerance (default: 0.05)")
    gate_parser.add_argument("--baseline", default=None,
                             help="Baseline name to compare against")
    gate_parser.add_argument("--dataset", default="harness/eval/datasets/safety-core.jsonl",
                             help="Path to eval dataset JSONL file")
    gate_parser.add_argument("--gateway", default="http://localhost:5000",
_resolve_api_key function · python · L75-L81 (7 LOC)
harness/eval/__main__.py
def _resolve_api_key(args) -> str:
    """Return the API key taken from ``args.api_key`` or ``HARNESS_API_KEY``.

    A non-empty ``api_key`` attribute on *args* takes precedence; otherwise
    the HARNESS_API_KEY environment variable is consulted. When neither
    provides a non-empty value, an error is written to stderr and the
    process exits with status 1.
    """
    key = getattr(args, "api_key", None)
    if not key:
        # Nothing usable on the command line; fall back to the environment.
        key = os.environ.get("HARNESS_API_KEY")
    if key:
        return key
    print("Error: API key required. Use --api-key or set HARNESS_API_KEY env var.", file=sys.stderr)
    sys.exit(1)
_resolve_db_path function · python · L84-L89 (6 LOC)
harness/eval/__main__.py
def _resolve_db_path(args) -> str:
    """Return the trace database path from ``args.db`` or the data directory.

    A truthy ``db`` attribute on *args* is returned unchanged. Otherwise the
    path is ``traces.db`` inside the directory named by HARNESS_DATA_DIR,
    which defaults to ``harness/data`` when the variable is unset.
    """
    explicit_path = getattr(args, "db", None)
    if not explicit_path:
        base_dir = os.environ.get("HARNESS_DATA_DIR", "harness/data")
        return os.path.join(base_dir, "traces.db")
    return explicit_path
_run_gate function · python · L92-L107 (16 LOC)
harness/eval/__main__.py
async def _run_gate(args):
    """Run the gate evaluation for the parsed CLI arguments, then exit.

    Resolves the API key and database path from *args* / the environment,
    forwards the remaining options to ``run_gate`` and terminates the
    process with the value it returns as the exit status.
    """
    from harness.eval.gate import run_gate

    key = _resolve_api_key(args)
    database = _resolve_db_path(args)

    gate_options = dict(
        dataset_path=args.dataset,
        gateway_base_url=args.gateway,
        api_key=key,
        db_path=database,
        safety_tolerance=args.tolerance,
        capability_tolerance=args.capability_tolerance,
        baseline_name=args.baseline,
    )
    exit_code = await run_gate(**gate_options)
    sys.exit(exit_code)
_run_replay function · python · L110-L143 (34 LOC)
harness/eval/__main__.py
async def _run_replay(args):
    from harness.eval.replay import run_replay
    from harness.traces.store import TraceStore

    api_key = _resolve_api_key(args)
    db_path = _resolve_db_path(args)

    trace_store = TraceStore(db_path=db_path)
    await trace_store.init_db()

    result = await run_replay(
        dataset_path=args.dataset,
        gateway_base_url=args.gateway,
        api_key=api_key,
        trace_store=trace_store,
        model=args.model,
    )

    m = result["metrics"]
    error_cases = m.get("error_cases", 0)
    scored_cases = result["total_cases"] - error_cases
    print(f"Run ID:       {result['run_id']}")
    print(f"Total cases:  {result['total_cases']}")
    print(f"Scored:       {scored_cases}  (errors/skipped: {error_cases})")
    print(f"F1:           {m.get('f1', 0.0):.4f}")
    print(f"Precision:    {m.get('precision', 0.0):.4f}")
    print(f"Recall:       {m.get('recall', 0.0):.4f}")
    print(f"CRR:          {m.get('correct_refusal_rate', 0.0):.
_run_trends function · python · L146-L160 (15 LOC)
harness/eval/__main__.py
async def _run_trends(args):
    """Print eval-run trends for the parsed CLI arguments.

    Opens the trace store at the resolved database path, fetches the runs
    selected by ``args.last`` and ``args.source``, and prints them either as
    indented JSON (when ``args.json`` is truthy) or as rendered text.
    """
    from harness.eval.trends import get_trend_data, render_trends, export_trends_json
    from harness.traces.store import TraceStore

    store = TraceStore(db_path=_resolve_db_path(args))
    await store.init_db()

    history = await get_trend_data(store, last=args.last, source=args.source)

    if not args.json:
        print(render_trends(history))
        return
    print(json.dumps(export_trends_json(history), indent=2))
compute_metrics function · python · L9-L85 (77 LOC)
harness/eval/metrics.py
def compute_metrics(cases: list[dict], results: list[dict]) -> dict:
    """Compute classification metrics from replay results.

    Args:
        cases: List of eval cases with fields: prompt, expected_action
               ("block"|"allow"|"steer"), category, description.
        results: List of result dicts with fields: actual_action ("block"|"allow"|"error"),
                 latency_ms, status_code. Must have same length as cases.

    Returns:
        Dict with keys: f1, precision, recall, correct_refusal_rate,
        false_refusal_rate, total_cases, error_cases, per_category.

        - "block" and "steer" expected_action are both treated as positive class
        - correct_refusal_rate = recall (tp / (tp+fn))
        - false_refusal_rate = fp / (fp+tn)
        - "error" actual_action (e.g. exhausted retries, 404 backend) is excluded
          from tp/fp/tn/fn counts and reported separately as error_cases
        - per_category maps category name to {tp, fp, tn, fn, errors} co
compute_latency_percentiles function · python · L88-L104 (17 LOC)
harness/eval/metrics.py
def compute_latency_percentiles(latencies: list[int]) -> dict:
    """Return the P50 and P95 values of a list of latencies.

    Both percentiles are picked directly from the sorted input (no
    interpolation): P50 is the element at index ``n // 2`` and P95 the
    element at index ``int(n * 0.95)``.

    Args:
        latencies: Latency values in milliseconds.

    Returns:
        Dict with keys "p50" and "p95"; both are 0 when *latencies* is empty.
    """
    if not latencies:
        return dict(p50=0, p95=0)

    ordered = sorted(latencies)
    count = len(ordered)
    positions = {"p50": count // 2, "p95": int(count * 0.95)}
    return {label: ordered[index] for label, index in positions.items()}
All rows above produced by Repobility · https://repobility.com
run_replay function · python · L21-L171 (151 LOC)
harness/eval/replay.py
async def run_replay(
    dataset_path: str,
    gateway_base_url: str,
    api_key: str,
    trace_store: TraceStore,
    model: str = "llama3.1",
) -> dict:
    """Load a JSONL dataset and replay each case through the gateway.

    Args:
        dataset_path: Path to a JSONL file. Each line must have fields:
                      prompt, expected_action, category, description.
        gateway_base_url: Base URL for the gateway (e.g., "http://localhost:8080").
        api_key: Bearer token for gateway authentication.
        trace_store: Initialized TraceStore instance for writing eval run records.
        model: LLM model identifier passed in each request body.

    Returns:
        Dict with keys:
            run_id        — unique identifier for this eval run
            total_cases   — number of cases replayed
            metrics       — F1/precision/recall/CRR/FRR/latency/critique_trigger_rate
            per_case_results — list of per-case result dicts
    """
    # Load JSONL d
run_lm_eval function · python · L9-L53 (45 LOC)
harness/eval/runner.py
def run_lm_eval(
    gateway_url: str = "http://localhost:5000",
    litellm_url: str = "http://localhost:4000",
    api_key: str = "",
    model_name: str = "llama3.1",
    tasks: list[str] | None = None,
    limit: int | None = 100,
) -> dict:
    """Run lm-eval benchmarks via HarnessLM and return results.

    Args:
        gateway_url: Gateway URL for generate_until routing.
        litellm_url: LiteLLM URL (stored in HarnessLM, not used for
                     loglikelihood tasks).
        api_key: API key for gateway authentication.
        model_name: Model identifier passed to the gateway.
        tasks: List of lm-eval task names. Defaults to MMLU, HellaSwag,
               TruthfulQA, GSM8K.
        limit: Number of examples per task (None = all).

    Returns:
        Dict of task_name -> metrics from simple_evaluate results["results"].
    """
    import lm_eval
    from harness.eval.lm_model import HarnessLM

    if tasks is None:
        tasks = ["mmlu", "hellaswag", "tr
get_trend_data function · python · L11-L28 (18 LOC)
harness/eval/trends.py
async def get_trend_data(
    trace_store: TraceStore,
    last: int = 20,
    source: str | None = None,
) -> list[dict]:
    """Return recent eval runs ordered oldest-first.

    Args:
        trace_store: Initialized TraceStore instance to query.
        last: Upper bound on the number of runs requested from the store.
        source: Optional source filter ("replay" or "lm-eval") passed through
            to the store query.

    Returns:
        A new list holding the queried runs in reverse of the order the
        store returned them (chronological, for charting).
    """
    # The store hands back newest-first (DESC); charts want oldest-first.
    newest_first = await trace_store.query_eval_runs(source=source, limit=last)
    return [*reversed(newest_first)]
render_trends function · python · L31-L113 (83 LOC)
harness/eval/trends.py
def render_trends(runs: list[dict]) -> str:
    """Render ASCII trend charts for key metrics.

    Args:
        runs: List of eval run dicts (chronological order).

    Returns:
        Multi-line string with ASCII charts and a summary table.
    """
    if not runs:
        return "No eval runs found."

    # Extract metric series
    f1_series = [r["metrics"].get("f1", 0.0) for r in runs]
    crr_series = [r["metrics"].get("correct_refusal_rate", 0.0) for r in runs]
    frr_series = [r["metrics"].get("false_refusal_rate", 0.0) for r in runs]

    lines: list[str] = []

    # Try asciichartpy; fall back to simple text table
    try:
        import asciichartpy

        def _chart(series: list[float], title: str) -> str:
            chart_lines = [f"== {title} =="]
            if len(series) >= 2:
                chart_lines.append(asciichartpy.plot(series, {"height": 10}))
            else:
                chart_lines.append(f"  {series[0]:.4f}" if series else "  (no data)")
        
export_trends_json function · python · L116-L135 (20 LOC)
harness/eval/trends.py
def export_trends_json(runs: list[dict]) -> list[dict]:
    """Project eval runs onto the fields exported as machine-readable JSON.

    The output format is consumed by the Phase 10 HITL dashboard.

    Args:
        runs: List of eval run dicts. Each must contain "run_id",
            "timestamp" and "metrics"; "source" is optional.

    Returns:
        One dict per run, in input order, with exactly the keys run_id,
        timestamp, source (empty string when absent) and metrics.
    """
    exported: list[dict] = []
    for entry in runs:
        record = {
            "run_id": entry["run_id"],
            "timestamp": entry["timestamp"],
            "source": entry.get("source", ""),
            "metrics": entry["metrics"],
        }
        exported.append(record)
    return exported
GuardrailEngine class · python · L67-L455 (389 LOC)
harness/guards/engine.py
class GuardrailEngine:
    """Runs all enabled guardrail rails and aggregates results into a GuardrailDecision.

    Execution model:
    - All enabled rails run regardless of intermediate blocks (run-all, not fail-fast).
    - After all rails complete, the first blocking rail determines refusal_mode.
    - NeMo-backed rails gracefully degrade to pass (score=0.0) when NeMo is unavailable.
    """

    def __init__(self, rails_config: list[RailConfig], nemo_rails=None):
        """Initialize the engine.

        Args:
            rails_config: List of RailConfig from load_rails_config().
            nemo_rails: Optional LLMRails instance (None in tests; real in production).
        """
        self._rails_config = {r.name: r for r in rails_config}
        self._nemo = nemo_rails
        self._input_rails = [
            "self_check_input",
            "jailbreak_detection",
            "sensitive_data_input",
            "injection_heuristic",
        ]
        self._output_rails = [
  
__init__ method · python · L76-L95 (20 LOC)
harness/guards/engine.py
    def __init__(self, rails_config: list[RailConfig], nemo_rails=None):
        """Set up the rail lookup table and the fixed rail execution order.

        Args:
            rails_config: List of RailConfig from load_rails_config(); indexed
                here by each entry's ``name``.
            nemo_rails: Optional LLMRails instance (None in tests; real in production).
        """
        self._nemo = nemo_rails

        # Index configs by rail name; a later duplicate name replaces an earlier one.
        config_by_name = {}
        for rail in rails_config:
            config_by_name[rail.name] = rail
        self._rails_config = config_by_name

        # Names of the rails applied to incoming messages, in order.
        self._input_rails = (
            "self_check_input jailbreak_detection "
            "sensitive_data_input injection_heuristic"
        ).split()
        # Names of the rails applied to model output, in order.
        self._output_rails = (
            "self_check_output jailbreak_output sensitive_data_output"
        ).split()
check_input method · python · L97-L168 (72 LOC)
harness/guards/engine.py
    async def check_input(
        self,
        messages: list[dict],
        tenant,
        evasion_flags: list[str] | None = None,
    ) -> GuardrailDecision:
        """Run all enabled input rails and return an aggregated GuardrailDecision.

        Args:
            messages: Chat messages list (OpenAI format).
            tenant: TenantConfig for the requesting tenant.
            evasion_flags: Pre-detected evasion flags from normalizer (optional).

        Returns:
            GuardrailDecision with blocked status, triggering rail, all results,
            and replacement_response if blocked.
        """
        # Normalize messages and accumulate evasion flags
        normalized_messages, norm_flags = normalize_messages(messages)
        all_evasion_flags = list(evasion_flags or []) + norm_flags

        all_results: list[RailResult] = []

        # Run ALL enabled input rails (run-all, not fail-fast)
        for rail_name in self._input_rails:
            config = self._rail
Repobility · code-quality intelligence platform · https://repobility.com
check_output method · python · L170-L246 (77 LOC)
harness/guards/engine.py
    async def check_output(
        self,
        response_data: dict,
        tenant,
    ) -> GuardrailDecision:
        """Run all enabled output rails and return an aggregated GuardrailDecision.

        Args:
            response_data: OpenAI-format response dict from LiteLLM.
            tenant: TenantConfig for the requesting tenant.

        Returns:
            GuardrailDecision with blocked status and replacement_response on block.
        """
        # Extract assistant content
        choices = response_data.get("choices") or []
        original_content = ""
        if choices:
            original_content = choices[0].get("message", {}).get("content", "") or ""

        all_results: list[RailResult] = []
        redacted_content: str | None = None

        # Run ALL enabled output rails (run-all, not fail-fast)
        for rail_name in self._output_rails:
            config = self._rails_config.get(rail_name)
            if config is None or not config.enabled:
           
_check_injection_regex method · python · L252-L270 (19 LOC)
harness/guards/engine.py
    def _check_injection_regex(
        self, messages: list[dict]
    ) -> tuple[float, str | None]:
        """Scan the messages' text content against INJECTION_PATTERNS.

        Only messages whose "content" is a string take part; their contents
        are joined with single spaces and searched as one text.

        Returns:
            (1.0, pattern_source) for the first pattern that matches;
            (0.0, None) if no pattern matches.
        """
        texts = []
        for message in messages:
            body = message.get("content")
            if isinstance(body, str):
                texts.append(body)
        haystack = " ".join(texts)

        first_hit = next(
            (candidate for candidate in INJECTION_PATTERNS if candidate.search(haystack)),
            None,
        )
        if first_hit is None:
            return 0.0, None
        return 1.0, first_hit.pattern
_check_pii_input method · python · L272-L287 (16 LOC)
harness/guards/engine.py
    def _check_pii_input(self, messages: list[dict], tenant) -> float:
        """Report whether the redactor changes any message's text content.

        Each non-empty string "content" is passed through ``redact`` with the
        tenant's ``pii_strictness`` ("balanced" when the attribute is
        missing); checking stops at the first message the redactor alters.

        Returns:
            1.0 if some content differs after redaction; 0.0 otherwise.
        """
        from harness.pii.redactor import redact

        level = getattr(tenant, "pii_strictness", "balanced")
        bodies = (message.get("content") for message in messages)
        pii_found = any(
            redact(body, level) != body
            for body in bodies
            if isinstance(body, str) and body
        )
        return 1.0 if pii_found else 0.0
_check_pii_output method · python · L289-L300 (12 LOC)
harness/guards/engine.py
    def _check_pii_output(self, content: str, tenant) -> tuple[float, str]:
        """Redact output content and report whether anything changed.

        The redactor runs with the tenant's ``pii_strictness`` ("balanced"
        when the attribute is missing).

        Returns:
            (score, redacted_content). Score is 1.0 when the redacted text
            differs from *content*, 0.0 when it is unchanged.
        """
        from harness.pii.redactor import redact

        cleaned = redact(content, getattr(tenant, "pii_strictness", "balanced"))
        if cleaned != content:
            return 1.0, cleaned
        return 0.0, cleaned
_run_nemo_rail method · python · L302-L341 (40 LOC)
harness/guards/engine.py
    async def _run_nemo_rail(
        self, rail_name: str, content: list[dict] | str
    ) -> float:
        """Run a NeMo-backed rail.

        Args:
            rail_name: The rail to check.
            content: Either messages list (for input) or string (for output).

        Returns:
            1.0 if blocked (refusal response detected); 0.0 if passed or NeMo unavailable.
        """
        if self._nemo is None:
            # Regex-only mode: NeMo rails return 0.0 (pass)
            return 0.0

        try:
            if isinstance(content, str):
                messages = [{"role": "user", "content": content}]
            else:
                messages = content

            result = await self._nemo.generate_async(messages=messages)

            # Inspect response: if it matches known refusal patterns, it's a block
            if isinstance(result, dict):
                response_content = result.get("content", "")
            elif isinstance(result, str):
                re
_build_refusal method · python · L347-L362 (16 LOC)
harness/guards/engine.py
    def _build_refusal(
        self,
        refusal_mode: str | None,
        rail_name: str,
        messages: list[dict],
    ) -> dict:
        """Dispatch to the appropriate refusal builder based on refusal_mode."""
        if refusal_mode == "hard_block":
            return self._build_hard_block_refusal(rail_name)
        elif refusal_mode == "informative":
            return self._build_informative_refusal(rail_name, reason="policy violation")
        elif refusal_mode == "soft_steer":
            # soft_steer: return messages for caller to re-submit (not a response)
            return {"soft_steer_messages": self._build_soft_steer_messages(messages)}
        else:
            return self._build_hard_block_refusal(rail_name)
_build_refusal_response method · python · L364-L380 (17 LOC)
harness/guards/engine.py
    def _build_refusal_response(
        self,
        refusal_mode: str | None,
        rail_name: str,
        original_response: dict,
    ) -> dict:
        """Build replacement response for a blocked output, preserving response structure."""
        if refusal_mode == "informative":
            refusal = self._build_informative_refusal(rail_name, reason="policy violation")
        else:
            refusal = self._build_hard_block_refusal(rail_name)

        # Merge: keep original structure but replace choices content
        result = dict(original_response)
        result["choices"] = refusal["choices"]
        result["model"] = refusal["model"]
        return result
_build_redacted_response method · python · L382-L388 (7 LOC)
harness/guards/engine.py
    def _build_redacted_response(self, original_response: dict, redacted_content: str) -> dict:
        """Build a response with PII-redacted content, preserving other fields."""
        import copy
        result = copy.deepcopy(original_response)
        if result.get("choices"):
            result["choices"][0]["message"]["content"] = redacted_content
        return result
Want this analysis on your repo? https://repobility.com/scan/
_build_hard_block_refusal method · python · L390-L414 (25 LOC)
harness/guards/engine.py
    def _build_hard_block_refusal(self, rail_name: str) -> dict:
        """Return a hard-block refusal response.

        Returns an OpenAI-format response with a principled refusal message.
        """
        return {
            "choices": [
                {
                    "message": {
                        "role": "assistant",
                        "content": (
                            "I'm unable to process this request as it violates our content policy."
                        ),
                    },
                    "finish_reason": "stop",
                    "index": 0,
                }
            ],
            "model": "guardrail",
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0,
            },
        }
_build_informative_refusal method · python · L416-L446 (31 LOC)
harness/guards/engine.py
    def _build_informative_refusal(self, rail_name: str, reason: str) -> dict:
        """Return an informative refusal naming the violated policy and suggesting adjacent help.

        The message names the rail, the reason, and offers a constructive suggestion.
        """
        suggestion = _RAIL_SUGGESTIONS.get(
            rail_name, "rephrasing your question in a different way"
        )
        content = (
            f"This request was blocked by the {rail_name} policy. "
            f"Reason: {reason}. "
            f"Try rephrasing your question to focus on {suggestion}."
        )
        return {
            "choices": [
                {
                    "message": {
                        "role": "assistant",
                        "content": content,
                    },
                    "finish_reason": "stop",
                    "index": 0,
                }
            ],
            "model": "guardrail",
            "usage": {
                "prompt_tok
_build_soft_steer_messages method · python · L448-L455 (8 LOC)
harness/guards/engine.py
    def _build_soft_steer_messages(self, original_messages: list[dict]) -> list[dict]:
        """Return a new message list led by the soft-steer system prompt.

        The result is one system message carrying SOFT_STEER_SYSTEM_PROMPT
        followed by the original messages in their original order. This
        method only builds the list; re-submitting it to LiteLLM is the
        caller's responsibility.
        """
        steer_prompt = {"role": "system", "content": SOFT_STEER_SYSTEM_PROMPT}
        steered = [steer_prompt]
        steered.extend(original_messages)
        return steered
create_guardrail_engine function · python · L462-L499 (38 LOC)
harness/guards/engine.py
def create_guardrail_engine(
    rails_config_path: str,
    nemo_config_dir: str | None = None,
    litellm_base_url: str = "http://localhost:4000",
) -> GuardrailEngine:
    """Create a GuardrailEngine with loaded config and optional NeMo LLMRails.

    NeMo LLMRails is only created if nemo_config_dir is provided and
    nemoguardrails is importable. Otherwise the engine works without NeMo
    (regex-only mode for testing and environments without NeMo).

    Args:
        rails_config_path: Path to rails.yaml.
        nemo_config_dir: Optional path to NeMo config directory (contains config.yml).
        litellm_base_url: LiteLLM proxy base URL for NeMo's LLM calls.

    Returns:
        A fully initialized GuardrailEngine.
    """
    rails_config = load_rails_config(rails_config_path)
    nemo_rails = None

    if nemo_config_dir:
        try:
            from nemoguardrails import LLMRails, RailsConfig
            from langchain_openai import ChatOpenAI

            config = RailsC
check_nemo_available function · python · L19-L28 (10 LOC)
harness/guards/nemo_compat.py
def check_nemo_available() -> dict:
    """Report whether the ``nemoguardrails`` package can be imported.

    Returns:
        Dict with keys "available" (bool), "version" (the module's
        ``__version__``, "unknown" if it has none, None when the import
        failed) and "error" (the ImportError text, None on success).
    """
    try:
        module = importlib.import_module("nemoguardrails")
    except ImportError as exc:
        return {"available": False, "version": None, "error": str(exc)}
    return {
        "available": True,
        "version": getattr(module, "__version__", "unknown"),
        "error": None,
    }
check_presidio_available function · python · L31-L46 (16 LOC)
harness/guards/nemo_compat.py
def check_presidio_available() -> dict:
    """Check if Presidio analyzer is importable and functional.

    Imports ``presidio_analyzer``, constructs an ``AnalyzerEngine`` and runs
    it over a sample email address, requesting only EMAIL_ADDRESS entities.
    Any exception raised along the way is captured rather than propagated.

    Returns:
        Dict with keys "available" (True when import and analysis both
        succeeded), "entities_detected" (number of results the analysis
        returned; 0 on failure) and "error" (exception text on failure,
        None on success).
    """
    result = {"available": False, "entities_detected": 0, "error": None}
    try:
        from presidio_analyzer import AnalyzerEngine
        engine = AnalyzerEngine()
        # The probe must be a well-formed address: a placeholder such as
        # "[email protected]" contains no email, so the EMAIL_ADDRESS probe
        # could never report a detection.
        results = engine.analyze(
            text="user@example.com",
            language="en",
            entities=["EMAIL_ADDRESS"],
        )
        result["available"] = True
        result["entities_detected"] = len(results)
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and any failure
        # (missing package, missing language model, ...) means "unavailable".
        result["error"] = str(e)
    return result
normalize function · python · L20-L52 (33 LOC)
harness/guards/normalizer.py
def normalize(text: str) -> tuple[str, list[str]]:
    """Normalize a single text string and return evasion flags.

    Steps applied in order:
    1. NFKC normalization — flags "unicode_normalization_changed" if text changed.
    2. Zero-width character stripping — flags "zero_width_chars_stripped" if any removed.
    3. Homoglyph detection — flags "homoglyph_detected" if confusable chars present.

    Args:
        text: Raw input string.

    Returns:
        Tuple of (cleaned_text, list_of_evasion_flags).
    """
    flags: list[str] = []

    # Step 1: NFKC normalization (converts full-width, ligatures, etc. to ASCII equivalents)
    nfkc = unicodedata.normalize("NFKC", text)
    if nfkc != text:
        flags.append("unicode_normalization_changed")

    # Step 2: Strip zero-width and invisible characters
    stripped = _ZERO_WIDTH_PATTERN.sub("", nfkc)
    if stripped != nfkc:
        flags.append("zero_width_chars_stripped")

    # Step 3: Homoglyph / confusable character detect
normalize_messages function · python · L55-L87 (33 LOC)
harness/guards/normalizer.py
def normalize_messages(messages: list[dict]) -> tuple[list[dict], list[str]]:
    """Normalize all 'content' fields in a messages list.

    Each message's 'content' field is normalized. Other fields are passed
    through unchanged. Messages without a 'content' field are left intact.

    Args:
        messages: List of chat message dicts (e.g. [{"role": "user", "content": "..."}]).

    Returns:
        Tuple of (normalized_messages, aggregated_flags).
    """
    all_flags: list[str] = []
    new_messages: list[dict] = []

    for msg in messages:
        if "content" in msg and isinstance(msg["content"], str):
            cleaned, flags = normalize(msg["content"])
            new_msg = {**msg, "content": cleaned}
            all_flags.extend(flags)
        else:
            new_msg = msg
        new_messages.append(new_msg)

    # Deduplicate flags while preserving order
    seen: set[str] = set()
    deduped: list[str] = []
    for f in all_flags:
        if f not in seen:
       
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
RailResult class · python · L13-L19 (7 LOC)
harness/guards/types.py
class RailResult:
    """Result from a single rail classifier run.

    Holds the outcome of one rail together with the score and the threshold
    that were in effect for it.

    NOTE(review): no decorator is visible in this excerpt; presumably the
    class is a dataclass, in which case the field order below is the
    positional constructor order — confirm before reordering.
    """

    # Rail name from rails.yaml (e.g. "self_check_input")
    rail: str
    # Outcome of the rail: "pass" | "block"
    result: str
    # Classifier confidence 0.0-1.0
    score: float
    # Configured threshold from rails.yaml
    threshold: float
GuardrailDecision class · python · L23-L31 (9 LOC)
harness/guards/types.py
class GuardrailDecision:
    """Aggregated guardrail decision after all rails have run.

    The first four fields have no default; ``replacement_response`` defaults
    to None and ``evasion_flags`` to a fresh empty list via
    ``field(default_factory=list)``.

    NOTE(review): the use of ``field(...)`` implies a dataclass, but the
    decorator is outside this excerpt — confirm.
    """

    # True when the request/response was blocked.
    blocked: bool
    # "hard_block" | "soft_steer" | "informative" | None
    refusal_mode: Optional[str]
    # Name of first blocking rail (for refusal message)
    triggering_rail: Optional[str]
    # Every rail that ran (pass + block)
    all_results: list[RailResult]
    # Populated for output blocks / soft steer
    replacement_response: Optional[dict] = None
    # Flags from the normalizer; default_factory gives each instance its own list
    evasion_flags: list[str] = field(default_factory=list)
compute_calibration function · python · L9-L132 (124 LOC)
harness/hitl/calibrate.py
async def compute_calibration(trace_store, since: str) -> list[dict]:
    """Compute per-rail threshold suggestions from reviewer corrections.

    Groups corrections by triggering rail, then for each rail with >= MIN_CORRECTIONS
    corrections, computes a suggested threshold:
    - Both approved and rejected: midpoint between max(approved) and min(rejected).
    - Approved only: P95 of approved scores (sorted, index = int(0.95 * len)).
    - Rejected only: min(rejected) - 0.05 (lower threshold to catch more).

    Args:
        trace_store: TraceStore instance (must have query_corrections and query_by_id).
        since: ISO8601 timestamp string — lower bound for corrections to consider.

    Returns:
        List of suggestion dicts with keys:
            rail, current_threshold, suggested_threshold,
            approved_count, rejected_count, reason.
    """
    corrections = await trace_store.query_corrections()

    # Group corrections by triggering rail
    # Per correction, fet
export_jsonl function · python · L7-L39 (33 LOC)
harness/hitl/export.py
async def export_jsonl(trace_store, output_path: str) -> int:
    """Export corrections as OpenAI-format JSONL for fine-tuning pipelines.

    For each correction, fetches the associated trace and writes a JSONL record:
    - prompt from trace["prompt"]
    - response:
        - If action == "edit": use correction["edited_response"]
        - Elif trace has non-null cai_critique: use cai_critique["revised_output"]
        - Else: use trace["response"]
    - label: correction["action"] (approve / reject / edit)

    Args:
        trace_store: TraceStore instance (must have query_corrections and query_by_id).
        output_path: File path to write JSONL output.

    Returns:
        Count of records written.
    """
    corrections = await trace_store.query_corrections()
    count = 0

    with open(output_path, "w", encoding="utf-8") as f:
        for correction in corrections:
            request_id = correction["request_id"]
            trace = await trace_store.query_by_id(request_i
_correction_to_jsonl_record function · python · L42-L76 (35 LOC)
harness/hitl/export.py
def _correction_to_jsonl_record(correction: dict, trace: dict) -> dict:
    """Convert a correction + trace pair to an OpenAI JSONL record.

    Args:
        correction: Correction dict (from corrections table).
        trace: Trace dict (from traces table).

    Returns:
        Dict with "messages" list and "label" field.
    """
    prompt = trace["prompt"]
    action = correction["action"]

    if action == "edit":
        # Use the edited response (already PII-redacted in store)
        response = correction.get("edited_response") or trace["response"]
    else:
        # Try cai_critique revised_output first
        cai_raw = trace.get("cai_critique")
        if cai_raw is not None:
            try:
                cai = json.loads(cai_raw) if isinstance(cai_raw, str) else cai_raw
                response = cai.get("revised_output", trace["response"])
            except (json.JSONDecodeError, TypeError, AttributeError):
                response = trace["response"]
        else:
 
main function · python · L11-L44 (34 LOC)
harness/hitl/__main__.py
def main():
    parser = argparse.ArgumentParser(
        prog="python -m harness.hitl",
        description="DGX harness HITL tools: calibrate, export, ui",
    )
    subparsers = parser.add_subparsers(dest="command")

    # --- calibrate ---
    cal = subparsers.add_parser("calibrate", help="Compute threshold suggestions from corrections")
    cal.add_argument("--db", default=None, help="Path to traces.db")
    cal.add_argument("--since", default="7d", help="Time window for corrections (default: 7d)")

    # --- export ---
    exp = subparsers.add_parser("export", help="Export corrections as fine-tuning JSONL")
    exp.add_argument("--format", choices=["jsonl"], default="jsonl", help="Output format")
    exp.add_argument("--output", default="corrections.jsonl", help="Output file path")
    exp.add_argument("--db", default=None, help="Path to traces.db")

    # --- ui ---
    ui_p = subparsers.add_parser("ui", help="Start Gradio review UI")
    ui_p.add_argument("--port", type=int, de
_resolve_db_path function · python · L47-L51 (5 LOC)
harness/hitl/__main__.py
def _resolve_db_path(args) -> str:
    """Return the traces database path for a parsed CLI namespace.

    A truthy ``args.db`` wins. Otherwise the path is ``traces.db`` inside the
    directory named by the ``HARNESS_DATA_DIR`` environment variable, falling
    back to ``harness/data`` when that variable is not set.
    """
    explicit = getattr(args, "db", None)
    if explicit:
        return explicit
    return os.path.join(
        os.environ.get("HARNESS_DATA_DIR", "harness/data"),
        "traces.db",
    )
_run_calibrate function · python · L54-L73 (20 LOC)
harness/hitl/__main__.py
async def _run_calibrate(args):
    """Run the ``calibrate`` subcommand and print its suggestions.

    Opens the trace store at the resolved database path, converts
    ``args.since`` with ``_resolve_since`` and prints one report per
    suggestion returned by ``compute_calibration``. When no suggestions come
    back, a single explanatory line is printed instead.
    """
    from harness.hitl.calibrate import compute_calibration
    from harness.traces.store import TraceStore
    from harness.proxy.admin import _resolve_since

    trace_store = TraceStore(db_path=_resolve_db_path(args))
    await trace_store.init_db()

    suggestions = await compute_calibration(
        trace_store,
        since=_resolve_since(args.since),
    )

    if suggestions:
        for s in suggestions:
            # One print per line so output is emitted incrementally.
            print(f"Rail: {s['rail']}")
            print(f"  Current:   {s['current_threshold']:.3f}")
            print(f"  Suggested: {s['suggested_threshold']:.3f}")
            print(f"  Approved:  {s['approved_count']}  Rejected: {s['rejected_count']}")
            print(f"  Reason:    {s['reason']}")
            print()
    else:
        print("No suggestions — insufficient correction data (minimum 5 per rail).")
All rows above produced by Repobility · https://repobility.com
_run_export function · python · L76-L84 (9 LOC)
harness/hitl/__main__.py
async def _run_export(args):
    """Run the ``export`` subcommand.

    Opens the trace store at the resolved database path, writes the
    corrections to ``args.output`` through ``export_jsonl`` and prints how
    many records were exported.
    """
    from harness.hitl.export import export_jsonl
    from harness.traces.store import TraceStore

    trace_store = TraceStore(db_path=_resolve_db_path(args))
    await trace_store.init_db()

    count = await export_jsonl(trace_store, output_path=args.output)
    print(f"Exported {count} corrections to {args.output}")
_run_ui function · python · L87-L98 (12 LOC)
harness/hitl/__main__.py
def _run_ui(args):
    """Run the ``ui`` subcommand: build the review UI and launch it.

    Exits with status 1 (message on stderr) when the UI module cannot be
    imported or when no API key is available from ``args.api_key`` or the
    ``HARNESS_API_KEY`` environment variable.
    """

    def _fail(message):
        # Report on stderr and terminate with a non-zero status.
        print(message, file=sys.stderr)
        sys.exit(1)

    try:
        from harness.hitl.ui import build_ui
    except ImportError:
        _fail("Error: gradio not installed. Install with: pip install 'dgx-harness[hitl]'")

    api_key = args.api_key or os.environ.get("HARNESS_API_KEY")
    if not api_key:
        _fail("Error: API key required. Use --api-key or set HARNESS_API_KEY env var.")

    build_ui(api_url=args.api_url, api_key=api_key).launch(server_port=args.port)
CorrectionRequest class · python · L17-L24 (8 LOC)
harness/hitl/router.py
class CorrectionRequest(BaseModel):
    """Payload for POST /admin/hitl/correct.

    ``request_id``, ``reviewer`` and ``action`` are required; the remaining
    fields default to None.
    """

    # Identifier of the request being corrected
    # (presumably the trace's request id — verify against the store).
    request_id: str
    # Who submitted the correction.
    reviewer: str
    # Reviewer verdict; restricted to these three literals.
    action: Literal["approve", "reject", "edit"]
    # Replacement response text; presumably only meaningful when action == "edit".
    edited_response: str | None = None
    # NOTE(review): purpose not visible in this excerpt — confirm with the router.
    trace_ref: str | None = None
‹ prev · page 2 / 4 · next ›