← back to mjamiv__natural-language-builder

Function bodies 335 total

All specs Real LLM only Function bodies
state_abbreviation function · python · L227-L240 (14 LOC)
src/nlb/utils/geo.py
def state_abbreviation(state_name: str) -> str:
    """Convert a full US state name to its 2-letter USPS abbreviation.

    Thin public wrapper around ``_normalize_state``.  Matching is
    case-insensitive and surrounding whitespace is ignored.  Input that is
    already a known abbreviation is returned upper-cased.

    Empty or unrecognized input yields ``""`` -- it is *not* passed through
    unchanged -- so callers can test the result for truthiness to detect a
    failed lookup.

    Examples::

        >>> state_abbreviation("Illinois")
        'IL'
        >>> state_abbreviation("IL")   # already abbreviated
        'IL'
        >>> state_abbreviation("Atlantis")   # unrecognized
        ''
    """
    return _normalize_state(state_name)
is_valid_us_coordinate function · python · L243-L258 (16 LOC)
src/nlb/utils/geo.py
def is_valid_us_coordinate(lat: float, lon: float) -> bool:
    """Report whether (lat, lon) falls in one of three coarse US bounding boxes.

    The boxes cover the contiguous US, Alaska and Hawaii.  This is a cheap
    sanity filter for obviously wrong inputs before calling external APIs,
    not a rigorous polygon test.  All box edges are inclusive.
    """
    # (south, north, west, east) in decimal degrees.
    regions = (
        (24.0, 49.5, -125.0, -66.0),    # Contiguous US
        (51.0, 72.0, -180.0, -130.0),   # Alaska
        (18.5, 22.5, -160.5, -154.5),   # Hawaii
    )
    return any(
        south <= lat <= north and west <= lon <= east
        for south, north, west, east in regions
    )
