Function bodies 178 total
get_hitl_queue function · python · L28-L55 (28 LOC)harness/hitl/router.py
async def get_hitl_queue(
request: Request,
tenant: TenantConfig = Depends(verify_api_key),
rail: str = Query(default="all"),
tenant_filter: str = Query(default="all", alias="tenant"),
since: str = Query(default="24h"),
hide_reviewed: bool = Query(default=False),
) -> JSONResponse:
"""Return the HITL review queue, priority-sorted.
Items are sorted by review urgency: closest-to-threshold flagged traces
appear first; already-reviewed items sort to the bottom.
Query parameters:
rail: Filter by triggering rail name, or 'all' (default).
tenant: Filter by tenant ID, or 'all' (default).
since: Time window — ISO8601 timestamp or shorthand like '24h', '7d'.
hide_reviewed: If true, exclude traces that have corrections.
"""
since_ts = _resolve_since(since)
trace_store = request.app.state.trace_store
results = await trace_store.query_hitl_queue(
since=since_ts,
rail_filter=rail,
tenantsubmit_correction function · python · L59-L72 (14 LOC)harness/hitl/router.py
async def submit_correction(
    request: Request,
    body: CorrectionRequest,
    tenant: TenantConfig = Depends(verify_api_key),
) -> JSONResponse:
    """Submit a human correction for a flagged trace.

    Action must be one of: approve, reject, edit.
    When action is 'edit', edited_response should contain the corrected text
    (PII will be redacted before storage).

    Args:
        request: Incoming request; the trace store is read from ``app.state``.
        body: Parsed correction payload (serialised with ``model_dump``).
        tenant: Authenticated tenant resolved by ``verify_api_key``.

    Returns:
        200 with ``{"status": "ok", "request_id": ...}`` on success, or
        400 with ``{"error": ...}`` when ``action`` is not an allowed value.
    """
    payload = body.model_dump()
    # Validate here so a bad action yields a 400 instead of the SQLite CHECK
    # constraint in write_correction raising and surfacing as a 500.
    if payload.get("action") not in ("approve", "reject", "edit"):
        return JSONResponse(
            status_code=400,
            content={"error": "action must be one of 'approve', 'reject', 'edit'"},
        )
    trace_store = request.app.state.trace_store
    await trace_store.write_correction(payload)
    return JSONResponse(content={"status": "ok", "request_id": body.request_id})
# _extract_triggering_rail_inline function · python · L15-L35 (21 LOC)harness/hitl/ui.py
def _extract_triggering_rail_inline(guardrail_decisions: dict) -> str | None:
"""Return the rail name from the all_results entry closest to threshold.
Mirrors harness.traces.store._extract_triggering_rail for inline use
without importing the async store module.
"""
if isinstance(guardrail_decisions, list):
all_results = guardrail_decisions
else:
all_results = guardrail_decisions.get("all_results", [])
best: str | None = None
best_distance = float("inf")
for result in all_results:
score = result.get("score", 0)
threshold = result.get("threshold", 1.0)
if score > 0:
distance = threshold - score
if distance < best_distance:
best_distance = distance
best = result.get("rail_name") or result.get("rail")
return best_action_taken function · python · L38-L62 (25 LOC)harness/hitl/ui.py
def _action_taken(item: dict) -> str:
"""Derive human-readable action from guardrail_decisions dict."""
gd = item.get("guardrail_decisions") or {}
if isinstance(gd, str):
try:
gd = json.loads(gd)
except (json.JSONDecodeError, TypeError):
gd = {}
status_code = item.get("status_code", 200)
refusal_event = item.get("refusal_event", 0)
# guardrail_decisions may be a list (from trace JSON) or a dict with 'all_results'
if isinstance(gd, list):
all_results = gd
else:
all_results = gd.get("all_results", [])
# Any result with score > threshold means it was blocked/steered
has_cai = item.get("cai_critique") is not None
if refusal_event:
return "blocked"
if has_cai:
return "critiqued"
if status_code in (200, None):
if any(r.get("score", 0) > r.get("threshold", 1.0) for r in all_results):
return "blocked"
return "allowed"build_ui function · python · L65-L414 (350 LOC)harness/hitl/ui.py
def build_ui(api_url: str, api_key: str): # -> gr.Blocks
"""Build and return a Gradio Blocks dashboard for HITL review.
Args:
api_url: Base URL of the harness API (e.g. "http://localhost:8080").
api_key: Bearer token for the harness API.
Returns:
gr.Blocks instance (not yet launched — caller calls .launch()).
"""
import gradio as gr # imported here so module loads without gradio installed
client = httpx.Client(
base_url=api_url,
headers={"Authorization": f"Bearer {api_key}"},
timeout=30.0,
)
# -----------------------------------------------------------------------
# Callback implementations
# -----------------------------------------------------------------------
def refresh_queue(rail: str, tenant: str, time_range: str, hide_rev: bool) -> list[list[Any]]:
"""Fetch queue from API and return formatted dataframe rows."""
try:
resp = client.get(
"/lifespan function · python · L23-L79 (57 LOC)harness/main.py
async def lifespan(app: FastAPI):
"""App lifespan: load tenants, create HTTP client pool, initialize rate limiter and trace store."""
# Load tenant config
tenants_path = os.path.join(_CONFIG_DIR, "tenants.yaml")
app.state.tenants = load_tenants(tenants_path)
# Shared async HTTP client for proxying to LiteLLM
app.state.http_client = httpx.AsyncClient(
base_url=_LITELLM_BASE,
timeout=httpx.Timeout(120.0),
limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
)
# In-memory rate limiter (per-tenant RPM + TPM)
app.state.rate_limiter = SlidingWindowLimiter()
# Trace store — initialize SQLite schema (WAL mode, indexes)
os.makedirs(_DATA_DIR, exist_ok=True)
db_path = os.path.join(_DATA_DIR, "traces.db")
app.state.trace_store = TraceStore(db_path=db_path)
await app.state.trace_store.init_db()
# Eagerly import PII redactor so AnalyzerEngine loads at startup
import harness.pii.redactor # probe function · python · L104-L106 (3 LOC)harness/main.py
async def probe(tenant: Annotated[TenantConfig, Depends(verify_api_key)]):
    """Echo the identity that API-key auth resolved to.

    Exists so tests can confirm a key maps to the expected tenant and bypass
    setting.

    Returns:
        ``{"tenant_id": ..., "bypass": ...}`` taken from the resolved tenant.
    """
    resolved = {
        "tenant_id": tenant.tenant_id,
        "bypass": tenant.bypass,
    }
    return resolved
# Want this analysis on your repo? https://repobility.com/scan/
_regex_redact function · python · L98-L102 (5 LOC)harness/pii/redactor.py
def _regex_redact(text: str) -> str:
    """Apply each ``(pattern, replacement)`` pair in ``_REGEX_PATTERNS`` to *text*.

    Substitutions run in list order and each operates on the output of the
    previous one, so an earlier pattern's replacement is what later patterns
    see.
    """
    redacted = text
    for compiled, token in _REGEX_PATTERNS:
        redacted = compiled.sub(token, redacted)
    return redacted
# redact function · python · L105-L132 (28 LOC)harness/pii/redactor.py
def redact(text: str, strictness: str = "balanced") -> str:
"""Redact PII from text using a regex pre-pass and Presidio NER.
Args:
text: Input text, potentially containing PII.
strictness: One of "strict", "balanced", or "minimal".
Controls which Presidio entity types are detected.
Returns:
Text with PII replaced by typed tokens such as [EMAIL], [PHONE],
[SSN], [CREDIT_CARD], [NAME], [ADDRESS], [REDACTED].
"""
# Step 1: Regex pre-pass — catches structured PII regardless of spaCy model
text = _regex_redact(text)
# Step 2: Presidio NER pass — catches unstructured PII (names, addresses, etc.)
entities = STRICTNESS_ENTITIES.get(strictness, STRICTNESS_ENTITIES["balanced"])
results = _analyzer.analyze(text=text, entities=entities, language="en")
if not results:
return text
anonymized = _anonymizer.anonymize(
text=text,
analyzer_results=results,
operators=_OPERAsuggest_tuning function · python · L17-L49 (33 LOC)harness/proxy/admin.py
async def suggest_tuning(
request: Request,
tenant: TenantConfig = Depends(verify_api_key),
since: str = Query(default="24h", description="Time window: ISO8601 timestamp or shorthand like '24h', '7d'"),
):
"""Trigger on-demand tuning analysis based on trace history.
Returns ranked threshold + principle tuning suggestions as both
human-readable report and machine-readable YAML diffs.
"""
from harness.critique.analyzer import analyze_traces
# Resolve shorthand time strings
since_ts = _resolve_since(since)
trace_store = request.app.state.trace_store
http_client = request.app.state.http_client
critique_engine = getattr(request.app.state, "critique_engine", None)
constitution = critique_engine.constitution if critique_engine else None
if constitution is None:
return JSONResponse(
content={"error": "Constitutional AI not configured"},
status_code=503,
)
result = await analyze_traces(
_resolve_since function · python · L52-L62 (11 LOC)harness/proxy/admin.py
def _resolve_since(since: str) -> str:
"""Convert shorthand like '24h', '7d' to ISO8601 timestamp."""
now = datetime.now(timezone.utc)
if since.endswith("h"):
hours = int(since[:-1])
return (now - timedelta(hours=hours)).isoformat()
elif since.endswith("d"):
days = int(since[:-1])
return (now - timedelta(days=days)).isoformat()
else:
return since # Assume ISO8601 alreadychat_completions function · python · L28-L233 (206 LOC)harness/proxy/litellm.py
async def chat_completions(
request: Request,
tenant: TenantConfig = Depends(verify_api_key),
) -> JSONResponse:
"""Proxy /v1/chat/completions to LiteLLM with auth, rate limiting, guardrails, and tracing.
Pipeline:
1. RPM check (pre-request)
2. TPM check (checks previous request's accumulated tokens)
3. Read request body
4. Guardrail pipeline (skip for bypass tenants):
a. Unicode normalize
b. Input rails
c. If blocked: return refusal or soft-steer to LiteLLM
5. Proxy body to LiteLLM
6. Record TPM for this response (gates the next request)
7. Output rails (skip for bypass tenants)
8. PII redact + trace write in background after response sent
"""
rate_limiter = request.app.state.rate_limiter
# 1. RPM check
try:
await rate_limiter.check_rpm(tenant.tenant_id, tenant.rpm_limit)
except RateLimitExceeded as exc:
logger.warning("429 RPM: tenant=%s limit=%d detail=%s", tenant.tenant_id, te_write_trace function · python · L236-L295 (60 LOC)harness/proxy/litellm.py
async def _write_trace(
app,
request_id: str,
tenant: TenantConfig,
body: dict,
response_data: dict,
latency_ms: int,
status_code: int,
guardrail_decisions=None,
is_refusal: bool = False,
cai_critique=None,
) -> None:
"""Extract, PII-redact, and write a trace record to SQLite.
Called as a BackgroundTask after the response has been sent to the client.
Raw PII never persists — redaction happens before any SQLite write.
"""
# Extract prompt from messages (join all content fields)
messages = body.get("messages", [])
prompt_parts: list[str] = []
for msg in messages:
content = msg.get("content")
if isinstance(content, str):
prompt_parts.append(content)
elif isinstance(content, list):
# Multi-modal content: extract text parts
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
prompt_parts.append(parRateLimitExceeded class · python · L9-L14 (6 LOC)harness/ratelimit/sliding_window.py
class RateLimitExceeded(Exception):
    """Signal that a tenant went over one of its rate limits.

    Attributes:
        detail: Human-readable reason; also used as the exception message.
    """

    detail: str

    def __init__(self, detail: str) -> None:
        self.detail = detail
        super().__init__(detail)
# __init__ method · python · L12-L14 (3 LOC)harness/ratelimit/sliding_window.py
def __init__(self, detail: str) -> None:
    """Keep *detail* on the instance and pass it on as the exception message."""
    self.detail = detail
    super().__init__(detail)
# Repobility · MCP-ready · https://repobility.com
SlidingWindowLimiter class · python · L17-L71 (55 LOC)harness/ratelimit/sliding_window.py
class SlidingWindowLimiter:
"""Thread-safe (asyncio) per-tenant sliding window limiter.
RPM gate: checked and recorded PRE-request.
TPM gate: recorded POST-response; checked PRE-next-request (one-request lag by design).
"""
WINDOW = 60.0 # seconds
def __init__(self) -> None:
self._rpm_log: dict[str, deque] = defaultdict(deque)
self._tpm_log: dict[str, deque[tuple[float, int]]] = defaultdict(deque)
self._lock = asyncio.Lock()
async def check_rpm(self, tenant_id: str, rpm_limit: int) -> None:
"""Check and record a request against the RPM limit.
Raises:
RateLimitExceeded: If the tenant has exceeded rpm_limit requests/minute.
"""
async with self._lock:
now = time.monotonic()
q = self._rpm_log[tenant_id]
# Expire entries outside the window
while q and q[0] < now - self.WINDOW:
q.popleft()
if len(q) >= rpm_limit:
__init__ method · python · L26-L29 (4 LOC)harness/ratelimit/sliding_window.py
def __init__(self) -> None:
    """Create empty per-tenant logs and the lock that guards them.

    ``_rpm_log`` holds one monotonic timestamp per admitted request.
    ``_tpm_log`` holds ``(timestamp, tokens)`` pairs. Both are keyed by
    tenant id and auto-create an empty deque on first access.
    """
    self._lock = asyncio.Lock()
    self._tpm_log: dict[str, deque[tuple[float, int]]] = defaultdict(deque)
    self._rpm_log: dict[str, deque] = defaultdict(deque)
# check_rpm method · python · L31-L45 (15 LOC)harness/ratelimit/sliding_window.py
async def check_rpm(self, tenant_id: str, rpm_limit: int) -> None:
    """Admit one request against the tenant's requests-per-minute budget.

    Under the lock: drop timestamps older than ``WINDOW`` seconds, reject if
    the remaining count already reaches ``rpm_limit``, otherwise record the
    current time. Rejected requests are not recorded.

    Raises:
        RateLimitExceeded: If the tenant has exceeded rpm_limit requests/minute.
    """
    async with self._lock:
        current = time.monotonic()
        cutoff = current - self.WINDOW
        history = self._rpm_log[tenant_id]
        while history and history[0] < cutoff:
            history.popleft()
        if len(history) >= rpm_limit:
            raise RateLimitExceeded("RPM limit exceeded")
        history.append(current)
# check_tpm method · python · L47-L61 (15 LOC)harness/ratelimit/sliding_window.py
async def check_tpm(self, tenant_id: str, tpm_limit: int) -> None:
    """Gate a new request on tokens already recorded in the last WINDOW seconds.

    Tokens are only added by ``record_tpm`` after a response, so this check
    lags by one request. Entries older than the window are discarded first.
    Nothing is recorded by this call.

    Raises:
        RateLimitExceeded: If total tokens in the current window exceed tpm_limit.
    """
    async with self._lock:
        cutoff = time.monotonic() - self.WINDOW
        usage = self._tpm_log[tenant_id]
        while usage and usage[0][0] < cutoff:
            usage.popleft()
        if sum(count for _stamp, count in usage) >= tpm_limit:
            raise RateLimitExceeded("TPM limit exceeded")
# record_tpm method · python · L63-L71 (9 LOC)harness/ratelimit/sliding_window.py
async def record_tpm(self, tenant_id: str, tokens: int) -> None:
    """Charge *tokens* to *tenant_id* at the current monotonic time.

    Called after a response is received; the charge is what the next
    ``check_tpm`` call sees. No expiry happens here.

    Args:
        tenant_id: The tenant to charge.
        tokens: Number of tokens used in this request/response.
    """
    async with self._lock:
        stamp = time.monotonic()
        self._tpm_log[tenant_id].append((stamp, tokens))
# check_balance function · python · L9-L50 (42 LOC)harness/redteam/balance.py
def check_balance(
pending_path: Path,
active_dataset_dir: Path,
max_category_ratio: float = 0.40,
) -> tuple[bool, dict[str, float]]:
"""Check if adding pending entries would violate category balance.
Args:
pending_path: Path to pending JSONL file.
active_dataset_dir: Directory containing active *.jsonl datasets.
max_category_ratio: Maximum fraction any single category may occupy.
Returns:
(ok, violations) where violations maps category -> actual ratio for violators.
"""
counts: Counter = Counter()
# Count categories in existing active datasets
for f in active_dataset_dir.glob("*.jsonl"):
for line in f.read_text().splitlines():
if line.strip():
counts[json.loads(line).get("category", "unknown")] += 1
# Count categories in pending file
pending_text = pending_path.read_text() if pending_path.exists() else ""
pending_lines = [line for line in pending_text.splitlinesgenerate_adversarial_variants function · python · L27-L59 (33 LOC)harness/redteam/engine.py
async def generate_adversarial_variants(
http_client, judge_model: str, source_prompt: str, n: int = 3
) -> list[dict]:
"""Send a near-miss prompt to the judge model to generate adversarial variants.
Args:
http_client: httpx.AsyncClient with LiteLLM base_url set.
judge_model: Model name for the judge (from constitution.judge_model).
source_prompt: The original near-miss prompt to generate variants from.
n: Number of variants to generate per source prompt.
Returns:
List of dicts with keys: prompt, technique, category. Empty on failure.
"""
try:
resp = await http_client.post(
"/v1/chat/completions",
json={
"model": judge_model,
"messages": [
{"role": "system", "content": ADVERSARIAL_SYSTEM_PROMPT.format(n=n)},
{"role": "user", "content": f"Source prompt:\n{source_prompt}"},
],
"response_forun_deepteam_job function · python · L62-L137 (76 LOC)harness/redteam/engine.py
async def run_deepteam_job(
trace_store,
http_client,
judge_model: str,
near_miss_window_days: int = 7,
near_miss_limit: int = 100,
near_miss_min_count: int = 5,
variants_per_prompt: int = 3,
) -> dict:
"""Run a full deepteam adversarial generation job.
1. Query near-miss traces from trace_store
2. Send each to judge model for variant generation
3. Write all variants to pending JSONL
Args:
trace_store: TraceStore instance with query_near_misses method.
http_client: httpx.AsyncClient for LiteLLM calls.
judge_model: Resolved judge model name.
near_miss_window_days: How many days back to search for near-misses.
near_miss_limit: Max near-miss traces to process.
near_miss_min_count: Minimum near-misses required to run (below = skip).
variants_per_prompt: Number of variants per source prompt.
Returns:
Dict with: near_miss_count, variants_generated, pending_file, categories, Powered by Repobility — scan your code at https://repobility.com
run_garak_scan function · python · L10-L65 (56 LOC)harness/redteam/garak_runner.py
async def run_garak_scan(
profile_config_path: str,
api_key: str,
report_dir: str,
job_id: str,
model_name: str = "harness-gateway",
probes: str | None = None,
) -> dict:
"""Run garak as async subprocess and return parsed results.
Uses asyncio.create_subprocess_exec (NOT subprocess.run) to avoid blocking
the event loop during potentially long-running scans.
Args:
profile_config_path: Path to garak YAML config (e.g. redteam_quick.yaml).
api_key: Tenant API key for the gateway — set as OPENAICOMPATIBLE_API_KEY env var.
report_dir: Directory where garak writes report files.
job_id: Unique job ID used as report filename prefix.
model_name: Model name for --target_name flag.
probes: Comma-separated probe list for --probes flag (optional).
Returns:
Dict with keys: exit_code, stdout, stderr, scores, report_path.
"""
# Ensure report dir exists
os.makedirs(report_dir, exist_ok=Trueparse_garak_report function · python · L68-L96 (29 LOC)harness/redteam/garak_runner.py
def parse_garak_report(report_path: str) -> dict:
"""Parse garak JSONL report file and extract probe scores.
Args:
report_path: Path to {job_id}.report.jsonl file.
Returns:
Dict of probe_name -> {"passed": int, "total": int, "pass_rate": float}.
"""
scores = {}
try:
with open(report_path) as f:
for line in f:
line = line.strip()
if not line:
continue
entry = json.loads(line)
if entry.get("entry_type") == "eval":
probe = entry.get("probe", "unknown")
passed = entry.get("passed", 0)
total = max(entry.get("total", 1), 1)
scores[probe] = {
"passed": passed,
"total": total,
"pass_rate": round(passed / total, 4),
}
except (FileNotFoundError, json.JSONDecodeError):
pascmd_promote function · python · L16-L38 (23 LOC)harness/redteam/__main__.py
def cmd_promote(args):
    """Promote a pending adversarial dataset to active after balance check.

    Steps:
      1. Verify the pending file exists.
      2. Run ``check_balance`` against the active datasets.
      3. Refuse to overwrite an existing active file of the same name.
      4. Move the file into ``ACTIVE_DIR``, creating the directory if needed.
      5. Print the non-blank entry count.

    Args:
        args: argparse namespace with ``file`` (path to the pending JSONL)
            and ``max_ratio`` (per-category cap passed to check_balance).

    Exits:
        ``sys.exit(1)`` with a message on stderr when the file is missing,
        the balance check fails, or the destination already exists.
    """
    pending_file = Path(args.file)
    if not pending_file.exists():
        print(f"ERROR: File not found: {pending_file}", file=sys.stderr)
        sys.exit(1)

    max_ratio = args.max_ratio
    ok, violations = check_balance(pending_file, ACTIVE_DIR, max_ratio)
    if not ok:
        print(f"ERROR: Balance check failed. Categories exceed {max_ratio*100:.0f}% cap:", file=sys.stderr)
        for cat, ratio in sorted(violations.items(), key=lambda x: -x[1]):
            print(f" {cat}: {ratio*100:.1f}%", file=sys.stderr)
        sys.exit(1)

    dest = ACTIVE_DIR / pending_file.name
    # shutil.move uses rename semantics, which on POSIX silently replaces an
    # existing file; refuse so a same-named file cannot clobber active data.
    if dest.exists():
        print(f"ERROR: Active dataset already exists: {dest}", file=sys.stderr)
        sys.exit(1)
    # The active directory may not exist yet; move() would fail without it.
    ACTIVE_DIR.mkdir(parents=True, exist_ok=True)
    shutil.move(str(pending_file), str(dest))
    print(f"Promoted: {dest}")

    # Show summary
    count = sum(1 for line in dest.read_text().splitlines() if line.strip())
    print(f" Entries: {count}")
# cmd_list function · python · L41-L52 (12 LOC)harness/redteam/__main__.py
def cmd_list(args):
    """Print each pending adversarial dataset with its non-blank line count.

    Prints a short notice instead when ``PENDING_DIR`` is missing or holds no
    ``*.jsonl`` files. ``args`` is accepted for CLI-dispatch symmetry and is
    not read.
    """
    if not PENDING_DIR.exists():
        print("No pending directory found.")
        return

    pending = sorted(PENDING_DIR.glob("*.jsonl"))
    if not pending:
        print("No pending datasets.")
        return

    for path in pending:
        entries = len([ln for ln in path.read_text().splitlines() if ln.strip()])
        print(f" {path.name} ({entries} entries)")
# main function · python · L55-L80 (26 LOC)harness/redteam/__main__.py
def main():
parser = argparse.ArgumentParser(
prog="python -m harness.redteam",
description="Red team dataset management",
)
sub = parser.add_subparsers(dest="command")
p_promote = sub.add_parser("promote", help="Promote pending dataset to active after balance check")
p_promote.add_argument("file", help="Path to pending JSONL file")
p_promote.add_argument(
"--max-ratio",
type=float,
default=0.40,
help="Max category ratio (default 0.40)",
)
sub.add_parser("list", help="List pending adversarial datasets")
args = parser.parse_args()
if args.command == "promote":
cmd_promote(args)
elif args.command == "list":
cmd_list(args)
else:
parser.print_help()
sys.exit(1)JobRequest class · python · L28-L30 (3 LOC)harness/redteam/router.py
class JobRequest(BaseModel):
    # Body of a red team job submission. `type` is checked in submit_job,
    # which answers 400 for anything other than "garak" or "deepteam".
    type: str  # "garak" or "deepteam"
    profile: str = "quick"  # garak profile name (ignored for deepteam); picks redteam_<profile>.yaml
# submit_job function · python · L34-L61 (28 LOC)harness/redteam/router.py
async def submit_job(
request: Request,
body: JobRequest,
tenant: TenantConfig = Depends(verify_api_key),
):
"""Submit a red team job. Returns 202 Accepted with job_id, or 409 if a job is running."""
lock: asyncio.Lock = request.app.state.redteam_lock
if lock.locked():
current_id = request.app.state.redteam_current_job_id
return JSONResponse(
status_code=409,
content={"error": "A red team job is already running", "job_id": current_id},
)
if body.type not in ("garak", "deepteam"):
return JSONResponse(status_code=400, content={"error": "type must be 'garak' or 'deepteam'"})
job_id = f"rt-{uuid.uuid4().hex[:12]}"
now = datetime.now(timezone.utc).isoformat()
trace_store = request.app.state.trace_store
await trace_store.create_job({"job_id": job_id, "type": body.type, "created_at": now})
# Store task reference to prevent GC (Pitfall 3 from research)
task = asyncio.create_task(_run_run_job function · python · L64-L80 (17 LOC)harness/redteam/router.py
async def _run_job(app, job_id: str, body: JobRequest):
    """Background task: run one red team job while holding the red team lock.

    Holds ``app.state.redteam_lock`` for the whole run and publishes the job
    id in ``app.state.redteam_current_job_id`` while it runs. Status changes
    are recorded in the trace store: running -> complete | failed.

    The job is marked "failed" when:
      * a dispatcher raises (the error is logged with traceback);
      * a dispatcher returns a dict with a truthy ``"error"`` (for example,
        ``_dispatch_garak`` does this for an unknown profile);
      * the task is cancelled (CancelledError is re-raised afterwards).

    ``redteam_current_job_id`` and ``redteam_active_task`` are always reset
    to None on exit.

    Args:
        app: Application object whose ``state`` holds the lock, trace store
            and task reference.
        job_id: Job identifier previously inserted via ``create_job``.
        body: The submitted job request (type and garak profile).
    """
    import logging

    log = logging.getLogger(__name__)
    async with app.state.redteam_lock:
        app.state.redteam_current_job_id = job_id
        trace_store = app.state.trace_store
        try:
            # Inside the try so a failure here still reaches the finally.
            await trace_store.update_job_status(job_id, "running")
            if body.type == "garak":
                result = await _dispatch_garak(app, job_id, body.profile)
            else:
                result = await _dispatch_deepteam(app, job_id)
            if isinstance(result, dict) and result.get("error"):
                await trace_store.update_job_status(job_id, "failed", result)
            else:
                await trace_store.update_job_status(job_id, "complete", result)
        except asyncio.CancelledError:
            # CancelledError is not an Exception subclass; without this the
            # job would be left in "running" forever.
            await trace_store.update_job_status(job_id, "failed", {"error": "cancelled"})
            raise
        except Exception as e:
            log.exception("Red team job %s failed", job_id)
            await trace_store.update_job_status(job_id, "failed", {"error": str(e)})
        finally:
            app.state.redteam_current_job_id = None
            app.state.redteam_active_task = None
# Repobility · severity-and-effort ranking · https://repobility.com
_dispatch_garak function · python · L83-L127 (45 LOC)harness/redteam/router.py
async def _dispatch_garak(app, job_id: str, profile: str) -> dict:
"""Run a garak scan job."""
from harness.redteam.garak_runner import run_garak_scan
config_dir = os.environ.get(
"HARNESS_CONFIG_DIR",
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config"),
)
# Load redteam config for gateway URL
redteam_config_path = os.path.join(config_dir, "redteam.yaml")
with open(redteam_config_path) as f:
redteam_cfg = yaml.safe_load(f).get("redteam", {})
profile_path = os.path.join(config_dir, f"redteam_{profile}.yaml")
if not os.path.exists(profile_path):
return {"error": f"Profile '{profile}' not found at {profile_path}"}
# Use a dedicated redteam API key if configured, or first tenant key
api_key = os.environ.get("REDTEAM_API_KEY", "")
data_dir = os.environ.get(
"HARNESS_DATA_DIR",
os.path.join(os.path.dirname(os.path.dirname(__file__)), "data"),
)
report_dir = os.path.join_dispatch_deepteam function · python · L130-L159 (30 LOC)harness/redteam/router.py
async def _dispatch_deepteam(app, job_id: str) -> dict:
"""Run a deepteam adversarial generation job."""
from harness.redteam.engine import run_deepteam_job
config_dir = os.environ.get(
"HARNESS_CONFIG_DIR",
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config"),
)
redteam_config_path = os.path.join(config_dir, "redteam.yaml")
with open(redteam_config_path) as f:
redteam_cfg = yaml.safe_load(f).get("redteam", {})
# Resolve judge model from constitution config
critique_engine = getattr(app.state, "critique_engine", None)
if critique_engine and critique_engine.constitution:
judge_model = critique_engine.constitution.judge_model
if judge_model == "default":
judge_model = "gpt-3.5-turbo" # Sensible default when "default" is set
else:
judge_model = "gpt-3.5-turbo"
return await run_deepteam_job(
trace_store=app.state.trace_store,
http_client=app.state.http_cget_job_status function · python · L163-L172 (10 LOC)harness/redteam/router.py
async def get_job_status(
    request: Request,
    job_id: str,
    tenant: TenantConfig = Depends(verify_api_key),
):
    """Return the stored red team job record (status and parsed result).

    Responds 404 with an ``error`` message when the id is unknown.

    NOTE(review): ``tenant`` is only used for authentication; jobs carry no
    tenant column in create_job, so any authenticated tenant can read any job.
    """
    store = request.app.state.trace_store
    job = await store.get_job(job_id)
    if job is not None:
        return JSONResponse(content=job)
    return JSONResponse(status_code=404, content={"error": f"Job {job_id} not found"})
# list_jobs function · python · L176-L183 (8 LOC)harness/redteam/router.py
async def list_jobs(
    request: Request,
    tenant: TenantConfig = Depends(verify_api_key),
    limit: int = Query(default=20, ge=1, le=100),
):
    """List recent red team jobs, newest first.

    Args:
        request: Incoming request; the trace store is read from ``app.state``.
        tenant: Authenticated tenant (auth only).
        limit: Number of jobs to return, 1-100. The lower bound matters: the
            value is passed straight to SQLite ``LIMIT``, where a negative
            value means "no limit", so without ``ge=1`` a request with
            ``limit=-1`` would bypass the 100-row cap.

    Returns:
        ``{"jobs": [...]}``.
    """
    jobs = await request.app.state.trace_store.list_jobs(limit=limit)
    return JSONResponse(content={"jobs": jobs})
# compute_priority function · python · L17-L44 (28 LOC)harness/traces/store.py
def compute_priority(guardrail_decisions: dict) -> float:
"""Compute HITL review priority from guardrail decisions.
Priority is 1.0 - min(threshold - score) for results with score > 0.
Closest-to-threshold items get the highest priority.
Returns 0.0 when there are no scoreable results.
Args:
guardrail_decisions: Dict with optional 'all_results' list.
Each result should have 'score' and 'threshold' fields.
Returns:
Float in [0.0, 1.0], higher = more urgent review needed.
"""
# guardrail_decisions may be a list (from trace JSON) or a dict with 'all_results'
if isinstance(guardrail_decisions, list):
all_results = guardrail_decisions
else:
all_results = guardrail_decisions.get("all_results", [])
distances = []
for result in all_results:
score = result.get("score", 0)
threshold = result.get("threshold", 1.0)
if score > 0:
distances.append(threshold - score)
_extract_triggering_rail function · python · L47-L70 (24 LOC)harness/traces/store.py
def _extract_triggering_rail(guardrail_decisions) -> str | None:
"""Return the rail name from the all_results entry closest to threshold.
Args:
guardrail_decisions: List of rail results, or dict with 'all_results' key.
Returns:
Rail name string, or None if no results with score > 0.
"""
if isinstance(guardrail_decisions, list):
all_results = guardrail_decisions
else:
all_results = guardrail_decisions.get("all_results", [])
best = None
best_distance = float("inf")
for result in all_results:
score = result.get("score", 0)
threshold = result.get("threshold", 1.0)
if score > 0:
distance = threshold - score
if distance < best_distance:
best_distance = distance
best = result.get("rail_name") or result.get("rail")
return bestinit_db method · python · L87-L97 (11 LOC)harness/traces/store.py
async def init_db(self) -> None:
    """Apply the DDL from the sibling ``schema.sql`` to ``self._db_path``.

    Per the schema's intent, tables and indexes are created only if they do
    not already exist. Meant to run once during app lifespan.
    """
    ddl_file = Path(__file__).parent / "schema.sql"
    ddl = ddl_file.read_text()
    async with aiosqlite.connect(self._db_path) as conn:
        await conn.executescript(ddl)
        await conn.commit()
# write method · python · L99-L141 (43 LOC)harness/traces/store.py
async def write(self, record: dict) -> None:
"""Insert a trace record into SQLite.
Args:
record: Dict with fields matching the traces table schema.
guardrail_decisions and cai_critique may be None or a dict/list
(will be JSON-serialized).
"""
guardrail_json = (
json.dumps(record.get("guardrail_decisions"))
if record.get("guardrail_decisions") is not None
else None
)
cai_json = (
json.dumps(record.get("cai_critique"))
if record.get("cai_critique") is not None
else None
)
async with aiosqlite.connect(self._db_path) as db:
await db.execute(
"""
INSERT INTO traces
(request_id, tenant, timestamp, model, prompt, response,
latency_ms, status_code, guardrail_decisions, cai_critique,
refusal_event, bypass_flag)
Want this analysis on your repo? https://repobility.com/scan/
query_by_id method · python · L143-L155 (13 LOC)harness/traces/store.py
async def query_by_id(self, request_id: str) -> dict | None:
    """Look up a single trace by its ``request_id``.

    Returns:
        The row as a dict of column name -> stored value, or None when no
        trace has that id. JSON-encoded columns are returned unparsed.
    """
    sql = "SELECT * FROM traces WHERE request_id = ?"
    async with aiosqlite.connect(self._db_path) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(sql, (request_id,)) as cur:
            found = await cur.fetchone()
        if not found:
            return None
        return dict(found)
# write_eval_run method · python · L157-L180 (24 LOC)harness/traces/store.py
async def write_eval_run(self, run: dict) -> None:
    """Persist one eval run row in ``eval_runs``.

    Args:
        run: Must contain run_id, timestamp, source, metrics and
            config_snapshot; the last two are JSON-serialised here.
            baseline_name is optional and stored as NULL when absent.

    Raises:
        KeyError: if a required key is missing.
    """
    async with aiosqlite.connect(self._db_path) as conn:
        row = (
            run["run_id"],
            run["timestamp"],
            run["source"],
            json.dumps(run["metrics"]),
            json.dumps(run["config_snapshot"]),
            run.get("baseline_name"),
        )
        await conn.execute(
            """
            INSERT INTO eval_runs
                (run_id, timestamp, source, metrics, config_snapshot, baseline_name)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        await conn.commit()
# query_eval_runs method · python · L182-L213 (32 LOC)harness/traces/store.py
async def query_eval_runs(
self, source: str | None = None, limit: int = 20
) -> list[dict]:
"""Fetch eval run records ordered by timestamp DESC with optional source filter.
Args:
source: Optional source filter ("replay" or "lm-eval").
limit: Maximum number of records to return (default 20).
Returns:
List of eval run dicts with metrics and config_snapshot parsed from JSON.
"""
async with aiosqlite.connect(self._db_path) as db:
db.row_factory = aiosqlite.Row
if source is not None:
query = (
"SELECT * FROM eval_runs WHERE source = ? "
"ORDER BY timestamp DESC LIMIT ?"
)
params: tuple = (source, limit)
else:
query = "SELECT * FROM eval_runs ORDER BY timestamp DESC LIMIT ?"
params = (limit,)
async with db.execute(query, params) as ccreate_job method · python · L215-L230 (16 LOC)harness/traces/store.py
async def create_job(self, job: dict) -> None:
    """Insert a new red team job with status='pending'.

    Args:
        job: Dict with required ``job_id`` and ``type`` ('garak' or
            'deepteam') and an optional ``created_at`` ISO8601 string. When
            ``created_at`` is missing or empty, the current UTC time is used.
    """
    # Honour a caller-supplied timestamp (the red team router passes one),
    # matching how write_correction treats created_at.
    created_at = job.get("created_at") or datetime.now(timezone.utc).isoformat()
    async with aiosqlite.connect(self._db_path) as db:
        await db.execute(
            """
            INSERT INTO redteam_jobs (job_id, type, status, created_at)
            VALUES (?, ?, 'pending', ?)
            """,
            (job["job_id"], job["type"], created_at),
        )
        await db.commit()
# update_job_status method · python · L232-L259 (28 LOC)harness/traces/store.py
async def update_job_status(
self, job_id: str, status: str, result: dict | None = None
) -> None:
"""Update a red team job's status (and optionally result).
Sets completed_at for terminal statuses ('complete', 'failed').
Args:
job_id: Unique job identifier.
status: New status ('running', 'complete', 'failed').
result: Optional result dict (JSON-serialized).
"""
completed_at = (
datetime.now(timezone.utc).isoformat()
if status in ("complete", "failed")
else None
)
result_json = json.dumps(result) if result is not None else None
async with aiosqlite.connect(self._db_path) as db:
await db.execute(
"""
UPDATE redteam_jobs
SET status = ?, completed_at = ?, result = ?
WHERE job_id = ?
""",
(status, completed_at, result_json, job_id),get_job method · python · L261-L278 (18 LOC)harness/traces/store.py
async def get_job(self, job_id: str) -> dict | None:
    """Fetch one red team job by ``job_id``.

    Returns:
        The row as a dict with ``result`` decoded from JSON (left as None
        when NULL), or None when no job has that id.
    """
    async with aiosqlite.connect(self._db_path) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(
            "SELECT * FROM redteam_jobs WHERE job_id = ?", (job_id,)
        ) as cur:
            row = await cur.fetchone()
        if row is None:
            return None
        job = dict(row)
        raw_result = job.get("result")
        if raw_result is not None:
            job["result"] = json.loads(raw_result)
        return job
# list_jobs method · python · L280-L302 (23 LOC)harness/traces/store.py
async def list_jobs(self, limit: int = 20) -> list[dict]:
    """Fetch up to *limit* red team jobs, newest ``created_at`` first.

    Args:
        limit: Value bound to SQLite ``LIMIT`` (default 20).

    Returns:
        Job dicts with ``result`` decoded from JSON where it is not NULL.
    """
    async with aiosqlite.connect(self._db_path) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(
            "SELECT * FROM redteam_jobs ORDER BY created_at DESC LIMIT ?",
            (limit,),
        ) as cur:
            rows = await cur.fetchall()
        jobs = [dict(r) for r in rows]
        for job in jobs:
            if job.get("result") is not None:
                job["result"] = json.loads(job["result"])
        return jobs
# query_near_misses method · python · L304-L340 (37 LOC)harness/traces/store.py
async def query_near_misses(self, since: str, limit: int = 100) -> list[dict]:
"""Fetch traces that scored above zero but were not blocked.
Queries traces since the given timestamp that were not refusals and
had guardrail decisions recorded, then filters in Python to keep only
those where at least one rail result has score > 0.
Args:
since: ISO8601 string — lower bound (inclusive).
limit: Maximum number of records to return (default 100).
Returns:
List of trace dicts matching near-miss criteria.
"""
async with aiosqlite.connect(self._db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""
SELECT * FROM traces
WHERE timestamp >= ?
AND guardrail_decisions IS NOT NULL
AND refusal_event = 0
ORDER BY timestamp DESC
LIMIT ?
Repobility · MCP-ready · https://repobility.com
write_correction method · python · L342-L377 (36 LOC)harness/traces/store.py
async def write_correction(self, correction: dict) -> None:
"""Insert a correction record into the corrections table.
PII in edited_response is redacted before storage.
Validates that action is one of 'approve', 'reject', 'edit' — SQLite
CHECK constraint enforces this; raises on invalid action.
Args:
correction: Dict with fields: request_id, reviewer, action,
edited_response (optional), trace_ref (optional).
"""
from harness.pii.redactor import redact as redact_text
edited_response = correction.get("edited_response")
if edited_response is not None:
edited_response = redact_text(edited_response)
created_at = correction.get("created_at") or datetime.now(timezone.utc).isoformat()
async with aiosqlite.connect(self._db_path) as db:
await db.execute(
"""
INSERT INTO corrections
(request_id,query_corrections method · python · L379-L398 (20 LOC)harness/traces/store.py
async def query_corrections(self, request_id: str | None = None) -> list[dict]:
    """Fetch correction rows, newest ``created_at`` first.

    Args:
        request_id: When given, only corrections for this request_id are
            returned; when None, all corrections are returned.

    Returns:
        List of correction dicts (column name -> stored value).
    """
    sql = "SELECT * FROM corrections"
    args: tuple = ()
    if request_id is not None:
        sql += " WHERE request_id = ?"
        args = (request_id,)
    sql += " ORDER BY created_at DESC"
    async with aiosqlite.connect(self._db_path) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(sql, args) as cur:
            fetched = await cur.fetchall()
        return [dict(r) for r in fetched]
# query_hitl_queue method · python · L400-L488 (89 LOC)harness/traces/store.py
async def query_hitl_queue(
self,
since: str,
rail_filter: str = "all",
tenant_filter: str = "all",
hide_reviewed: bool = False,
limit: int = 200,
) -> list[dict]:
"""Fetch HITL review queue: flagged traces sorted by priority.
Priority ordering: unreviewed items before reviewed, within each group
sorted by priority DESC (closest to threshold first).
Args:
since: ISO8601 timestamp lower bound (inclusive).
rail_filter: Rail name to filter on, or 'all' for no filter.
tenant_filter: Tenant ID to filter on, or 'all' for no filter.
hide_reviewed: If True, exclude traces with existing corrections.
limit: Maximum traces to fetch from DB before Python post-processing.
Returns:
List of trace dicts augmented with: priority (float),
triggering_rail (str|None), correction_action (str|None),
correction_revie