Function bodies 335 total
state_abbreviation function · python · L227-L240 (14 LOC)src/nlb/utils/geo.py
def state_abbreviation(state_name: str) -> str:
"""Convert a full US state name to its 2-letter USPS abbreviation.
Case-insensitive. Returns the input unchanged if not found (so callers
that already have abbreviations pass through cleanly).
Examples::
>>> state_abbreviation("Illinois")
'IL'
>>> state_abbreviation("IL") # already abbreviated
'IL'
"""
return _normalize_state(state_name)is_valid_us_coordinate function · python · L243-L258 (16 LOC)src/nlb/utils/geo.py
def is_valid_us_coordinate(lat: float, lon: float) -> bool:
"""Rough bounding-box check for the contiguous US + AK + HI.
Not a rigorous polygon test — just a sanity check to catch obviously
wrong inputs before hitting external APIs.
"""
# Contiguous US
if 24.0 <= lat <= 49.5 and -125.0 <= lon <= -66.0:
return True
# Alaska
if 51.0 <= lat <= 72.0 and -180.0 <= lon <= -130.0:
return True
# Hawaii
if 18.5 <= lat <= 22.5 and -160.5 <= lon <= -154.5:
return True
return Falsehaversine_ft function · python · L261-L276 (16 LOC)src/nlb/utils/geo.py
def haversine_ft(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Great-circle distance between two WGS-84 points, in feet.
Uses the haversine formula. Accurate to ~0.5% for distances < 1000 mi.
Primarily useful for sanity-checking coordinate pairs.
"""
import math
R_FT = 20_902_231.0 # mean Earth radius in feet
φ1, φ2 = math.radians(lat1), math.radians(lat2)
Δφ = math.radians(lat2 - lat1)
Δλ = math.radians(lon2 - lon1)
a = math.sin(Δφ / 2) ** 2 + math.cos(φ1) * math.cos(φ2) * math.sin(Δλ / 2) ** 2
return 2 * R_FT * math.asin(math.sqrt(a))_normalize_state function · python · L283-L301 (19 LOC)src/nlb/utils/geo.py
def _normalize_state(raw: str) -> str:
"""Return USPS 2-letter abbreviation for any state name/abbr input.
Handles:
- "Illinois" → "IL"
- "illinois" → "IL"
- "IL" → "IL" (pass-through)
- "North Dakota" → "ND"
Returns "" for unrecognized input.
"""
if not raw:
return ""
stripped = raw.strip()
# Already an abbreviation?
upper = stripped.upper()
if upper in STATE_NAMES:
return upper
# Try full name lookup (case-insensitive)
return _STATE_NAME_TO_ABBR.get(stripped.lower(), "")_new_run function · python · L84-L97 (14 LOC)ui/run_manager.py
def _new_run(run_id: str, description: str) -> dict:
return {
"run_id": run_id,
"description": description,
"status": "pending", # pending | running | completed | failed
"step": "parse",
"progress": 0,
"started_at": time.time(),
"finished_at": None,
"logs": [], # list of log event dicts
"results": None, # full results dict when complete
"error": None, # error message if failed
"output_dir": f"/tmp/natural-language-builder/nlb-run-{run_id}",
}_get_run function · python · L100-L102 (3 LOC)ui/run_manager.py
def _get_run(run_id: str) -> Optional[dict]:
with _runs_lock:
return _runs.get(run_id)_update_run function · python · L105-L108 (4 LOC)ui/run_manager.py
def _update_run(run_id: str, **kwargs):
with _runs_lock:
if run_id in _runs:
_runs[run_id].update(kwargs)Same scanner, your repo: https://repobility.com — Repobility
_get_or_create_event_loop function · python · L113-L118 (6 LOC)ui/run_manager.py
def _get_or_create_event_loop():
"""Get the running loop, or create a new one for non-async contexts."""
try:
return asyncio.get_running_loop()
except RuntimeError:
return None_broadcast_log function · python · L121-L132 (12 LOC)ui/run_manager.py
def _broadcast_log(run_id: str, event: dict):
"""Push a log event to all SSE queues for this run (thread-safe)."""
with _sse_lock:
queues = _sse_queues.get(run_id, [])
for q in queues:
# schedule put in the event loop from the background thread
try:
loop = q._loop if hasattr(q, "_loop") else None
if loop and loop.is_running():
loop.call_soon_threadsafe(q.put_nowait, event)
except Exception:
passregister_sse_queue function · python · L135-L144 (10 LOC)ui/run_manager.py
def register_sse_queue(run_id: str, q: asyncio.Queue):
"""Register a new SSE subscriber queue."""
with _sse_lock:
_sse_queues.setdefault(run_id, []).append(q)
# Store the loop on the queue so background threads can schedule on it
try:
loop = asyncio.get_running_loop()
q._loop = loop # type: ignore[attr-defined]
except RuntimeError:
passunregister_sse_queue function · python · L147-L152 (6 LOC)ui/run_manager.py
def unregister_sse_queue(run_id: str, q: asyncio.Queue):
"""Remove a SSE subscriber queue."""
with _sse_lock:
qs = _sse_queues.get(run_id, [])
if q in qs:
qs.remove(q)_send_sentinel function · python · L155-L157 (3 LOC)ui/run_manager.py
def _send_sentinel(run_id: str):
"""Send end-of-stream sentinel to all SSE queues for this run."""
_broadcast_log(run_id, None) # None = done sentinel_make_log function · python · L162-L168 (7 LOC)ui/run_manager.py
def _make_log(level: str, message: str, step: str) -> dict:
return {
"ts": time.strftime("%H:%M:%S"),
"level": level, # info | success | warn | error
"message": message,
"step": step,
}_append_log function · python · L171-L175 (5 LOC)ui/run_manager.py
def _append_log(run_id: str, event: dict):
with _runs_lock:
if run_id in _runs:
_runs[run_id]["logs"].append(event)
_broadcast_log(run_id, event)_LogCapture class · python · L180-L234 (55 LOC)ui/run_manager.py
class _LogCapture:
"""Context manager that patches nlb.cli print helpers in the
calling thread and routes output to the run's log store."""
def __init__(self, run_id: str, step_tracker: list):
self.run_id = run_id
self.step_tracker = step_tracker # mutable [current_step]
self._originals = {}
def _log(self, level: str, message: str):
step = self.step_tracker[0]
event = _make_log(level, message, step)
_append_log(self.run_id, event)
def _patched_status(self, icon: str, msg: str):
# Detect step from icon
new_step = ICON_TO_STEP.get(icon)
if new_step:
self.step_tracker[0] = new_step
progress = STEP_PROGRESS.get(new_step, STEP_PROGRESS.get(self.step_tracker[0], 0))
_update_run(self.run_id, step=new_step, progress=progress)
self._log("info", f"{icon} {msg}")
def _patched_success(self, msg: str):
self._log("success", f"✓ {msg}")
def _patProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
__init__ method · python · L184-L187 (4 LOC)ui/run_manager.py
def __init__(self, run_id: str, step_tracker: list):
self.run_id = run_id
self.step_tracker = step_tracker # mutable [current_step]
self._originals = {}_log method · python · L189-L192 (4 LOC)ui/run_manager.py
def _log(self, level: str, message: str):
step = self.step_tracker[0]
event = _make_log(level, message, step)
_append_log(self.run_id, event)_patched_status method · python · L194-L201 (8 LOC)ui/run_manager.py
def _patched_status(self, icon: str, msg: str):
# Detect step from icon
new_step = ICON_TO_STEP.get(icon)
if new_step:
self.step_tracker[0] = new_step
progress = STEP_PROGRESS.get(new_step, STEP_PROGRESS.get(self.step_tracker[0], 0))
_update_run(self.run_id, step=new_step, progress=progress)
self._log("info", f"{icon} {msg}")__enter__ method · python · L215-L229 (15 LOC)ui/run_manager.py
def __enter__(self):
import nlb.cli as _cli
self._originals = {
"status": _cli.status,
"success": _cli.success,
"warn": _cli.warn,
"error": _cli.error,
"header": _cli.header,
}
_cli.status = self._patched_status
_cli.success = self._patched_success
_cli.warn = self._patched_warn
_cli.error = self._patched_error
_cli.header = self._patched_header
return self__exit__ method · python · L231-L234 (4 LOC)ui/run_manager.py
def __exit__(self, *_):
import nlb.cli as _cli
for name, fn in self._originals.items():
setattr(_cli, name, fn)_generate_moment_diagram function · python · L242-L419 (178 LOC)ui/run_manager.py
def _generate_moment_diagram(params: dict, results: dict, output_dir: Path) -> bool:
"""Generate an interactive HTML moment diagram using the rich canvas template."""
try:
spans = params.get("spans", [100.0])
n_spans = len(spans)
total_len = sum(spans)
# Build approximate moment envelope (simple beam parabola per span)
# For a continuous beam, we use a rough approximation
points_x = []
points_moment = []
n_pts = 20
cumulative = 0.0
for span_i, span_len in enumerate(spans):
for j in range(n_pts + (1 if span_i == n_spans - 1 else 0)):
x_frac = j / n_pts
x = cumulative + x_frac * span_len
# Simple beam moment: M = w*L/2 * x - w/2 * x^2 (parabola, normalized)
# Rough w: use total uniform load ~4 kip/ft typical
w = 4.0 # kip/ft
xi = x_frac * span_len
m = w * span_len / 2 * xi - w_run_pipeline_thread function · python · L424-L508 (85 LOC)ui/run_manager.py
def _run_pipeline_thread(run_id: str, description: str):
"""This function runs in a ThreadPoolExecutor thread."""
run = _get_run(run_id)
if run is None:
return
output_dir = Path(run["output_dir"])
output_dir.mkdir(parents=True, exist_ok=True)
step_tracker = ["parse"] # mutable ref so patches can update it
_update_run(run_id, status="running", step="parse", progress=0)
try:
import nlb.cli as _cli
from nlb.cli import parse_description, run_pipeline
# ── Step: parse ───────────────────────────────────────────────
parse_event = _make_log("info", "📝 Parsing description...", "parse")
_append_log(run_id, parse_event)
with _CLI_PATCH_LOCK:
with _LogCapture(run_id, step_tracker):
params = parse_description(description)
spans = params.get("spans", [])
btype = (params.get("bridge_type") or "unknown").replace("_", " ")
n_spans = len(spans)
spanstart_run function · python · L513-L525 (13 LOC)ui/run_manager.py
def start_run(description: str) -> str:
"""Create a new run, start it in the background, return run_id."""
run_id = str(uuid.uuid4())[:8]
state = _new_run(run_id, description)
with _runs_lock:
_runs[run_id] = state
with _sse_lock:
_sse_queues[run_id] = []
_executor.submit(_run_pipeline_thread, run_id, description)
return run_idWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
get_status function · python · L528-L540 (13 LOC)ui/run_manager.py
def get_status(run_id: str) -> Optional[dict]:
run = _get_run(run_id)
if run is None:
return None
elapsed = (run.get("finished_at") or time.time()) - run["started_at"]
return {
"run_id": run_id,
"status": run["status"],
"step": run["step"],
"progress": run["progress"],
"elapsed": round(elapsed, 1),
"error": run.get("error"),
}get_logs function · python · L543-L547 (5 LOC)ui/run_manager.py
def get_logs(run_id: str) -> Optional[List[dict]]:
run = _get_run(run_id)
if run is None:
return None
return list(run["logs"])get_results function · python · L550-L554 (5 LOC)ui/run_manager.py
def get_results(run_id: str) -> Optional[dict]:
run = _get_run(run_id)
if run is None:
return None
return run.get("results")get_artifacts function · python · L557-L583 (27 LOC)ui/run_manager.py
def get_artifacts(run_id: str) -> Optional[List[dict]]:
run = _get_run(run_id)
if run is None:
return None
output_dir = Path(run["output_dir"])
if not output_dir.exists():
return []
artifacts = []
for path in sorted(output_dir.iterdir()):
if path.is_file():
ext = path.suffix.lower()
ftype = {
".html": "html",
".md": "markdown",
".json": "json",
".pdf": "pdf",
".txt": "text",
}.get(ext, "file")
artifacts.append({
"name": path.name,
"type": ftype,
"size": path.stat().st_size,
"url": f"/api/run/{run_id}/artifact/{path.name}",
})
return artifactsget_artifact_path function · python · L586-L600 (15 LOC)ui/run_manager.py
def get_artifact_path(run_id: str, filename: str) -> Optional[Path]:
run = _get_run(run_id)
if run is None:
return None
base = Path(run["output_dir"]).resolve()
candidate = (base / filename).resolve()
# Prevent path traversal outside the run output directory.
try:
candidate.relative_to(base)
except ValueError:
return None
return candidate if candidate.exists() and candidate.is_file() else Nonepost_run function · python · L114-L118 (5 LOC)ui/server.py
def post_run(body: RunRequest):
if not body.description.strip():
raise HTTPException(status_code=400, detail="description is required")
run_id = rm.start_run(body.description.strip())
return {"run_id": run_id}get_run_status function · python · L122-L126 (5 LOC)ui/server.py
def get_run_status(run_id: str):
status = rm.get_status(run_id)
if status is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
return statusget_run_results function · python · L130-L139 (10 LOC)ui/server.py
def get_run_results(run_id: str):
status = rm.get_status(run_id)
if status is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
if status["status"] == "running" or status["status"] == "pending":
raise HTTPException(status_code=202, detail="Run still in progress")
results = rm.get_results(run_id)
if results is None:
raise HTTPException(status_code=404, detail="No results available (run may have failed)")
return resultsMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
get_run_artifacts function · python · L143-L147 (5 LOC)ui/server.py
def get_run_artifacts(run_id: str):
artifacts = rm.get_artifacts(run_id)
if artifacts is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
return {"artifacts": artifacts}get_artifact_file function · python · L151-L165 (15 LOC)ui/server.py
def get_artifact_file(run_id: str, filename: str):
path = rm.get_artifact_path(run_id, filename)
if path is None:
raise HTTPException(status_code=404, detail=f"Artifact '{filename}' not found")
# Guess MIME type
mime, _ = mimetypes.guess_type(filename)
if mime is None:
mime = "application/octet-stream"
# For markdown, serve as text so browser can render it
if filename.endswith(".md"):
mime = "text/plain; charset=utf-8"
return FileResponse(str(path), media_type=mime, filename=filename)get_run_logs_sse function · python · L169-L236 (68 LOC)ui/server.py
async def get_run_logs_sse(run_id: str, request: Request):
"""Server-Sent Events stream of log events for a run.
Sends already-buffered logs immediately, then streams new events
as they arrive, and closes when the run completes (sentinel = None).
"""
status = rm.get_status(run_id)
if status is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
async def event_generator():
# 1. Drain existing logs first (for late-joining clients)
existing = rm.get_logs(run_id) or []
for log_event in existing:
yield {"data": json.dumps(log_event)}
await asyncio.sleep(0)
# If run already finished, we're done
current_status = rm.get_status(run_id)
if current_status and current_status["status"] in ("completed", "failed"):
yield {"data": json.dumps({"level": "info", "message": "__END__", "step": "done", "ts": ""})}
return
# 2. Register a qserve_index function · python · L243-L250 (8 LOC)ui/server.py
async def serve_index():
index = _STATIC_DIR / "index.html"
if not index.exists():
return JSONResponse(
status_code=200,
content={"message": "NLB backend running. Place index.html in ui/static/"},
)
return FileResponse(str(index))‹ prevpage 7 / 7