haversine_ft function · python · L261-L276 (16 LOC)
src/nlb/utils/geo.py
def haversine_ft(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two WGS-84 points, in feet.

    Uses the haversine formula on a sphere of mean Earth radius.  Accurate to
    ~0.5% for distances < 1000 mi.  Primarily useful for sanity-checking
    coordinate pairs.

    Args:
        lat1, lon1: First point, decimal degrees.
        lat2, lon2: Second point, decimal degrees.

    Returns:
        The distance in feet (``0.0`` for identical points).
    """
    import math

    R_FT = 20_902_231.0   # mean Earth radius in feet

    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)

    a = (
        math.sin(d_phi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    )
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, which would make asin(sqrt(a)) raise ValueError.  Clamp the upper
    # bound only; the plain comparison leaves NaN inputs propagating as NaN.
    if a > 1.0:
        a = 1.0
    return 2 * R_FT * math.asin(math.sqrt(a))
_normalize_state function · python · L283-L301 (19 LOC)
src/nlb/utils/geo.py
def _normalize_state(raw: str) -> str:
    """Map a state name or abbreviation to its USPS 2-letter code.

    Examples of accepted input:
    - "Illinois"          → "IL"
    - "illinois"          → "IL"
    - "IL"                → "IL"  (pass-through)
    - "North Dakota"      → "ND"

    Leading/trailing whitespace is ignored.  Falsy or unrecognized input
    produces "".
    """
    if raw:
        candidate = raw.strip()
        # An upper-cased key of STATE_NAMES is treated as an abbreviation.
        as_abbr = candidate.upper()
        if as_abbr in STATE_NAMES:
            return as_abbr
        # Otherwise fall back to the lower-cased full-name table.
        return _STATE_NAME_TO_ABBR.get(candidate.lower(), "")
    return ""
_new_run function · python · L84-L97 (14 LOC)
ui/run_manager.py
def _new_run(run_id: str, description: str) -> dict:
    return {
        "run_id": run_id,
        "description": description,
        "status": "pending",       # pending | running | completed | failed
        "step": "parse",
        "progress": 0,
        "started_at": time.time(),
        "finished_at": None,
        "logs": [],                # list of log event dicts
        "results": None,           # full results dict when complete
        "error": None,             # error message if failed
        "output_dir": f"/tmp/natural-language-builder/nlb-run-{run_id}",
    }
_get_run function · python · L100-L102 (3 LOC)
ui/run_manager.py
def _get_run(run_id: str) -> Optional[dict]:
    """Look up a run's state dict under ``_runs_lock``; ``None`` if unknown.

    The stored dict itself is returned, not a copy.
    """
    with _runs_lock:
        record = _runs.get(run_id)
    return record
_update_run function · python · L105-L108 (4 LOC)
ui/run_manager.py
def _update_run(run_id: str, **kwargs):
    """Merge ``kwargs`` into the stored state of ``run_id`` under ``_runs_lock``.

    Unknown run ids are ignored silently.
    """
    with _runs_lock:
        if run_id not in _runs:
            return
        _runs[run_id].update(kwargs)
Same scanner, your repo: https://repobility.com — Repobility
_get_or_create_event_loop function · python · L113-L118 (6 LOC)
ui/run_manager.py
def _get_or_create_event_loop():
    """Get the running loop, or create a new one for non-async contexts."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None
_broadcast_log function · python · L121-L132 (12 LOC)
ui/run_manager.py
def _broadcast_log(run_id: str, event: dict):
    """Fan a log event out to every SSE queue registered for ``run_id``.

    Holds ``_sse_lock`` while iterating.  Each queue is expected to carry the
    event loop it belongs to in a ``_loop`` attribute; the put is scheduled on
    that loop with ``call_soon_threadsafe``.  Queues without a running loop
    are skipped, and any error for one queue is ignored so the remaining
    subscribers are still served.
    """
    with _sse_lock:
        for subscriber in _sse_queues.get(run_id, []):
            try:
                target_loop = getattr(subscriber, "_loop", None)
                if not target_loop or not target_loop.is_running():
                    continue
                # Hand the put over to the subscriber's own loop.
                target_loop.call_soon_threadsafe(subscriber.put_nowait, event)
            except Exception:
                # Best-effort delivery: move on to the next subscriber.
                continue
register_sse_queue function · python · L135-L144 (10 LOC)
ui/run_manager.py
def register_sse_queue(run_id: str, q: asyncio.Queue):
    """Register a new SSE subscriber queue for ``run_id``.

    The running event loop (if any) is stored on the queue as ``q._loop`` so
    that ``_broadcast_log`` can schedule puts onto it from another thread.

    The loop is attached *before* the queue is published under ``_sse_lock``.
    ``_broadcast_log`` skips queues that have no loop, so publishing first
    would leave a window in which an event is silently dropped for this
    subscriber.
    """
    try:
        q._loop = asyncio.get_running_loop()  # type: ignore[attr-defined]
    except RuntimeError:
        # Called outside a running loop: leave the queue untagged.
        pass
    with _sse_lock:
        _sse_queues.setdefault(run_id, []).append(q)
unregister_sse_queue function · python · L147-L152 (6 LOC)
ui/run_manager.py
def unregister_sse_queue(run_id: str, q: asyncio.Queue):
    """Drop ``q`` from the subscriber list of ``run_id``.

    Does nothing when the queue (or the run) is not registered.
    """
    with _sse_lock:
        subscribers = _sse_queues.get(run_id, [])
        try:
            subscribers.remove(q)
        except ValueError:
            # Queue was not in the list.
            pass
_send_sentinel function · python · L155-L157 (3 LOC)
ui/run_manager.py
def _send_sentinel(run_id: str):
    """Broadcast the end-of-stream marker to every SSE queue of ``run_id``."""
    end_of_stream = None  # None = done sentinel
    _broadcast_log(run_id, end_of_stream)
_make_log function · python · L162-L168 (7 LOC)
ui/run_manager.py
def _make_log(level: str, message: str, step: str) -> dict:
    return {
        "ts": time.strftime("%H:%M:%S"),
        "level": level,      # info | success | warn | error
        "message": message,
        "step": step,
    }
_append_log function · python · L171-L175 (5 LOC)
ui/run_manager.py
def _append_log(run_id: str, event: dict):
    """Record ``event`` in the run's log list, then broadcast it.

    The append happens under ``_runs_lock`` and only for known runs; the
    broadcast happens after the lock is released and regardless of whether
    the run exists.
    """
    with _runs_lock:
        known = run_id in _runs
        if known:
            history = _runs[run_id]["logs"]
            history.append(event)
    _broadcast_log(run_id, event)
_LogCapture class · python · L180-L234 (55 LOC)
ui/run_manager.py
class _LogCapture:
    """Context manager that patches nlb.cli print helpers in the
    calling thread and routes output to the run's log store."""

    def __init__(self, run_id: str, step_tracker: list):
        self.run_id = run_id
        self.step_tracker = step_tracker  # mutable [current_step]
        self._originals = {}

    def _log(self, level: str, message: str):
        step = self.step_tracker[0]
        event = _make_log(level, message, step)
        _append_log(self.run_id, event)

    def _patched_status(self, icon: str, msg: str):
        # Detect step from icon
        new_step = ICON_TO_STEP.get(icon)
        if new_step:
            self.step_tracker[0] = new_step
            progress = STEP_PROGRESS.get(new_step, STEP_PROGRESS.get(self.step_tracker[0], 0))
            _update_run(self.run_id, step=new_step, progress=progress)
        self._log("info", f"{icon} {msg}")

    def _patched_success(self, msg: str):
        self._log("success", f"✓ {msg}")

    def _pat
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
__init__ method · python · L184-L187 (4 LOC)
ui/run_manager.py
    def __init__(self, run_id: str, step_tracker: list):
        self.run_id = run_id
        self.step_tracker = step_tracker  # mutable [current_step]
        self._originals = {}
_log method · python · L189-L192 (4 LOC)
ui/run_manager.py
    def _log(self, level: str, message: str):
        """Append a log event for the current step to this run's log."""
        _append_log(
            self.run_id,
            _make_log(level, message, self.step_tracker[0]),
        )
_patched_status method · python · L194-L201 (8 LOC)
ui/run_manager.py
    def _patched_status(self, icon: str, msg: str):
        """Replacement for the CLI ``status`` helper.

        When ``icon`` maps to a pipeline step in ``ICON_TO_STEP``, the shared
        step tracker and the run's ``step``/``progress`` fields are advanced.
        The message is then logged at ``info`` level, prefixed by the icon.
        """
        # Detect step from icon
        new_step = ICON_TO_STEP.get(icon)
        if new_step:
            # Read the outgoing step *before* overwriting the tracker, so a
            # step missing from STEP_PROGRESS falls back to the previous
            # step's progress instead of resetting to 0.
            previous_step = self.step_tracker[0]
            self.step_tracker[0] = new_step
            progress = STEP_PROGRESS.get(
                new_step, STEP_PROGRESS.get(previous_step, 0)
            )
            _update_run(self.run_id, step=new_step, progress=progress)
        self._log("info", f"{icon} {msg}")
__enter__ method · python · L215-L229 (15 LOC)
ui/run_manager.py
    def __enter__(self):
        """Swap the ``nlb.cli`` print helpers for this instance's handlers.

        The current ``status``, ``success``, ``warn``, ``error`` and ``header``
        attributes of the module are saved in ``self._originals`` and then
        replaced.  The replacement is made on the module object itself.
        """
        import nlb.cli as _cli

        replacements = {
            "status":  self._patched_status,
            "success": self._patched_success,
            "warn":    self._patched_warn,
            "error":   self._patched_error,
            "header":  self._patched_header,
        }
        # Snapshot every original before touching any of them.
        self._originals = {name: getattr(_cli, name) for name in replacements}
        for name, handler in replacements.items():
            setattr(_cli, name, handler)
        return self
__exit__ method · python · L231-L234 (4 LOC)
ui/run_manager.py
    def __exit__(self, *_):
        """Put back every ``nlb.cli`` helper saved in ``self._originals``.

        Exception details are ignored and nothing is returned, so an exception
        raised inside the ``with`` body propagates.
        """
        import nlb.cli as _cli

        for attr_name in self._originals:
            setattr(_cli, attr_name, self._originals[attr_name])
_generate_moment_diagram function · python · L242-L419 (178 LOC)
ui/run_manager.py
def _generate_moment_diagram(params: dict, results: dict, output_dir: Path) -> bool:
    """Generate an interactive HTML moment diagram using the rich canvas template."""
    try:
        spans = params.get("spans", [100.0])
        n_spans = len(spans)
        total_len = sum(spans)

        # Build approximate moment envelope (simple beam parabola per span)
        # For a continuous beam, we use a rough approximation
        points_x = []
        points_moment = []
        n_pts = 20

        cumulative = 0.0
        for span_i, span_len in enumerate(spans):
            for j in range(n_pts + (1 if span_i == n_spans - 1 else 0)):
                x_frac = j / n_pts
                x = cumulative + x_frac * span_len
                # Simple beam moment: M = w*L/2 * x - w/2 * x^2 (parabola, normalized)
                # Rough w: use total uniform load ~4 kip/ft typical
                w = 4.0  # kip/ft
                xi = x_frac * span_len
                m = w * span_len / 2 * xi - w
_run_pipeline_thread function · python · L424-L508 (85 LOC)
ui/run_manager.py
def _run_pipeline_thread(run_id: str, description: str):
    """This function runs in a ThreadPoolExecutor thread."""
    run = _get_run(run_id)
    if run is None:
        return

    output_dir = Path(run["output_dir"])
    output_dir.mkdir(parents=True, exist_ok=True)

    step_tracker = ["parse"]  # mutable ref so patches can update it

    _update_run(run_id, status="running", step="parse", progress=0)

    try:
        import nlb.cli as _cli
        from nlb.cli import parse_description, run_pipeline

        # ── Step: parse ───────────────────────────────────────────────
        parse_event = _make_log("info", "📝 Parsing description...", "parse")
        _append_log(run_id, parse_event)

        with _CLI_PATCH_LOCK:
            with _LogCapture(run_id, step_tracker):
                params = parse_description(description)

        spans = params.get("spans", [])
        btype = (params.get("bridge_type") or "unknown").replace("_", " ")
        n_spans = len(spans)
        span
start_run function · python · L513-L525 (13 LOC)
ui/run_manager.py
def start_run(description: str) -> str:
    """Register a new run, queue it on the executor, and return its id.

    The id is the first 8 hex characters of a random UUID4.  The run state
    and an empty SSE subscriber list are stored before the pipeline job is
    submitted.
    """
    run_id = uuid.uuid4().hex[:8]
    record = _new_run(run_id, description)

    with _runs_lock:
        _runs[run_id] = record

    with _sse_lock:
        _sse_queues[run_id] = list()

    _executor.submit(_run_pipeline_thread, run_id, description)
    return run_id
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
get_status function · python · L528-L540 (13 LOC)
ui/run_manager.py
def get_status(run_id: str) -> Optional[dict]:
    """Return a status summary for ``run_id``, or ``None`` if it is unknown.

    ``elapsed`` is in seconds rounded to one decimal, measured up to
    ``finished_at`` when that is set and up to the current time otherwise.
    """
    run = _get_run(run_id)
    if run is None:
        return None

    end_time = run.get("finished_at")
    if not end_time:
        end_time = time.time()

    summary = {"run_id": run_id}
    for field in ("status", "step", "progress"):
        summary[field] = run[field]
    summary["elapsed"] = round(end_time - run["started_at"], 1)
    summary["error"] = run.get("error")
    return summary
get_logs function · python · L543-L547 (5 LOC)
ui/run_manager.py
def get_logs(run_id: str) -> Optional[List[dict]]:
    """Return a shallow copy of the run's log events; ``None`` if unknown."""
    run = _get_run(run_id)
    return None if run is None else [*run["logs"]]
get_results function · python · L550-L554 (5 LOC)
ui/run_manager.py
def get_results(run_id: str) -> Optional[dict]:
    """Return the run's stored ``results`` value.

    ``None`` is returned both for an unknown run and for a run that has no
    results.
    """
    run = _get_run(run_id)
    if run is not None:
        return run.get("results")
    return None
get_artifacts function · python · L557-L583 (27 LOC)
ui/run_manager.py
def get_artifacts(run_id: str) -> Optional[List[dict]]:
    """List the files in a run's output directory.

    Returns:
        ``None`` when the run is unknown; ``[]`` when its output directory is
        missing (or is not a directory); otherwise one dict per regular file,
        sorted by path, with keys ``name``, ``type``, ``size`` (bytes) and
        ``url``.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    run = _get_run(run_id)
    if run is None:
        return None

    output_dir = Path(run["output_dir"])
    if not output_dir.is_dir():
        return []

    # Built once rather than on every loop iteration.
    type_by_ext = {
        ".html": "html",
        ".md":   "markdown",
        ".json": "json",
        ".pdf":  "pdf",
        ".txt":  "text",
    }

    artifacts = []
    for path in sorted(output_dir.iterdir()):
        if not path.is_file():
            continue
        artifacts.append({
            "name":  path.name,
            "type":  type_by_ext.get(path.suffix.lower(), "file"),
            "size":  path.stat().st_size,
            # Percent-encode the filename so spaces, '#', '?' or '%' in a
            # name do not produce a broken or ambiguous URL.
            "url":   f"/api/run/{run_id}/artifact/{quote(path.name)}",
        })
    return artifacts
get_artifact_path function · python · L586-L600 (15 LOC)
ui/run_manager.py
def get_artifact_path(run_id: str, filename: str) -> Optional[Path]:
    """Resolve ``filename`` inside a run's output directory.

    Returns the resolved path only when the run exists, the resolved path
    stays within the run's output directory, and it is an existing regular
    file.  Every other case returns ``None``.
    """
    run = _get_run(run_id)
    if run is None:
        return None

    root = Path(run["output_dir"]).resolve()
    target = (root / filename).resolve()

    # Prevent path traversal outside the run output directory.
    try:
        target.relative_to(root)
    except ValueError:
        return None

    # is_file() is already False for a path that does not exist.
    if target.is_file():
        return target
    return None
post_run function · python · L114-L118 (5 LOC)
ui/server.py
def post_run(body: RunRequest):
    """Start a run from the request's description and return its id.

    Raises:
        HTTPException: 400 when the description is empty or only whitespace.
    """
    description = body.description.strip()
    if not description:
        raise HTTPException(status_code=400, detail="description is required")
    return {"run_id": rm.start_run(description)}
get_run_status function · python · L122-L126 (5 LOC)
ui/server.py
def get_run_status(run_id: str):
    """Return the status summary for a run.

    Raises:
        HTTPException: 404 when the run id is unknown.
    """
    snapshot = rm.get_status(run_id)
    if snapshot is not None:
        return snapshot
    raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
get_run_results function · python · L130-L139 (10 LOC)
ui/server.py
def get_run_results(run_id: str):
    """Return the stored results of a finished run.

    Raises:
        HTTPException: 404 for an unknown run, 202 while the run is still
            pending or running, and 404 when no results were stored.
    """
    summary = rm.get_status(run_id)
    if summary is None:
        raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")

    if summary["status"] in ("running", "pending"):
        raise HTTPException(status_code=202, detail="Run still in progress")

    payload = rm.get_results(run_id)
    if payload is not None:
        return payload
    raise HTTPException(status_code=404, detail="No results available (run may have failed)")
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
get_run_artifacts function · python · L143-L147 (5 LOC)
ui/server.py
def get_run_artifacts(run_id: str):
    """Return the run's artifact listing wrapped as ``{"artifacts": [...]}``.

    Raises:
        HTTPException: 404 when the run id is unknown.
    """
    listing = rm.get_artifacts(run_id)
    if listing is not None:
        return {"artifacts": listing}
    raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
get_artifact_file function · python · L151-L165 (15 LOC)
ui/server.py
def get_artifact_file(run_id: str, filename: str):
    """Serve one artifact file of a run.

    The media type is guessed from the filename, defaulting to
    ``application/octet-stream``.  Markdown files are sent as UTF-8 plain
    text; the extension check is case-insensitive, matching how
    ``rm.get_artifacts`` classifies files by lower-cased suffix.

    Raises:
        HTTPException: 404 when ``rm.get_artifact_path`` finds no such file.
    """
    path = rm.get_artifact_path(run_id, filename)
    if path is None:
        raise HTTPException(status_code=404, detail=f"Artifact '{filename}' not found")

    if filename.lower().endswith(".md"):
        # For markdown, serve as text so browser can render it
        mime = "text/plain; charset=utf-8"
    else:
        # Guess MIME type, falling back to a generic binary type
        mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"

    return FileResponse(str(path), media_type=mime, filename=filename)
get_run_logs_sse function · python · L169-L236 (68 LOC)
ui/server.py
async def get_run_logs_sse(run_id: str, request: Request):
    """Server-Sent Events stream of log events for a run.

    Sends already-buffered logs immediately, then streams new events
    as they arrive, and closes when the run completes (sentinel = None).
    """
    status = rm.get_status(run_id)
    if status is None:
        raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")

    async def event_generator():
        # 1. Drain existing logs first (for late-joining clients)
        existing = rm.get_logs(run_id) or []
        for log_event in existing:
            yield {"data": json.dumps(log_event)}
            await asyncio.sleep(0)

        # If run already finished, we're done
        current_status = rm.get_status(run_id)
        if current_status and current_status["status"] in ("completed", "failed"):
            yield {"data": json.dumps({"level": "info", "message": "__END__", "step": "done", "ts": ""})}
            return

        # 2. Register a q
serve_index function · python · L243-L250 (8 LOC)
ui/server.py
async def serve_index():
    """Serve ``index.html`` from the static directory.

    When the file is missing, a 200 JSON placeholder message is returned
    instead.
    """
    page = _STATIC_DIR / "index.html"
    if page.exists():
        return FileResponse(str(page))
    return JSONResponse(
        status_code=200,
        content={"message": "NLB backend running. Place index.html in ui/static/"},
    )
‹ prevpage 7 / 7