← back to dr-robert-li__dgx-toolbox

Function bodies 178 total

All specs Real LLM only Function bodies
get_hitl_queue function · python · L28-L55 (28 LOC)
harness/hitl/router.py
async def get_hitl_queue(
    request: Request,
    tenant: TenantConfig = Depends(verify_api_key),
    rail: str = Query(default="all"),
    tenant_filter: str = Query(default="all", alias="tenant"),
    since: str = Query(default="24h"),
    hide_reviewed: bool = Query(default=False),
) -> JSONResponse:
    """Return the HITL review queue, priority-sorted.

    Items are sorted by review urgency: closest-to-threshold flagged traces
    appear first; already-reviewed items sort to the bottom.

    Query parameters:
        rail: Filter by triggering rail name, or 'all' (default).
        tenant: Filter by tenant ID, or 'all' (default).
        since: Time window — ISO8601 timestamp or shorthand like '24h', '7d'.
        hide_reviewed: If true, exclude traces that have corrections.
    """
    since_ts = _resolve_since(since)
    trace_store = request.app.state.trace_store
    results = await trace_store.query_hitl_queue(
        since=since_ts,
        rail_filter=rail,
        tenant
submit_correction function · python · L59-L72 (14 LOC)
harness/hitl/router.py
async def submit_correction(
    request: Request,
    body: CorrectionRequest,
    tenant: TenantConfig = Depends(verify_api_key),
) -> JSONResponse:
    """Persist a reviewer's correction for a flagged trace.

    The action is expected to be one of approve, reject or edit; for 'edit',
    ``edited_response`` carries the corrected text. The correction is handed to
    the trace store as a plain dict (``body.model_dump()``). Any PII redaction of
    the edited text is not performed in this function — presumably it happens
    inside ``write_correction``; confirm there.

    ``tenant`` is resolved only so the API key is verified; it is not used here.
    """
    store = request.app.state.trace_store
    correction = body.model_dump()
    await store.write_correction(correction)
    response_body = {"status": "ok", "request_id": body.request_id}
    return JSONResponse(content=response_body)
_extract_triggering_rail_inline function · python · L15-L35 (21 LOC)
harness/hitl/ui.py
def _extract_triggering_rail_inline(guardrail_decisions: dict) -> str | None:
    """Return the rail name from the all_results entry closest to threshold.

    Mirrors harness.traces.store._extract_triggering_rail for inline use
    without importing the async store module.
    """
    if isinstance(guardrail_decisions, list):
        all_results = guardrail_decisions
    else:
        all_results = guardrail_decisions.get("all_results", [])
    best: str | None = None
    best_distance = float("inf")
    for result in all_results:
        score = result.get("score", 0)
        threshold = result.get("threshold", 1.0)
        if score > 0:
            distance = threshold - score
            if distance < best_distance:
                best_distance = distance
                best = result.get("rail_name") or result.get("rail")
    return best
_action_taken function · python · L38-L62 (25 LOC)
harness/hitl/ui.py
def _action_taken(item: dict) -> str:
    """Derive human-readable action from guardrail_decisions dict."""
    gd = item.get("guardrail_decisions") or {}
    if isinstance(gd, str):
        try:
            gd = json.loads(gd)
        except (json.JSONDecodeError, TypeError):
            gd = {}
    status_code = item.get("status_code", 200)
    refusal_event = item.get("refusal_event", 0)
    # guardrail_decisions may be a list (from trace JSON) or a dict with 'all_results'
    if isinstance(gd, list):
        all_results = gd
    else:
        all_results = gd.get("all_results", [])
    # Any result with score > threshold means it was blocked/steered
    has_cai = item.get("cai_critique") is not None
    if refusal_event:
        return "blocked"
    if has_cai:
        return "critiqued"
    if status_code in (200, None):
        if any(r.get("score", 0) > r.get("threshold", 1.0) for r in all_results):
            return "blocked"
    return "allowed"
build_ui function · python · L65-L414 (350 LOC)
harness/hitl/ui.py
def build_ui(api_url: str, api_key: str):  # -> gr.Blocks
    """Build and return a Gradio Blocks dashboard for HITL review.

    Args:
        api_url: Base URL of the harness API (e.g. "http://localhost:8080").
        api_key: Bearer token for the harness API.

    Returns:
        gr.Blocks instance (not yet launched — caller calls .launch()).
    """
    import gradio as gr  # imported here so module loads without gradio installed

    client = httpx.Client(
        base_url=api_url,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30.0,
    )

    # -----------------------------------------------------------------------
    # Callback implementations
    # -----------------------------------------------------------------------

    def refresh_queue(rail: str, tenant: str, time_range: str, hide_rev: bool) -> list[list[Any]]:
        """Fetch queue from API and return formatted dataframe rows."""
        try:
            resp = client.get(
                "/
lifespan function · python · L23-L79 (57 LOC)
harness/main.py
async def lifespan(app: FastAPI):
    """App lifespan: load tenants, create HTTP client pool, initialize rate limiter and trace store."""
    # Load tenant config
    tenants_path = os.path.join(_CONFIG_DIR, "tenants.yaml")
    app.state.tenants = load_tenants(tenants_path)

    # Shared async HTTP client for proxying to LiteLLM
    app.state.http_client = httpx.AsyncClient(
        base_url=_LITELLM_BASE,
        timeout=httpx.Timeout(120.0),
        limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
    )

    # In-memory rate limiter (per-tenant RPM + TPM)
    app.state.rate_limiter = SlidingWindowLimiter()

    # Trace store — initialize SQLite schema (WAL mode, indexes)
    os.makedirs(_DATA_DIR, exist_ok=True)
    db_path = os.path.join(_DATA_DIR, "traces.db")
    app.state.trace_store = TraceStore(db_path=db_path)
    await app.state.trace_store.init_db()

    # Eagerly import PII redactor so AnalyzerEngine loads at startup
    import harness.pii.redactor  # 
probe function · python · L104-L106 (3 LOC)
harness/main.py
async def probe(tenant: Annotated[TenantConfig, Depends(verify_api_key)]):
    """Report which tenant the bearer key resolved to (auth probe used in tests).

    Returns a dict with the resolved tenant's ``tenant_id`` and ``bypass`` flag.
    """
    payload = {"tenant_id": tenant.tenant_id}
    payload["bypass"] = tenant.bypass
    return payload
Want this analysis on your repo? https://repobility.com/scan/
_regex_redact function · python · L98-L102 (5 LOC)
harness/pii/redactor.py
def _regex_redact(text: str) -> str:
    """Run every structured-PII regex over ``text``, in table order.

    Each entry of ``_REGEX_PATTERNS`` is a (pattern, replacement) pair; the
    pattern's ``sub`` is applied to the output of the previous substitution,
    so later patterns see already-redacted text.
    """
    redacted = text
    for compiled, substitute in _REGEX_PATTERNS:
        redacted = compiled.sub(substitute, redacted)
    return redacted
redact function · python · L105-L132 (28 LOC)
harness/pii/redactor.py
def redact(text: str, strictness: str = "balanced") -> str:
    """Redact PII from text using a regex pre-pass and Presidio NER.

    Args:
        text: Input text, potentially containing PII.
        strictness: One of "strict", "balanced", or "minimal".
                    Controls which Presidio entity types are detected.

    Returns:
        Text with PII replaced by typed tokens such as [EMAIL], [PHONE],
        [SSN], [CREDIT_CARD], [NAME], [ADDRESS], [REDACTED].
    """
    # Step 1: Regex pre-pass — catches structured PII regardless of spaCy model
    text = _regex_redact(text)

    # Step 2: Presidio NER pass — catches unstructured PII (names, addresses, etc.)
    entities = STRICTNESS_ENTITIES.get(strictness, STRICTNESS_ENTITIES["balanced"])
    results = _analyzer.analyze(text=text, entities=entities, language="en")

    if not results:
        return text

    anonymized = _anonymizer.anonymize(
        text=text,
        analyzer_results=results,
        operators=_OPERA
suggest_tuning function · python · L17-L49 (33 LOC)
harness/proxy/admin.py
async def suggest_tuning(
    request: Request,
    tenant: TenantConfig = Depends(verify_api_key),
    since: str = Query(default="24h", description="Time window: ISO8601 timestamp or shorthand like '24h', '7d'"),
):
    """Trigger on-demand tuning analysis based on trace history.

    Returns ranked threshold + principle tuning suggestions as both
    human-readable report and machine-readable YAML diffs.
    """
    from harness.critique.analyzer import analyze_traces

    # Resolve shorthand time strings
    since_ts = _resolve_since(since)

    trace_store = request.app.state.trace_store
    http_client = request.app.state.http_client
    critique_engine = getattr(request.app.state, "critique_engine", None)
    constitution = critique_engine.constitution if critique_engine else None

    if constitution is None:
        return JSONResponse(
            content={"error": "Constitutional AI not configured"},
            status_code=503,
        )

    result = await analyze_traces(
_resolve_since function · python · L52-L62 (11 LOC)
harness/proxy/admin.py
def _resolve_since(since: str) -> str:
    """Convert shorthand like '24h', '7d' to ISO8601 timestamp."""
    now = datetime.now(timezone.utc)
    if since.endswith("h"):
        hours = int(since[:-1])
        return (now - timedelta(hours=hours)).isoformat()
    elif since.endswith("d"):
        days = int(since[:-1])
        return (now - timedelta(days=days)).isoformat()
    else:
        return since  # Assume ISO8601 already
chat_completions function · python · L28-L233 (206 LOC)
harness/proxy/litellm.py
async def chat_completions(
    request: Request,
    tenant: TenantConfig = Depends(verify_api_key),
) -> JSONResponse:
    """Proxy /v1/chat/completions to LiteLLM with auth, rate limiting, guardrails, and tracing.

    Pipeline:
    1. RPM check (pre-request)
    2. TPM check (checks previous request's accumulated tokens)
    3. Read request body
    4. Guardrail pipeline (skip for bypass tenants):
       a. Unicode normalize
       b. Input rails
       c. If blocked: return refusal or soft-steer to LiteLLM
    5. Proxy body to LiteLLM
    6. Record TPM for this response (gates the next request)
    7. Output rails (skip for bypass tenants)
    8. PII redact + trace write in background after response sent
    """
    rate_limiter = request.app.state.rate_limiter

    # 1. RPM check
    try:
        await rate_limiter.check_rpm(tenant.tenant_id, tenant.rpm_limit)
    except RateLimitExceeded as exc:
        logger.warning("429 RPM: tenant=%s limit=%d detail=%s", tenant.tenant_id, te
_write_trace function · python · L236-L295 (60 LOC)
harness/proxy/litellm.py
async def _write_trace(
    app,
    request_id: str,
    tenant: TenantConfig,
    body: dict,
    response_data: dict,
    latency_ms: int,
    status_code: int,
    guardrail_decisions=None,
    is_refusal: bool = False,
    cai_critique=None,
) -> None:
    """Extract, PII-redact, and write a trace record to SQLite.

    Called as a BackgroundTask after the response has been sent to the client.
    Raw PII never persists — redaction happens before any SQLite write.
    """
    # Extract prompt from messages (join all content fields)
    messages = body.get("messages", [])
    prompt_parts: list[str] = []
    for msg in messages:
        content = msg.get("content")
        if isinstance(content, str):
            prompt_parts.append(content)
        elif isinstance(content, list):
            # Multi-modal content: extract text parts
            for part in content:
                if isinstance(part, dict) and part.get("type") == "text":
                    prompt_parts.append(par
RateLimitExceeded class · python · L9-L14 (6 LOC)
harness/ratelimit/sliding_window.py
class RateLimitExceeded(Exception):
    """Signal that a tenant went over one of its rate limits.

    The human-readable reason is available both as ``str(exc)`` / ``exc.args[0]``
    and as the ``detail`` attribute.
    """

    def __init__(self, detail: str) -> None:
        self.detail = detail
        super().__init__(detail)
__init__ method · python · L12-L14 (3 LOC)
harness/ratelimit/sliding_window.py
    def __init__(self, detail: str) -> None:
        # Keep the reason on an explicit attribute as well as in ``args``.
        self.detail = detail
        super().__init__(detail)
Repobility · MCP-ready · https://repobility.com
SlidingWindowLimiter class · python · L17-L71 (55 LOC)
harness/ratelimit/sliding_window.py
class SlidingWindowLimiter:
    """Thread-safe (asyncio) per-tenant sliding window limiter.

    RPM gate: checked and recorded PRE-request.
    TPM gate: recorded POST-response; checked PRE-next-request (one-request lag by design).
    """

    WINDOW = 60.0  # seconds

    def __init__(self) -> None:
        self._rpm_log: dict[str, deque] = defaultdict(deque)
        self._tpm_log: dict[str, deque[tuple[float, int]]] = defaultdict(deque)
        self._lock = asyncio.Lock()

    async def check_rpm(self, tenant_id: str, rpm_limit: int) -> None:
        """Check and record a request against the RPM limit.

        Raises:
            RateLimitExceeded: If the tenant has exceeded rpm_limit requests/minute.
        """
        async with self._lock:
            now = time.monotonic()
            q = self._rpm_log[tenant_id]
            # Expire entries outside the window
            while q and q[0] < now - self.WINDOW:
                q.popleft()
            if len(q) >= rpm_limit:
   
__init__ method · python · L26-L29 (4 LOC)
harness/ratelimit/sliding_window.py
    def __init__(self) -> None:
        # One lock serializes all reads/writes of both per-tenant logs.
        self._lock = asyncio.Lock()
        # Per-tenant timestamps of admitted requests (RPM window).
        self._rpm_log: dict[str, deque] = defaultdict(deque)
        # Per-tenant (timestamp, token_count) pairs (TPM window).
        self._tpm_log: dict[str, deque[tuple[float, int]]] = defaultdict(deque)
check_rpm method · python · L31-L45 (15 LOC)
harness/ratelimit/sliding_window.py
    async def check_rpm(self, tenant_id: str, rpm_limit: int) -> None:
        """Admit one request for ``tenant_id`` or reject it for exceeding RPM.

        Under the lock, timestamps older than ``WINDOW`` seconds are discarded;
        if the remaining count already reaches ``rpm_limit`` the request is
        refused, otherwise its timestamp is appended to the tenant's log.

        Raises:
            RateLimitExceeded: If the tenant has exceeded rpm_limit requests/minute.
        """
        async with self._lock:
            current = time.monotonic()
            cutoff = current - self.WINDOW
            stamps = self._rpm_log[tenant_id]
            while stamps and stamps[0] < cutoff:
                stamps.popleft()
            if len(stamps) >= rpm_limit:
                raise RateLimitExceeded("RPM limit exceeded")
            stamps.append(current)
check_tpm method · python · L47-L61 (15 LOC)
harness/ratelimit/sliding_window.py
    async def check_tpm(self, tenant_id: str, tpm_limit: int) -> None:
        """Gate the next request on tokens already recorded in the current window.

        Under the lock, (timestamp, tokens) entries older than ``WINDOW`` seconds
        are dropped and the rest are summed; nothing is recorded here (see
        ``record_tpm``).

        Raises:
            RateLimitExceeded: If total tokens in the current window exceed tpm_limit.
        """
        async with self._lock:
            current = time.monotonic()
            cutoff = current - self.WINDOW
            entries = self._tpm_log[tenant_id]
            while entries and entries[0][0] < cutoff:
                entries.popleft()
            used = sum(count for _stamp, count in entries)
            if used >= tpm_limit:
                raise RateLimitExceeded("TPM limit exceeded")
record_tpm method · python · L63-L71 (9 LOC)
harness/ratelimit/sliding_window.py
    async def record_tpm(self, tenant_id: str, tokens: int) -> None:
        """Append this response's token usage to the tenant's TPM log.

        The timestamp is taken while holding the lock so entries stay in
        arrival order; expiry happens later in ``check_tpm``.

        Args:
            tenant_id: The tenant to charge.
            tokens: Number of tokens used in this request/response.
        """
        async with self._lock:
            log = self._tpm_log[tenant_id]
            log.append((time.monotonic(), tokens))
check_balance function · python · L9-L50 (42 LOC)
harness/redteam/balance.py
def check_balance(
    pending_path: Path,
    active_dataset_dir: Path,
    max_category_ratio: float = 0.40,
) -> tuple[bool, dict[str, float]]:
    """Check if adding pending entries would violate category balance.

    Args:
        pending_path: Path to pending JSONL file.
        active_dataset_dir: Directory containing active *.jsonl datasets.
        max_category_ratio: Maximum fraction any single category may occupy.

    Returns:
        (ok, violations) where violations maps category -> actual ratio for violators.
    """
    counts: Counter = Counter()

    # Count categories in existing active datasets
    for f in active_dataset_dir.glob("*.jsonl"):
        for line in f.read_text().splitlines():
            if line.strip():
                counts[json.loads(line).get("category", "unknown")] += 1

    # Count categories in pending file
    pending_text = pending_path.read_text() if pending_path.exists() else ""
    pending_lines = [line for line in pending_text.splitlines
generate_adversarial_variants function · python · L27-L59 (33 LOC)
harness/redteam/engine.py
async def generate_adversarial_variants(
    http_client, judge_model: str, source_prompt: str, n: int = 3
) -> list[dict]:
    """Send a near-miss prompt to the judge model to generate adversarial variants.

    Args:
        http_client: httpx.AsyncClient with LiteLLM base_url set.
        judge_model: Model name for the judge (from constitution.judge_model).
        source_prompt: The original near-miss prompt to generate variants from.
        n: Number of variants to generate per source prompt.

    Returns:
        List of dicts with keys: prompt, technique, category. Empty on failure.
    """
    try:
        resp = await http_client.post(
            "/v1/chat/completions",
            json={
                "model": judge_model,
                "messages": [
                    {"role": "system", "content": ADVERSARIAL_SYSTEM_PROMPT.format(n=n)},
                    {"role": "user", "content": f"Source prompt:\n{source_prompt}"},
                ],
                "response_fo
run_deepteam_job function · python · L62-L137 (76 LOC)
harness/redteam/engine.py
async def run_deepteam_job(
    trace_store,
    http_client,
    judge_model: str,
    near_miss_window_days: int = 7,
    near_miss_limit: int = 100,
    near_miss_min_count: int = 5,
    variants_per_prompt: int = 3,
) -> dict:
    """Run a full deepteam adversarial generation job.

    1. Query near-miss traces from trace_store
    2. Send each to judge model for variant generation
    3. Write all variants to pending JSONL

    Args:
        trace_store: TraceStore instance with query_near_misses method.
        http_client: httpx.AsyncClient for LiteLLM calls.
        judge_model: Resolved judge model name.
        near_miss_window_days: How many days back to search for near-misses.
        near_miss_limit: Max near-miss traces to process.
        near_miss_min_count: Minimum near-misses required to run (below = skip).
        variants_per_prompt: Number of variants per source prompt.

    Returns:
        Dict with: near_miss_count, variants_generated, pending_file, categories, 
Powered by Repobility — scan your code at https://repobility.com
run_garak_scan function · python · L10-L65 (56 LOC)
harness/redteam/garak_runner.py
async def run_garak_scan(
    profile_config_path: str,
    api_key: str,
    report_dir: str,
    job_id: str,
    model_name: str = "harness-gateway",
    probes: str | None = None,
) -> dict:
    """Run garak as async subprocess and return parsed results.

    Uses asyncio.create_subprocess_exec (NOT subprocess.run) to avoid blocking
    the event loop during potentially long-running scans.

    Args:
        profile_config_path: Path to garak YAML config (e.g. redteam_quick.yaml).
        api_key: Tenant API key for the gateway — set as OPENAICOMPATIBLE_API_KEY env var.
        report_dir: Directory where garak writes report files.
        job_id: Unique job ID used as report filename prefix.
        model_name: Model name for --target_name flag.
        probes: Comma-separated probe list for --probes flag (optional).

    Returns:
        Dict with keys: exit_code, stdout, stderr, scores, report_path.
    """
    # Ensure report dir exists
    os.makedirs(report_dir, exist_ok=True
parse_garak_report function · python · L68-L96 (29 LOC)
harness/redteam/garak_runner.py
def parse_garak_report(report_path: str) -> dict:
    """Parse garak JSONL report file and extract probe scores.

    Args:
        report_path: Path to {job_id}.report.jsonl file.

    Returns:
        Dict of probe_name -> {"passed": int, "total": int, "pass_rate": float}.
    """
    scores = {}
    try:
        with open(report_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                entry = json.loads(line)
                if entry.get("entry_type") == "eval":
                    probe = entry.get("probe", "unknown")
                    passed = entry.get("passed", 0)
                    total = max(entry.get("total", 1), 1)
                    scores[probe] = {
                        "passed": passed,
                        "total": total,
                        "pass_rate": round(passed / total, 4),
                    }
    except (FileNotFoundError, json.JSONDecodeError):
        pas
cmd_promote function · python · L16-L38 (23 LOC)
harness/redteam/__main__.py
def cmd_promote(args):
    """Promote a pending adversarial dataset to active after a balance check.

    Exits with status 1 (message on stderr) if ``args.file`` does not exist, or
    if adding it would push any category above ``args.max_ratio``. On success
    the file is moved into ACTIVE_DIR under the same name and its non-blank
    line count is printed.
    """
    pending_file = Path(args.file)
    if not pending_file.exists():
        print(f"ERROR: File not found: {pending_file}", file=sys.stderr)
        sys.exit(1)

    max_ratio = args.max_ratio

    ok, violations = check_balance(pending_file, ACTIVE_DIR, max_ratio)
    if not ok:
        print(f"ERROR: Balance check failed. Categories exceed {max_ratio*100:.0f}% cap:", file=sys.stderr)
        for cat, ratio in sorted(violations.items(), key=lambda x: -x[1]):
            print(f"  {cat}: {ratio*100:.1f}%", file=sys.stderr)
        sys.exit(1)

    # The active directory may not exist yet (e.g. the very first promotion).
    # shutil.move does not create missing parent directories, so without this
    # the move would fail after the balance check had already passed.
    ACTIVE_DIR.mkdir(parents=True, exist_ok=True)
    dest = ACTIVE_DIR / pending_file.name
    # NOTE(review): an existing active file with the same name is replaced by
    # the move — confirm that re-promoting under the same name is intended.
    shutil.move(str(pending_file), str(dest))
    print(f"Promoted: {dest}")

    # Show summary
    count = sum(1 for line in dest.read_text().splitlines() if line.strip())
    print(f"  Entries: {count}")
cmd_list function · python · L41-L52 (12 LOC)
harness/redteam/__main__.py
def cmd_list(args):
    """Print each pending adversarial dataset with its count of non-blank lines."""
    if not PENDING_DIR.exists():
        print("No pending directory found.")
        return
    pending = sorted(PENDING_DIR.glob("*.jsonl"))
    if not pending:
        print("No pending datasets.")
        return
    for path in pending:
        entries = len([ln for ln in path.read_text().splitlines() if ln.strip()])
        print(f"  {path.name}  ({entries} entries)")
main function · python · L55-L80 (26 LOC)
harness/redteam/__main__.py
def main():
    """CLI entry point: parse arguments and dispatch to a subcommand.

    Subcommands are ``promote FILE [--max-ratio R]`` and ``list``. With no
    subcommand, usage is printed and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(
        prog="python -m harness.redteam",
        description="Red team dataset management",
    )
    subcommands = parser.add_subparsers(dest="command")

    promote_parser = subcommands.add_parser(
        "promote", help="Promote pending dataset to active after balance check"
    )
    promote_parser.add_argument("file", help="Path to pending JSONL file")
    promote_parser.add_argument(
        "--max-ratio", type=float, default=0.40, help="Max category ratio (default 0.40)"
    )
    subcommands.add_parser("list", help="List pending adversarial datasets")

    args = parser.parse_args()
    handlers = {"promote": cmd_promote, "list": cmd_list}
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
        sys.exit(1)
    handler(args)
JobRequest class · python · L28-L30 (3 LOC)
harness/redteam/router.py
class JobRequest(BaseModel):
    """Request body for submitting a red team job.

    ``type`` is a free-form string at the model level; the submit endpoint
    rejects anything other than "garak" or "deepteam" with a 400.
    """

    type: str  # "garak" or "deepteam"
    profile: str = "quick"  # garak profile name (ignored for deepteam)
submit_job function · python · L34-L61 (28 LOC)
harness/redteam/router.py
async def submit_job(
    request: Request,
    body: JobRequest,
    tenant: TenantConfig = Depends(verify_api_key),
):
    """Submit a red team job. Returns 202 Accepted with job_id, or 409 if a job is running."""
    lock: asyncio.Lock = request.app.state.redteam_lock
    if lock.locked():
        current_id = request.app.state.redteam_current_job_id
        return JSONResponse(
            status_code=409,
            content={"error": "A red team job is already running", "job_id": current_id},
        )

    if body.type not in ("garak", "deepteam"):
        return JSONResponse(status_code=400, content={"error": "type must be 'garak' or 'deepteam'"})

    job_id = f"rt-{uuid.uuid4().hex[:12]}"
    now = datetime.now(timezone.utc).isoformat()

    trace_store = request.app.state.trace_store
    await trace_store.create_job({"job_id": job_id, "type": body.type, "created_at": now})

    # Store task reference to prevent GC (Pitfall 3 from research)
    task = asyncio.create_task(_run
_run_job function · python · L64-L80 (17 LOC)
harness/redteam/router.py
async def _run_job(app, job_id: str, body: JobRequest):
    """Background task: serialize on the red team lock, run one job, record the outcome.

    Status written to the trace store: "running" once the lock is held, then
    "complete" (with the dispatcher's result dict) or "failed" (with an
    ``{"error": ...}`` payload). If the task is cancelled, the job is marked
    "failed" before the CancelledError is re-raised, so it does not stay
    "running" indefinitely. ``app.state.redteam_current_job_id`` and
    ``redteam_active_task`` are always cleared on exit, including when the
    initial "running" status write fails.
    """
    async with app.state.redteam_lock:
        app.state.redteam_current_job_id = job_id
        trace_store = app.state.trace_store
        try:
            await trace_store.update_job_status(job_id, "running")
            if body.type == "garak":
                result = await _dispatch_garak(app, job_id, body.profile)
            else:
                result = await _dispatch_deepteam(app, job_id)
            await trace_store.update_job_status(job_id, "complete", result)
        except asyncio.CancelledError:
            # CancelledError is a BaseException (3.8+) and would otherwise skip
            # the status update below; record it, then propagate cancellation.
            await trace_store.update_job_status(job_id, "failed", {"error": "cancelled"})
            raise
        except Exception as e:
            await trace_store.update_job_status(job_id, "failed", {"error": str(e)})
        finally:
            app.state.redteam_current_job_id = None
            app.state.redteam_active_task = None
Repobility · severity-and-effort ranking · https://repobility.com
_dispatch_garak function · python · L83-L127 (45 LOC)
harness/redteam/router.py
async def _dispatch_garak(app, job_id: str, profile: str) -> dict:
    """Run a garak scan job."""
    from harness.redteam.garak_runner import run_garak_scan

    config_dir = os.environ.get(
        "HARNESS_CONFIG_DIR",
        os.path.join(os.path.dirname(os.path.dirname(__file__)), "config"),
    )

    # Load redteam config for gateway URL
    redteam_config_path = os.path.join(config_dir, "redteam.yaml")
    with open(redteam_config_path) as f:
        redteam_cfg = yaml.safe_load(f).get("redteam", {})

    profile_path = os.path.join(config_dir, f"redteam_{profile}.yaml")
    if not os.path.exists(profile_path):
        return {"error": f"Profile '{profile}' not found at {profile_path}"}

    # Use a dedicated redteam API key if configured, or first tenant key
    api_key = os.environ.get("REDTEAM_API_KEY", "")

    data_dir = os.environ.get(
        "HARNESS_DATA_DIR",
        os.path.join(os.path.dirname(os.path.dirname(__file__)), "data"),
    )
    report_dir = os.path.join
_dispatch_deepteam function · python · L130-L159 (30 LOC)
harness/redteam/router.py
async def _dispatch_deepteam(app, job_id: str) -> dict:
    """Run a deepteam adversarial generation job."""
    from harness.redteam.engine import run_deepteam_job

    config_dir = os.environ.get(
        "HARNESS_CONFIG_DIR",
        os.path.join(os.path.dirname(os.path.dirname(__file__)), "config"),
    )
    redteam_config_path = os.path.join(config_dir, "redteam.yaml")
    with open(redteam_config_path) as f:
        redteam_cfg = yaml.safe_load(f).get("redteam", {})

    # Resolve judge model from constitution config
    critique_engine = getattr(app.state, "critique_engine", None)
    if critique_engine and critique_engine.constitution:
        judge_model = critique_engine.constitution.judge_model
        if judge_model == "default":
            judge_model = "gpt-3.5-turbo"  # Sensible default when "default" is set
    else:
        judge_model = "gpt-3.5-turbo"

    return await run_deepteam_job(
        trace_store=app.state.trace_store,
        http_client=app.state.http_c
get_job_status function · python · L163-L172 (10 LOC)
harness/redteam/router.py
async def get_job_status(
    request: Request,
    job_id: str,
    tenant: TenantConfig = Depends(verify_api_key),
):
    """Look up one red team job by ID and return its stored record (404 if unknown)."""
    store = request.app.state.trace_store
    job = await store.get_job(job_id)
    if job is not None:
        return JSONResponse(content=job)
    return JSONResponse(status_code=404, content={"error": f"Job {job_id} not found"})
list_jobs function · python · L176-L183 (8 LOC)
harness/redteam/router.py
async def list_jobs(
    request: Request,
    tenant: TenantConfig = Depends(verify_api_key),
    limit: int = Query(default=20, ge=1, le=100),
):
    """List recent red team jobs.

    Args:
        limit: Maximum number of jobs to return, validated to 1..100. The lower
            bound matters: without it 0 or negative values pass validation, and
            if the store forwards the value to an SQLite ``LIMIT`` clause a
            negative limit means "no limit", defeating the ``le=100`` cap.
    """
    jobs = await request.app.state.trace_store.list_jobs(limit=limit)
    return JSONResponse(content={"jobs": jobs})
compute_priority function · python · L17-L44 (28 LOC)
harness/traces/store.py
def compute_priority(guardrail_decisions: dict) -> float:
    """Compute HITL review priority from guardrail decisions.

    Priority is 1.0 - min(threshold - score) for results with score > 0.
    Closest-to-threshold items get the highest priority.
    Returns 0.0 when there are no scoreable results.

    Args:
        guardrail_decisions: Dict with optional 'all_results' list.
            Each result should have 'score' and 'threshold' fields.

    Returns:
        Float in [0.0, 1.0], higher = more urgent review needed.
    """
    # guardrail_decisions may be a list (from trace JSON) or a dict with 'all_results'
    if isinstance(guardrail_decisions, list):
        all_results = guardrail_decisions
    else:
        all_results = guardrail_decisions.get("all_results", [])
    distances = []
    for result in all_results:
        score = result.get("score", 0)
        threshold = result.get("threshold", 1.0)
        if score > 0:
            distances.append(threshold - score)
    
_extract_triggering_rail function · python · L47-L70 (24 LOC)
harness/traces/store.py
def _extract_triggering_rail(guardrail_decisions) -> str | None:
    """Return the rail name from the all_results entry closest to threshold.

    Args:
        guardrail_decisions: List of rail results, or dict with 'all_results' key.

    Returns:
        Rail name string, or None if no results with score > 0.
    """
    if isinstance(guardrail_decisions, list):
        all_results = guardrail_decisions
    else:
        all_results = guardrail_decisions.get("all_results", [])
    best = None
    best_distance = float("inf")
    for result in all_results:
        score = result.get("score", 0)
        threshold = result.get("threshold", 1.0)
        if score > 0:
            distance = threshold - score
            if distance < best_distance:
                best_distance = distance
                best = result.get("rail_name") or result.get("rail")
    return best
init_db method · python · L87-L97 (11 LOC)
harness/traces/store.py
    async def init_db(self) -> None:
        """Create the tables and indexes defined in schema.sql if missing.

        Reads the DDL from the ``schema.sql`` file next to this module and
        executes it as a single script. Called once during app lifespan.
        """
        schema_path = Path(__file__).parent / "schema.sql"
        # Decode explicitly: read_text() otherwise uses the locale's default
        # encoding, so non-ASCII text in the DDL could fail on some hosts.
        schema = schema_path.read_text(encoding="utf-8")
        async with aiosqlite.connect(self._db_path) as db:
            await db.executescript(schema)
            await db.commit()
write method · python · L99-L141 (43 LOC)
harness/traces/store.py
    async def write(self, record: dict) -> None:
        """Insert a trace record into SQLite.

        Args:
            record: Dict with fields matching the traces table schema.
                    guardrail_decisions and cai_critique may be None or a dict/list
                    (will be JSON-serialized).
        """
        guardrail_json = (
            json.dumps(record.get("guardrail_decisions"))
            if record.get("guardrail_decisions") is not None
            else None
        )
        cai_json = (
            json.dumps(record.get("cai_critique"))
            if record.get("cai_critique") is not None
            else None
        )
        async with aiosqlite.connect(self._db_path) as db:
            await db.execute(
                """
                INSERT INTO traces
                (request_id, tenant, timestamp, model, prompt, response,
                 latency_ms, status_code, guardrail_decisions, cai_critique,
                 refusal_event, bypass_flag)
 
Want this analysis on your repo? https://repobility.com/scan/
query_by_id method · python · L143-L155 (13 LOC)
harness/traces/store.py
    async def query_by_id(self, request_id: str) -> dict | None:
        """Look up one trace row by its request_id.

        Returns:
            The matching traces row converted to a plain dict, or None when
            no trace has this request_id.
        """
        async with aiosqlite.connect(self._db_path) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(
                "SELECT * FROM traces WHERE request_id = ?", (request_id,)
            ) as cur:
                found = await cur.fetchone()
                if not found:
                    return None
                return dict(found)
write_eval_run method · python · L157-L180 (24 LOC)
harness/traces/store.py
    async def write_eval_run(self, run: dict) -> None:
        """Store a single eval run as a new row of the eval_runs table.

        Args:
            run: Mapping with required keys run_id, timestamp, source,
                 metrics and config_snapshot, plus optional baseline_name
                 (stored as NULL when absent). metrics and config_snapshot
                 are serialized to JSON text before insertion.

        Raises:
            KeyError: If one of the required keys is missing from ``run``.
        """
        async with aiosqlite.connect(self._db_path) as conn:
            row = (
                run["run_id"],
                run["timestamp"],
                run["source"],
                json.dumps(run["metrics"]),
                json.dumps(run["config_snapshot"]),
                run.get("baseline_name"),
            )
            await conn.execute(
                """
                INSERT INTO eval_runs
                (run_id, timestamp, source, metrics, config_snapshot, baseline_name)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                row,
            )
            await conn.commit()
query_eval_runs method · python · L182-L213 (32 LOC)
harness/traces/store.py
    async def query_eval_runs(
        self, source: str | None = None, limit: int = 20
    ) -> list[dict]:
        """Fetch eval run records ordered by timestamp DESC with optional source filter.

        Args:
            source: Optional source filter ("replay" or "lm-eval").
            limit: Maximum number of records to return (default 20).

        Returns:
            List of eval run dicts with metrics and config_snapshot parsed from JSON.
        """
        async with aiosqlite.connect(self._db_path) as db:
            db.row_factory = aiosqlite.Row
            if source is not None:
                query = (
                    "SELECT * FROM eval_runs WHERE source = ? "
                    "ORDER BY timestamp DESC LIMIT ?"
                )
                params: tuple = (source, limit)
            else:
                query = "SELECT * FROM eval_runs ORDER BY timestamp DESC LIMIT ?"
                params = (limit,)
            async with db.execute(query, params) as c
create_job method · python · L215-L230 (16 LOC)
harness/traces/store.py
    async def create_job(self, job: dict) -> None:
        """Register a new red team job in redteam_jobs with status 'pending'.

        The creation time is stamped here as a UTC ISO8601 string.

        Args:
            job: Mapping with keys job_id and type ('garak' or 'deepteam').
        """
        now_iso = datetime.now(timezone.utc).isoformat()
        async with aiosqlite.connect(self._db_path) as conn:
            params = (job["job_id"], job["type"], now_iso)
            await conn.execute(
                """
                INSERT INTO redteam_jobs (job_id, type, status, created_at)
                VALUES (?, ?, 'pending', ?)
                """,
                params,
            )
            await conn.commit()
update_job_status method · python · L232-L259 (28 LOC)
harness/traces/store.py
    async def update_job_status(
        self, job_id: str, status: str, result: dict | None = None
    ) -> None:
        """Update a red team job's status (and optionally result).

        Sets completed_at for terminal statuses ('complete', 'failed').

        Args:
            job_id: Unique job identifier.
            status: New status ('running', 'complete', 'failed').
            result: Optional result dict (JSON-serialized).
        """
        completed_at = (
            datetime.now(timezone.utc).isoformat()
            if status in ("complete", "failed")
            else None
        )
        result_json = json.dumps(result) if result is not None else None
        async with aiosqlite.connect(self._db_path) as db:
            await db.execute(
                """
                UPDATE redteam_jobs
                SET status = ?, completed_at = ?, result = ?
                WHERE job_id = ?
                """,
                (status, completed_at, result_json, job_id),
get_job method · python · L261-L278 (18 LOC)
harness/traces/store.py
    async def get_job(self, job_id: str) -> dict | None:
        """Look up one red team job by its job_id.

        Returns:
            The redteam_jobs row as a dict, with a non-null result column
            decoded from JSON, or None when no job has this job_id.
        """
        async with aiosqlite.connect(self._db_path) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(
                "SELECT * FROM redteam_jobs WHERE job_id = ?", (job_id,)
            ) as cur:
                found = await cur.fetchone()
                if found is None:
                    return None
                job = dict(found)
                raw_result = job.get("result")
                if raw_result is not None:
                    job["result"] = json.loads(raw_result)
                return job
list_jobs method · python · L280-L302 (23 LOC)
harness/traces/store.py
    async def list_jobs(self, limit: int = 20) -> list[dict]:
        """Return up to ``limit`` red team jobs, most recently created first.

        Args:
            limit: Row cap passed straight to SQL LIMIT (default 20).
                NOTE: SQLite treats a negative LIMIT as unbounded, so callers
                should validate this value.

        Returns:
            Job dicts; each non-null result column is decoded from JSON.
        """

        def _decode(row) -> dict:
            # Convert a row to a dict and decode its JSON result, if any.
            job = dict(row)
            raw_result = job.get("result")
            if raw_result is not None:
                job["result"] = json.loads(raw_result)
            return job

        async with aiosqlite.connect(self._db_path) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(
                "SELECT * FROM redteam_jobs ORDER BY created_at DESC LIMIT ?",
                (limit,),
            ) as cur:
                fetched = await cur.fetchall()
                return [_decode(r) for r in fetched]
query_near_misses method · python · L304-L340 (37 LOC)
harness/traces/store.py
    async def query_near_misses(self, since: str, limit: int = 100) -> list[dict]:
        """Fetch traces that scored above zero but were not blocked.

        Queries traces since the given timestamp that were not refusals and
        had guardrail decisions recorded, then filters in Python to keep only
        those where at least one rail result has score > 0.

        Args:
            since: ISO8601 string — lower bound (inclusive).
            limit: Maximum number of records to return (default 100).

        Returns:
            List of trace dicts matching near-miss criteria.
        """
        async with aiosqlite.connect(self._db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """
                SELECT * FROM traces
                WHERE timestamp >= ?
                  AND guardrail_decisions IS NOT NULL
                  AND refusal_event = 0
                ORDER BY timestamp DESC
                LIMIT ?
       
Repobility · MCP-ready · https://repobility.com
write_correction method · python · L342-L377 (36 LOC)
harness/traces/store.py
    async def write_correction(self, correction: dict) -> None:
        """Insert a correction record into the corrections table.

        PII in edited_response is redacted before storage.
        Validates that action is one of 'approve', 'reject', 'edit' — SQLite
        CHECK constraint enforces this; raises on invalid action.

        Args:
            correction: Dict with fields: request_id, reviewer, action,
                        edited_response (optional), trace_ref (optional).
        """
        from harness.pii.redactor import redact as redact_text

        edited_response = correction.get("edited_response")
        if edited_response is not None:
            edited_response = redact_text(edited_response)

        created_at = correction.get("created_at") or datetime.now(timezone.utc).isoformat()

        async with aiosqlite.connect(self._db_path) as db:
            await db.execute(
                """
                INSERT INTO corrections
                (request_id,
query_corrections method · python · L379-L398 (20 LOC)
harness/traces/store.py
    async def query_corrections(self, request_id: str | None = None) -> list[dict]:
        """Return correction rows, newest (by created_at) first.

        Args:
            request_id: When given, only corrections for this request_id are
                returned; otherwise every correction is returned.

        Returns:
            One dict per matching row of the corrections table.
        """
        if request_id is None:
            sql = "SELECT * FROM corrections ORDER BY created_at DESC"
            args: tuple = ()
        else:
            sql = "SELECT * FROM corrections WHERE request_id = ? ORDER BY created_at DESC"
            args = (request_id,)
        async with aiosqlite.connect(self._db_path) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(sql, args) as cur:
                return [dict(r) for r in await cur.fetchall()]
query_hitl_queue method · python · L400-L488 (89 LOC)
harness/traces/store.py
    async def query_hitl_queue(
        self,
        since: str,
        rail_filter: str = "all",
        tenant_filter: str = "all",
        hide_reviewed: bool = False,
        limit: int = 200,
    ) -> list[dict]:
        """Fetch HITL review queue: flagged traces sorted by priority.

        Priority ordering: unreviewed items before reviewed, within each group
        sorted by priority DESC (closest to threshold first).

        Args:
            since: ISO8601 timestamp lower bound (inclusive).
            rail_filter: Rail name to filter on, or 'all' for no filter.
            tenant_filter: Tenant ID to filter on, or 'all' for no filter.
            hide_reviewed: If True, exclude traces with existing corrections.
            limit: Maximum traces to fetch from DB before Python post-processing.

        Returns:
            List of trace dicts augmented with: priority (float),
            triggering_rail (str|None), correction_action (str|None),
            correction_revie
‹ prev · page 3 / 4 · next ›